You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/03 19:12:58 UTC
[08/11] git commit: merge from 1.2
merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7d8a56df
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7d8a56df
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7d8a56df
Branch: refs/heads/trunk
Commit: 7d8a56df3df26be7537f2a5158469629c34b911c
Parents: 71c8912 92b3622
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Oct 3 12:12:11 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Oct 3 12:12:11 2013 -0500
----------------------------------------------------------------------
src/java/org/apache/cassandra/service/StorageProxy.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d8a56df/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 6c8e636,9b559e5..d16bef9
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -963,29 -637,35 +963,29 @@@ public class StorageProxy implements St
Iterator<InetAddress> iter = targets.iterator();
InetAddress target = iter.next();
- // direct writes to local DC or old Cassandra versions
- if (localDC || MessagingService.instance().getVersion(target) < MessagingService.VERSION_11)
+ // Add the other destinations of the same message as a FORWARD_HEADER entry
+ DataOutputBuffer out = new DataOutputBuffer();
+ try
{
- // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid
- // creating a second iterator since we already have a perfectly good one
- MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), handler.consistencyLevel);
+ out.writeInt(targets.size() - 1);
while (iter.hasNext())
{
- target = iter.next();
- MessagingService.instance().sendRR(message, target, handler);
+ InetAddress destination = iter.next();
+ CompactEndpointSerializationHelper.serialize(destination, out);
- int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
++ int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout(), handler.consistencyLevel);
+ out.writeInt(id);
+ logger.trace("Adding FWD message to {}@{}", id, destination);
}
- return;
+ message = message.withParameter(RowMutation.FORWARD_TO, out.getData());
+ // send the combined message + forward headers
+ int id = MessagingService.instance().sendRR(message, target, handler);
+ logger.trace("Sending message to {}@{}", id, target);
}
-
- // Add all the other destinations of the same message as a FORWARD_HEADER entry
- FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- dos.writeInt(targets.size() - 1);
- while (iter.hasNext())
+ catch (IOException e)
{
- InetAddress destination = iter.next();
- CompactEndpointSerializationHelper.serialize(destination, dos);
- String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
- dos.writeUTF(id);
+ // DataOutputBuffer is in-memory, doesn't throw IOException
+ throw new AssertionError(e);
}
- message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
- // send the combined message + forward headers
- Tracing.trace("Enqueuing message to {}", target);
- MessagingService.instance().sendRR(message, target, handler);
}
private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)