You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/04/22 18:47:55 UTC

[1/2] git commit: Ensure that batchlog and hint timeouts do not produce hints

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 1c2a812a9 -> 3a73e392f


Ensure that batchlog and hint timeouts do not produce hints

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-7058


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2890cc5b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2890cc5b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2890cc5b

Branch: refs/heads/cassandra-2.0
Commit: 2890cc5be986740cadf491bb5efbb49af2b11c57
Parents: 0547d16
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Apr 22 19:10:51 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Apr 22 19:10:51 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                             |  1 +
 src/java/org/apache/cassandra/db/BatchlogManager.java   |  2 +-
 .../org/apache/cassandra/db/HintedHandOffManager.java   |  8 +++-----
 src/java/org/apache/cassandra/net/MessagingService.java | 12 +++++++++++-
 4 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2890cc5b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dc48131..74ddcfd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
  * Fix batchlog to account for CF truncation records (CASSANDRA-6999)
  * Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018)
  * Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
+ * Ensure that batchlog and hint timeouts do not produce hints (CASSANDRA-7058)
 
 
 1.2.16

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2890cc5b/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index ea32e9d..02af9d3 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -328,7 +328,7 @@ public class BatchlogManager implements BatchlogManagerMBean
                 }
             };
             WriteResponseHandler handler = new WriteResponseHandler(ep, WriteType.UNLOGGED_BATCH, callback);
-            MessagingService.instance().sendRR(mutation.createMessage(), ep, handler);
+            MessagingService.instance().sendUnhintableMutation(mutation, ep, handler);
             handlers.add(handler);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2890cc5b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 427bbf2..a7a3e06 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -55,7 +55,6 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.metrics.HintedHandoffMetrics;
-import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.thrift.*;
@@ -391,8 +390,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                     continue;
                 }
 
-                MessageOut<RowMutation> message = rm.createMessage();
-                rateLimiter.acquire(message.serializedSize(MessagingService.current_version));
+                rateLimiter.acquire((int) RowMutation.serializer.serializedSize(rm, MessagingService.current_version));
                 Runnable callback = new Runnable()
                 {
                     public void run()
@@ -401,8 +399,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                         deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
                     }
                 };
-                WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback);
-                MessagingService.instance().sendRR(message, endpoint, responseHandler);
+                WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.SIMPLE, callback);
+                MessagingService.instance().sendUnhintableMutation(rm, endpoint, responseHandler);
                 responseHandlers.add(responseHandler);
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2890cc5b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 09fa272..3f90d7f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -559,6 +559,17 @@ public final class MessagingService implements MessagingServiceMBean
     }
 
     /**
+     * A special version of sendRR that doesn't trigger a hint for the mutation on a timeout.
+     * Used by BatchlogManager and HintedHandOffManager.
+     */
+    public void sendUnhintableMutation(RowMutation mutation, InetAddress to, IMessageCallback cb)
+    {
+        String id = nextId();
+        callbacks.put(id, new CallbackInfo(to, cb, WriteResponse.serializer), DatabaseDescriptor.getWriteRpcTimeout());
+        sendOneWay(mutation.createMessage(), id, to);
+    }
+
+    /**
      * Send a message to a given endpoint. This method specifies a callback
      * which is invoked with the actual response.
      * Also holds the message (only mutation messages) to determine if it
@@ -568,7 +579,6 @@ public final class MessagingService implements MessagingServiceMBean
      * @param to      endpoint to which the message needs to be sent
      * @param cb      callback interface which is used to pass the responses or
      *                suggest that a timeout occurred to the invoker of the send().
-     *                suggest that a timeout occurred to the invoker of the send().
      * @param timeout the timeout used for expiration
      * @return an reference to message id used to match with the result
      */


[2/2] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0

Posted by al...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
	src/java/org/apache/cassandra/net/MessagingService.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3a73e392
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3a73e392
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3a73e392

Branch: refs/heads/cassandra-2.0
Commit: 3a73e392fa424bff5378d4bb72117cfa28f9b0b7
Parents: 1c2a812 2890cc5
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Apr 22 19:47:42 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Apr 22 19:47:42 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/BatchlogManager.java    |  2 +-
 .../cassandra/db/HintedHandOffManager.java      |  4 +--
 .../apache/cassandra/net/MessagingService.java  | 27 ++++++++++++++------
 .../apache/cassandra/net/WriteCallbackInfo.java | 16 +++++++++---
 .../apache/cassandra/service/StorageProxy.java  |  7 ++++-
 6 files changed, 41 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9b73c89,74ddcfd..dbed949
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -8,61 -9,10 +8,62 @@@ Merged from 1.2
   * Fix batchlog to account for CF truncation records (CASSANDRA-6999)
   * Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018)
   * Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
+  * Ensure that batchlog and hint timeouts do not produce hints (CASSANDRA-7058)
  
  
 -1.2.16
 +2.0.7
 + * Put nodes in hibernate when join_ring is false (CASSANDRA-6961)
 + * Avoid early loading of non-system keyspaces before compaction-leftovers 
 +   cleanup at startup (CASSANDRA-6913)
 + * Restrict Windows to parallel repairs (CASSANDRA-6907)
 + * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
 + * Fix NPE in MeteredFlusher (CASSANDRA-6820)
 + * Fix race processing range scan responses (CASSANDRA-6820)
 + * Allow deleting snapshots from dropped keyspaces (CASSANDRA-6821)
 + * Add uuid() function (CASSANDRA-6473)
 + * Omit tombstones from schema digests (CASSANDRA-6862)
 + * Include correct consistencyLevel in LWT timeout (CASSANDRA-6884)
 + * Lower chances for losing new SSTables during nodetool refresh and
 +   ColumnFamilyStore.loadNewSSTables (CASSANDRA-6514)
 + * Add support for DELETE ... IF EXISTS to CQL3 (CASSANDRA-5708)
 + * Update hadoop_cql3_word_count example (CASSANDRA-6793)
 + * Fix handling of RejectedExecution in sync Thrift server (CASSANDRA-6788)
 + * Log more information when exceeding tombstone_warn_threshold (CASSANDRA-6865)
 + * Fix truncate to not abort due to unreachable fat clients (CASSANDRA-6864)
 + * Fix schema concurrency exceptions (CASSANDRA-6841)
 + * Fix leaking validator FH in StreamWriter (CASSANDRA-6832)
 + * Fix saving triggers to schema (CASSANDRA-6789)
 + * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790)
 + * Fix accounting in FileCacheService to allow re-using RAR (CASSANDRA-6838)
 + * Fix static counter columns (CASSANDRA-6827)
 + * Restore expiring->deleted (cell) compaction optimization (CASSANDRA-6844)
 + * Fix CompactionManager.needsCleanup (CASSANDRA-6845)
 + * Correctly compare BooleanType values other than 0 and 1 (CASSANDRA-6779)
 + * Read message id as string from earlier versions (CASSANDRA-6840)
 + * Properly use the Paxos consistency for (non-protocol) batch (CASSANDRA-6837)
 + * Add paranoid disk failure option (CASSANDRA-6646)
 + * Improve PerRowSecondaryIndex performance (CASSANDRA-6876)
 + * Extend triggers to support CAS updates (CASSANDRA-6882)
 + * Static columns with IF NOT EXISTS don't always work as expected (CASSANDRA-6873)
 + * Fix paging with SELECT DISTINCT (CASSANDRA-6857)
 + * Fix UnsupportedOperationException on CAS timeout (CASSANDRA-6923)
 + * Improve MeteredFlusher handling of MF-unaffected column families
 +   (CASSANDRA-6867)
 + * Add CqlRecordReader using native pagination (CASSANDRA-6311)
 + * Add QueryHandler interface (CASSANDRA-6659)
 + * Track liveRatio per-memtable, not per-CF (CASSANDRA-6945)
 + * Make sure upgradesstables keeps sstable level (CASSANDRA-6958)
 + * Fix LIMIT with static columns (CASSANDRA-6956)
 + * Fix clash with CQL column name in thrift validation (CASSANDRA-6892)
 + * Fix error with super columns in mixed 1.2-2.0 clusters (CASSANDRA-6966)
 + * Fix bad skip of sstables on slice query with composite start/finish (CASSANDRA-6825)
 + * Fix unintended update with conditional statement (CASSANDRA-6893)
 + * Fix map element access in IF (CASSANDRA-6914)
 + * Avoid costly range calculations for range queries on system keyspaces
 +   (CASSANDRA-6906)
 + * Fix SSTable not released if stream session fails (CASSANDRA-6818)
 + * Avoid build failure due to ANTLR timeout (CASSANDRA-6991)
 +Merged from 1.2:
   * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
   * add extra SSL cipher suites (CASSANDRA-6613)
   * fix nodetool getsstables for blob PK (CASSANDRA-6803)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index 5770994,02af9d3..5aea736
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -323,7 -328,7 +323,7 @@@ public class BatchlogManager implement
                  }
              };
              WriteResponseHandler handler = new WriteResponseHandler(ep, WriteType.UNLOGGED_BATCH, callback);
-             MessagingService.instance().sendRR(mutation.createMessage(), ep, handler);
 -            MessagingService.instance().sendUnhintableMutation(mutation, ep, handler);
++            MessagingService.instance().sendRR(mutation.createMessage(), ep, handler, false);
              handlers.add(handler);
          }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 942707e,a7a3e06..13d1bb0
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@@ -450,8 -399,8 +450,8 @@@ public class HintedHandOffManager imple
                          deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
                      }
                  };
-                 WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback);
-                 MessagingService.instance().sendRR(message, endpoint, responseHandler);
+                 WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.SIMPLE, callback);
 -                MessagingService.instance().sendUnhintableMutation(rm, endpoint, responseHandler);
++                MessagingService.instance().sendRR(message, endpoint, responseHandler, false);
                  responseHandlers.add(responseHandler);
              }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index cc5dae5,3f90d7f..cccf698
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -537,21 -527,18 +537,33 @@@ public final class MessagingService imp
          return verbHandlers.get(type);
      }
  
 -    public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout)
 +    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout)
      {
 -        String messageId = nextId();
 -        CallbackInfo previous;
 -
 -        // If HH is enabled and this is a mutation message => store the message to track for potential hints.
 -        if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION)
 -            previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout);
 -        else
 -            previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
 +        assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
 +        int messageId = nextId();
 +        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
 +        assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
 +        return messageId;
 +    }
  
-     public int addCallback(IAsyncCallback cb, MessageOut<? extends IMutation> message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
 -        assert previous == null;
++    public int addCallback(IAsyncCallback cb,
++                           MessageOut<? extends IMutation> message,
++                           InetAddress to,
++                           long timeout,
++                           ConsistencyLevel consistencyLevel,
++                           boolean allowHints)
 +    {
 +        assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
 +        int messageId = nextId();
-         CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
++        CallbackInfo previous = callbacks.put(messageId,
++                                              new WriteCallbackInfo(to,
++                                                                    cb,
++                                                                    message,
++                                                                    callbackDeserializers.get(message.verb),
++                                                                    consistencyLevel,
++                                                                    allowHints),
++                                                                    timeout);
 +        assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
          return messageId;
      }
  
@@@ -568,24 -559,14 +580,21 @@@
      }
  
      /**
 -     * A special version of sendRR that doesn't trigger a hint for the mutation on a timeout.
 -     * Used by BatchlogManager and HintedHandOffManager.
 +     * Send a non-mutation message to a given endpoint. This method specifies a callback
 +     * which is invoked with the actual response.
-      * Also holds the message (only mutation messages) to determine if it
-      * needs to trigger a hint (uses StorageProxy for that).
 +     *
 +     * @param message message to be sent.
 +     * @param to      endpoint to which the message needs to be sent
 +     * @param cb      callback interface which is used to pass the responses or
 +     *                suggest that a timeout occurred to the invoker of the send().
-      *                suggest that a timeout occurred to the invoker of the send().
 +     * @param timeout the timeout used for expiration
 +     * @return an reference to message id used to match with the result
       */
 -    public void sendUnhintableMutation(RowMutation mutation, InetAddress to, IMessageCallback cb)
 +    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout)
      {
 -        String id = nextId();
 -        callbacks.put(id, new CallbackInfo(to, cb, WriteResponse.serializer), DatabaseDescriptor.getWriteRpcTimeout());
 -        sendOneWay(mutation.createMessage(), id, to);
 +        int id = addCallback(cb, message, to, timeout);
 +        sendOneWay(message, id, to);
 +        return id;
      }
  
      /**
@@@ -596,14 -577,24 +605,16 @@@
       *
       * @param message message to be sent.
       * @param to      endpoint to which the message needs to be sent
 -     * @param cb      callback interface which is used to pass the responses or
 +     * @param handler callback interface which is used to pass the responses or
       *                suggest that a timeout occurred to the invoker of the send().
-      *                suggest that a timeout occurred to the invoker of the send().
 -     * @param timeout the timeout used for expiration
       * @return an reference to message id used to match with the result
       */
-     public int sendRR(MessageOut<? extends IMutation> message, InetAddress to, AbstractWriteResponseHandler handler)
 -    public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb, long timeout)
++    public int sendRR(MessageOut<? extends IMutation> message,
++                      InetAddress to,
++                      AbstractWriteResponseHandler handler,
++                      boolean allowHints)
      {
-         int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel);
 -        String id = addCallback(cb, message, to, timeout);
 -
 -        if (cb instanceof AbstractWriteResponseHandler)
 -        {
 -            PBSPredictor.instance().startWriteOperation(id);
 -        }
 -        else if (cb instanceof ReadCallback)
 -        {
 -            PBSPredictor.instance().startReadOperation(id);
 -        }
 -
++        int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel, allowHints);
          sendOneWay(message, id, to);
          return id;
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index be7b668,0000000..987ec15
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@@ -1,46 -1,0 +1,54 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.cassandra.net;
 +
 +import java.net.InetAddress;
 +
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.service.StorageProxy;
 +
 +public class WriteCallbackInfo extends CallbackInfo
 +{
 +    public final MessageOut sentMessage;
 +    private final ConsistencyLevel consistencyLevel;
++    private final boolean allowHints;
 +
-     public WriteCallbackInfo(InetAddress target, IAsyncCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel)
++    public WriteCallbackInfo(InetAddress target,
++                             IAsyncCallback callback,
++                             MessageOut message,
++                             IVersionedSerializer<?> serializer,
++                             ConsistencyLevel consistencyLevel,
++                             boolean allowHints)
 +    {
 +        super(target, callback, serializer);
 +        assert message != null;
 +        this.sentMessage = message;
 +        this.consistencyLevel = consistencyLevel;
++        this.allowHints = allowHints;
 +    }
 +
 +    public boolean shouldHint()
 +    {
-         return sentMessage.verb != MessagingService.Verb.COUNTER_MUTATION
-                && consistencyLevel != ConsistencyLevel.ANY
-                && StorageProxy.shouldHint(target);
++        return allowHints
++            && sentMessage.verb != MessagingService.Verb.COUNTER_MUTATION
++            && consistencyLevel != ConsistencyLevel.ANY
++            && StorageProxy.shouldHint(target);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 033ce8e,7ef3d72..fc6ee3a
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -917,29 -616,35 +917,34 @@@ public class StorageProxy implements St
          Iterator<InetAddress> iter = targets.iterator();
          InetAddress target = iter.next();
  
 -        // direct writes to local DC or old Cassandra versions
 -        if (localDC || MessagingService.instance().getVersion(target) < MessagingService.VERSION_12)
 +        // Add the other destinations of the same message as a FORWARD_HEADER entry
 +        DataOutputBuffer out = new DataOutputBuffer();
 +        try
          {
 -            // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid
 -            // creating a second iterator since we already have a perfectly good one
 -            MessagingService.instance().sendRR(message, target, handler);
 +            out.writeInt(targets.size() - 1);
              while (iter.hasNext())
              {
 -                target = iter.next();
 -                MessagingService.instance().sendRR(message, target, handler);
 +                InetAddress destination = iter.next();
 +                CompactEndpointSerializationHelper.serialize(destination, out);
-                 int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout(), handler.consistencyLevel);
++                int id = MessagingService.instance().addCallback(handler,
++                                                                 message,
++                                                                 destination,
++                                                                 message.getTimeout(),
++                                                                 handler.consistencyLevel,
++                                                                 true);
 +                out.writeInt(id);
 +                logger.trace("Adding FWD message to {}@{}", id, destination);
              }
 -            return;
 +            message = message.withParameter(RowMutation.FORWARD_TO, out.getData());
 +            // send the combined message + forward headers
 +            int id = MessagingService.instance().sendRR(message, target, handler);
 +            logger.trace("Sending message to {}@{}", id, target);
          }
 -
 -        // Add all the other destinations of the same message as a FORWARD_HEADER entry
 -        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
 -        DataOutputStream dos = new DataOutputStream(bos);
 -        dos.writeInt(targets.size() - 1);
 -        while (iter.hasNext())
 +        catch (IOException e)
          {
 -            InetAddress destination = iter.next();
 -            CompactEndpointSerializationHelper.serialize(destination, dos);
 -            String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
 -            dos.writeUTF(id);
 +            // DataOutputBuffer is in-memory, doesn't throw IOException
 +            throw new AssertionError(e);
          }
 -        message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
 -        // send the combined message + forward headers
 -        Tracing.trace("Enqueuing message to {}", target);
 -        MessagingService.instance().sendRR(message, target, handler);
      }
  
      private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)