You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/03 19:12:58 UTC

[08/11] git commit: merge from 1.2

merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7d8a56df
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7d8a56df
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7d8a56df

Branch: refs/heads/trunk
Commit: 7d8a56df3df26be7537f2a5158469629c34b911c
Parents: 71c8912 92b3622
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Oct 3 12:12:11 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Oct 3 12:12:11 2013 -0500

----------------------------------------------------------------------
 src/java/org/apache/cassandra/service/StorageProxy.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d8a56df/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 6c8e636,9b559e5..d16bef9
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -963,29 -637,35 +963,29 @@@ public class StorageProxy implements St
          Iterator<InetAddress> iter = targets.iterator();
          InetAddress target = iter.next();
  
 -        // direct writes to local DC or old Cassandra versions
 -        if (localDC || MessagingService.instance().getVersion(target) < MessagingService.VERSION_11)
 +        // Add the other destinations of the same message as a FORWARD_HEADER entry
 +        DataOutputBuffer out = new DataOutputBuffer();
 +        try
          {
 -            // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid
 -            // creating a second iterator since we already have a perfectly good one
 -            MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), handler.consistencyLevel);
 +            out.writeInt(targets.size() - 1);
              while (iter.hasNext())
              {
 -                target = iter.next();
 -                MessagingService.instance().sendRR(message, target, handler);
 +                InetAddress destination = iter.next();
 +                CompactEndpointSerializationHelper.serialize(destination, out);
-                 int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
++                int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout(), handler.consistencyLevel);
 +                out.writeInt(id);
 +                logger.trace("Adding FWD message to {}@{}", id, destination);
              }
 -            return;
 +            message = message.withParameter(RowMutation.FORWARD_TO, out.getData());
 +            // send the combined message + forward headers
 +            int id = MessagingService.instance().sendRR(message, target, handler);
 +            logger.trace("Sending message to {}@{}", id, target);
          }
 -
 -        // Add all the other destinations of the same message as a FORWARD_HEADER entry
 -        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
 -        DataOutputStream dos = new DataOutputStream(bos);
 -        dos.writeInt(targets.size() - 1);
 -        while (iter.hasNext())
 +        catch (IOException e)
          {
 -            InetAddress destination = iter.next();
 -            CompactEndpointSerializationHelper.serialize(destination, dos);
 -            String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
 -            dos.writeUTF(id);
 +            // DataOutputBuffer is in-memory, doesn't throw IOException
 +            throw new AssertionError(e);
          }
 -        message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
 -        // send the combined message + forward headers
 -        Tracing.trace("Enqueuing message to {}", target);
 -        MessagingService.instance().sendRR(message, target, handler);
      }
  
      private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)