You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/05/22 18:26:39 UTC
git commit: cleanup and simplify; only need to create one MessageOut
Updated Branches:
refs/heads/trunk 369936c2c -> 62f429337
cleanup and simplify; only need to create one MessageOut
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/62f42933
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/62f42933
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/62f42933
Branch: refs/heads/trunk
Commit: 62f429337caf0aa83b68720a5904e8527b840c80
Parents: 369936c
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed May 22 11:26:17 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed May 22 11:26:17 2013 -0500
----------------------------------------------------------------------
src/java/org/apache/cassandra/net/MessageOut.java | 2 +-
.../org/apache/cassandra/service/StorageProxy.java | 40 ++++++--------
2 files changed, 18 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62f42933/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index 14ef377..f419d51 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -83,7 +83,7 @@ public class MessageOut<T>
return new MessageOut<T>(verb, payload, serializer, builder.build());
}
- public MessageOut withHeaderRemoved(String key)
+ public MessageOut<T> withHeaderRemoved(String key)
{
ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
for (Map.Entry<String, byte[]> entry : parameters.entrySet())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62f42933/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0e49854..829fc0a 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -711,8 +711,8 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistency_level)
throws OverloadedException
{
- // Multimap that holds onto all the messages and addresses meant for a specific datacenter
- Map<String, Multimap<MessageOut, InetAddress>> dcMessages = null;
+ // replicas grouped by datacenter
+ Map<String, Collection<InetAddress>> dcGroups = null;
for (InetAddress destination : targets)
{
@@ -736,20 +736,17 @@ public class StorageProxy implements StorageProxyMBean
else
{
// belongs on a different server
- if (logger.isTraceEnabled())
- logger.trace("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
-
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
- Multimap<MessageOut, InetAddress> messages = (dcMessages != null) ? dcMessages.get(dc) : null;
+ Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
if (messages == null)
{
- messages = HashMultimap.create();
- if (dcMessages == null)
- dcMessages = new HashMap<String, Multimap<MessageOut, InetAddress>>();
- dcMessages.put(dc, messages);
+ messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+ if (dcGroups == null)
+ dcGroups = new HashMap<String, Collection<InetAddress>>();
+ dcGroups.put(dc, messages);
}
- messages.put(rm.createMessage(), destination);
+ messages.add(destination);
}
}
else
@@ -762,21 +759,18 @@ public class StorageProxy implements StorageProxyMBean
}
}
- if (dcMessages != null)
+ if (dcGroups != null)
{
- // for each datacenter, send a message to one node to relay the write to other replicas
- for (Map.Entry<String, Multimap<MessageOut, InetAddress>> entry: dcMessages.entrySet())
+ MessageOut<RowMutation> message = rm.createMessage();
+ // for each datacenter, send the message to one node to relay the write to other replicas
+ for (Map.Entry<String, Collection<InetAddress>> entry: dcGroups.entrySet())
{
boolean isLocalDC = entry.getKey().equals(localDataCenter);
- for (Map.Entry<MessageOut, Collection<InetAddress>> messages: entry.getValue().asMap().entrySet())
- {
- MessageOut message = messages.getKey();
- Collection<InetAddress> targets1 = messages.getValue();
- // a single message object is used for unhinted writes, so clean out any forwards
- // from previous loop iterations
- message = message.withHeaderRemoved(RowMutation.FORWARD_TO);
- sendMessagesToOneDC(message, targets1, isLocalDC, responseHandler);
- }
+ Collection<InetAddress> dcTargets = entry.getValue();
+ // a single message object is used for unhinted writes, so clean out any forwards
+ // from previous loop iterations
+ message = message.withHeaderRemoved(RowMutation.FORWARD_TO);
+ sendMessagesToOneDC(message, dcTargets, isLocalDC, responseHandler);
}
}
}