You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/04 22:34:12 UTC
svn commit: r1055187 -
/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Author: jbellis
Date: Tue Jan 4 21:34:11 2011
New Revision: 1055187
URL: http://svn.apache.org/viewvc?rev=1055187&view=rev
Log:
fix batch mutations post-#1530
patch by tjake; reviewed by jbellis for CASSANDRA-1931
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1055187&r1=1055186&r2=1055187&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Tue Jan 4 21:34:11 2011
@@ -123,7 +123,7 @@ public class StorageProxy implements Sto
responseHandlers.add(responseHandler);
// Multimap that holds onto all the messages and addresses meant for a specific datacenter
- Multimap<String, Pair<Message, InetAddress>> dcMessages = HashMultimap.create(hintedEndpoints.size(), 10);
+ Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
Message unhintedMessage = null;
for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
@@ -150,7 +150,16 @@ public class StorageProxy implements Sto
}
if (logger.isDebugEnabled())
logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
- dcMessages.put(dc, new Pair<Message, InetAddress>(unhintedMessage, destination));
+
+
+ Multimap<Message, InetAddress> messages = dcMessages.get(dc);
+ if (messages == null)
+ {
+ messages = HashMultimap.create();
+ dcMessages.put(dc, messages);
+ }
+
+ messages.put(unhintedMessage, destination);
}
}
else
@@ -167,7 +176,16 @@ public class StorageProxy implements Sto
}
}
responseHandler.addHintCallback(hintedMessage, destination);
- dcMessages.put(dc, new Pair<Message, InetAddress>(hintedMessage, destination));
+
+ Multimap<Message, InetAddress> messages = dcMessages.get(dc);
+
+ if (messages == null)
+ {
+ messages = HashMultimap.create();
+ dcMessages.put(dc, messages);
+ }
+
+ messages.put(hintedMessage, destination);
}
}
@@ -194,53 +212,55 @@ public class StorageProxy implements Sto
/**
* for each datacenter, send a message to one node to relay the write to other replicas
*/
- private static void sendMessages(String localDataCenter, Multimap<String, Pair<Message, InetAddress>> dcMessages)
+ private static void sendMessages(String localDataCenter, Map<String, Multimap<Message, InetAddress>> dcMessages)
throws IOException
{
- for (Map.Entry<String, Collection<Pair<Message, InetAddress>>> entry : dcMessages.asMap().entrySet())
+ for (Map.Entry<String, Multimap<Message, InetAddress>> entry: dcMessages.entrySet())
{
String dataCenter = entry.getKey();
// Grab a set of all the messages bound for this dataCenter and create an iterator over this set.
- Collection<Pair<Message, InetAddress>> messagesForDataCenter = entry.getValue();
- Iterator<Pair<Message, InetAddress>> iter = messagesForDataCenter.iterator();
- assert iter.hasNext();
-
- // First endpoint in list is the destination for this group
- Pair<Message, InetAddress> messageAndDestination = iter.next();
-
- Message primaryMessage = messageAndDestination.left;
- InetAddress target = messageAndDestination.right;
+ Map<Message, Collection<InetAddress>> messagesForDataCenter = entry.getValue().asMap();
- // Add all the other destinations that are bound for the same dataCenter as a header in the primary message.
- while (iter.hasNext())
+ for (Map.Entry<Message, Collection<InetAddress>> messages: messagesForDataCenter.entrySet())
{
- messageAndDestination = iter.next();
- assert messageAndDestination.left == primaryMessage;
+ Message message = messages.getKey();
+ Iterator<InetAddress> iter = messages.getValue().iterator();
+ assert iter.hasNext();
+
+ // First endpoint in list is the destination for this group
+ InetAddress target = iter.next();
+
- if (dataCenter.equals(localDataCenter))
+ // Add all the other destinations that are bound for the same dataCenter as a header in the primary message.
+ while (iter.hasNext())
{
- // direct write to local DC
- assert primaryMessage.getHeader(RowMutation.FORWARD_HEADER) == null;
- MessagingService.instance().sendOneWay(primaryMessage, target);
- }
- else
- {
- // group all nodes in this DC as forward headers on the primary message
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
-
- // append to older addresses
- byte[] previousHints = primaryMessage.getHeader(RowMutation.FORWARD_HEADER);
- if (previousHints != null)
- dos.write(previousHints);
+ InetAddress destination = iter.next();
+
+ if (dataCenter.equals(localDataCenter))
+ {
+ // direct write to local DC
+ assert message.getHeader(RowMutation.FORWARD_HEADER) == null;
+ MessagingService.instance().sendOneWay(message, target);
+ }
+ else
+ {
+ // group all nodes in this DC as forward headers on the primary message
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+
+ // append to older addresses
+ byte[] previousHints = message.getHeader(RowMutation.FORWARD_HEADER);
+ if (previousHints != null)
+ dos.write(previousHints);
- dos.write(messageAndDestination.right.getAddress());
- primaryMessage.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
+ dos.write(destination.getAddress());
+ message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
+ }
}
+
+ MessagingService.instance().sendOneWay(message, target);
}
-
- MessagingService.instance().sendOneWay(primaryMessage, target);
}
}