You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/04/22 18:47:55 UTC
[1/2] git commit: Ensure that batchlog and hint timeouts do not produce hints
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 1c2a812a9 -> 3a73e392f
Ensure that batchlog and hint timeouts do not produce hints
patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for CASSANDRA-7058
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2890cc5b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2890cc5b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2890cc5b
Branch: refs/heads/cassandra-2.0
Commit: 2890cc5be986740cadf491bb5efbb49af2b11c57
Parents: 0547d16
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Apr 22 19:10:51 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Apr 22 19:10:51 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/BatchlogManager.java | 2 +-
.../org/apache/cassandra/db/HintedHandOffManager.java | 8 +++-----
src/java/org/apache/cassandra/net/MessagingService.java | 12 +++++++++++-
4 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2890cc5b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dc48131..74ddcfd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
* Fix batchlog to account for CF truncation records (CASSANDRA-6999)
* Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018)
* Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
+ * Ensure that batchlog and hint timeouts do not produce hints (CASSANDRA-7058)
1.2.16
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2890cc5b/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index ea32e9d..02af9d3 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -328,7 +328,7 @@ public class BatchlogManager implements BatchlogManagerMBean
}
};
WriteResponseHandler handler = new WriteResponseHandler(ep, WriteType.UNLOGGED_BATCH, callback);
- MessagingService.instance().sendRR(mutation.createMessage(), ep, handler);
+ MessagingService.instance().sendUnhintableMutation(mutation, ep, handler);
handlers.add(handler);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2890cc5b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 427bbf2..a7a3e06 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -55,7 +55,6 @@ import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.metrics.HintedHandoffMetrics;
-import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.*;
import org.apache.cassandra.thrift.*;
@@ -391,8 +390,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
continue;
}
- MessageOut<RowMutation> message = rm.createMessage();
- rateLimiter.acquire(message.serializedSize(MessagingService.current_version));
+ rateLimiter.acquire((int) RowMutation.serializer.serializedSize(rm, MessagingService.current_version));
Runnable callback = new Runnable()
{
public void run()
@@ -401,8 +399,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
}
};
- WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback);
- MessagingService.instance().sendRR(message, endpoint, responseHandler);
+ WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.SIMPLE, callback);
+ MessagingService.instance().sendUnhintableMutation(rm, endpoint, responseHandler);
responseHandlers.add(responseHandler);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2890cc5b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 09fa272..3f90d7f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -559,6 +559,17 @@ public final class MessagingService implements MessagingServiceMBean
}
/**
+ * A special version of sendRR that doesn't trigger a hint for the mutation on a timeout.
+ * Used by BatchlogManager and HintedHandOffManager.
+ */
+ public void sendUnhintableMutation(RowMutation mutation, InetAddress to, IMessageCallback cb)
+ {
+ String id = nextId();
+ callbacks.put(id, new CallbackInfo(to, cb, WriteResponse.serializer), DatabaseDescriptor.getWriteRpcTimeout());
+ sendOneWay(mutation.createMessage(), id, to);
+ }
+
+ /**
* Send a message to a given endpoint. This method specifies a callback
* which is invoked with the actual response.
* Also holds the message (only mutation messages) to determine if it
@@ -568,7 +579,6 @@ public final class MessagingService implements MessagingServiceMBean
* @param to endpoint to which the message needs to be sent
* @param cb callback interface which is used to pass the responses or
* suggest that a timeout occurred to the invoker of the send().
- * suggest that a timeout occurred to the invoker of the send().
* @param timeout the timeout used for expiration
* @return an reference to message id used to match with the result
*/
[2/2] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by al...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Conflicts:
src/java/org/apache/cassandra/net/MessagingService.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3a73e392
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3a73e392
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3a73e392
Branch: refs/heads/cassandra-2.0
Commit: 3a73e392fa424bff5378d4bb72117cfa28f9b0b7
Parents: 1c2a812 2890cc5
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Apr 22 19:47:42 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Apr 22 19:47:42 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/BatchlogManager.java | 2 +-
.../cassandra/db/HintedHandOffManager.java | 4 +--
.../apache/cassandra/net/MessagingService.java | 27 ++++++++++++++------
.../apache/cassandra/net/WriteCallbackInfo.java | 16 +++++++++---
.../apache/cassandra/service/StorageProxy.java | 7 ++++-
6 files changed, 41 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9b73c89,74ddcfd..dbed949
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -8,61 -9,10 +8,62 @@@ Merged from 1.2
* Fix batchlog to account for CF truncation records (CASSANDRA-6999)
* Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018)
* Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
+ * Ensure that batchlog and hint timeouts do not produce hints (CASSANDRA-7058)
-1.2.16
+2.0.7
+ * Put nodes in hibernate when join_ring is false (CASSANDRA-6961)
+ * Avoid early loading of non-system keyspaces before compaction-leftovers
+ cleanup at startup (CASSANDRA-6913)
+ * Restrict Windows to parallel repairs (CASSANDRA-6907)
+ * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
+ * Fix NPE in MeteredFlusher (CASSANDRA-6820)
+ * Fix race processing range scan responses (CASSANDRA-6820)
+ * Allow deleting snapshots from dropped keyspaces (CASSANDRA-6821)
+ * Add uuid() function (CASSANDRA-6473)
+ * Omit tombstones from schema digests (CASSANDRA-6862)
+ * Include correct consistencyLevel in LWT timeout (CASSANDRA-6884)
+ * Lower chances for losing new SSTables during nodetool refresh and
+ ColumnFamilyStore.loadNewSSTables (CASSANDRA-6514)
+ * Add support for DELETE ... IF EXISTS to CQL3 (CASSANDRA-5708)
+ * Update hadoop_cql3_word_count example (CASSANDRA-6793)
+ * Fix handling of RejectedExecution in sync Thrift server (CASSANDRA-6788)
+ * Log more information when exceeding tombstone_warn_threshold (CASSANDRA-6865)
+ * Fix truncate to not abort due to unreachable fat clients (CASSANDRA-6864)
+ * Fix schema concurrency exceptions (CASSANDRA-6841)
+ * Fix leaking validator FH in StreamWriter (CASSANDRA-6832)
+ * Fix saving triggers to schema (CASSANDRA-6789)
+ * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790)
+ * Fix accounting in FileCacheService to allow re-using RAR (CASSANDRA-6838)
+ * Fix static counter columns (CASSANDRA-6827)
+ * Restore expiring->deleted (cell) compaction optimization (CASSANDRA-6844)
+ * Fix CompactionManager.needsCleanup (CASSANDRA-6845)
+ * Correctly compare BooleanType values other than 0 and 1 (CASSANDRA-6779)
+ * Read message id as string from earlier versions (CASSANDRA-6840)
+ * Properly use the Paxos consistency for (non-protocol) batch (CASSANDRA-6837)
+ * Add paranoid disk failure option (CASSANDRA-6646)
+ * Improve PerRowSecondaryIndex performance (CASSANDRA-6876)
+ * Extend triggers to support CAS updates (CASSANDRA-6882)
+ * Static columns with IF NOT EXISTS don't always work as expected (CASSANDRA-6873)
+ * Fix paging with SELECT DISTINCT (CASSANDRA-6857)
+ * Fix UnsupportedOperationException on CAS timeout (CASSANDRA-6923)
+ * Improve MeteredFlusher handling of MF-unaffected column families
+ (CASSANDRA-6867)
+ * Add CqlRecordReader using native pagination (CASSANDRA-6311)
+ * Add QueryHandler interface (CASSANDRA-6659)
+ * Track liveRatio per-memtable, not per-CF (CASSANDRA-6945)
+ * Make sure upgradesstables keeps sstable level (CASSANDRA-6958)
+ * Fix LIMIT with static columns (CASSANDRA-6956)
+ * Fix clash with CQL column name in thrift validation (CASSANDRA-6892)
+ * Fix error with super columns in mixed 1.2-2.0 clusters (CASSANDRA-6966)
+ * Fix bad skip of sstables on slice query with composite start/finish (CASSANDRA-6825)
+ * Fix unintended update with conditional statement (CASSANDRA-6893)
+ * Fix map element access in IF (CASSANDRA-6914)
+ * Avoid costly range calculations for range queries on system keyspaces
+ (CASSANDRA-6906)
+ * Fix SSTable not released if stream session fails (CASSANDRA-6818)
+ * Avoid build failure due to ANTLR timeout (CASSANDRA-6991)
+Merged from 1.2:
* Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
* add extra SSL cipher suites (CASSANDRA-6613)
* fix nodetool getsstables for blob PK (CASSANDRA-6803)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index 5770994,02af9d3..5aea736
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -323,7 -328,7 +323,7 @@@ public class BatchlogManager implement
}
};
WriteResponseHandler handler = new WriteResponseHandler(ep, WriteType.UNLOGGED_BATCH, callback);
- MessagingService.instance().sendRR(mutation.createMessage(), ep, handler);
- MessagingService.instance().sendUnhintableMutation(mutation, ep, handler);
++ MessagingService.instance().sendRR(mutation.createMessage(), ep, handler, false);
handlers.add(handler);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 942707e,a7a3e06..13d1bb0
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@@ -450,8 -399,8 +450,8 @@@ public class HintedHandOffManager imple
deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
}
};
- WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback);
- MessagingService.instance().sendRR(message, endpoint, responseHandler);
+ WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.SIMPLE, callback);
- MessagingService.instance().sendUnhintableMutation(rm, endpoint, responseHandler);
++ MessagingService.instance().sendRR(message, endpoint, responseHandler, false);
responseHandlers.add(responseHandler);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index cc5dae5,3f90d7f..cccf698
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -537,21 -527,18 +537,33 @@@ public final class MessagingService imp
return verbHandlers.get(type);
}
- public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout)
+ public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout)
{
- String messageId = nextId();
- CallbackInfo previous;
-
- // If HH is enabled and this is a mutation message => store the message to track for potential hints.
- if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION)
- previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout);
- else
- previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
+ int messageId = nextId();
+ CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
+ return messageId;
+ }
- public int addCallback(IAsyncCallback cb, MessageOut<? extends IMutation> message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
- assert previous == null;
++ public int addCallback(IAsyncCallback cb,
++ MessageOut<? extends IMutation> message,
++ InetAddress to,
++ long timeout,
++ ConsistencyLevel consistencyLevel,
++ boolean allowHints)
+ {
+ assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
+ int messageId = nextId();
- CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
++ CallbackInfo previous = callbacks.put(messageId,
++ new WriteCallbackInfo(to,
++ cb,
++ message,
++ callbackDeserializers.get(message.verb),
++ consistencyLevel,
++ allowHints),
++ timeout);
+ assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
return messageId;
}
@@@ -568,24 -559,14 +580,21 @@@
}
/**
- * A special version of sendRR that doesn't trigger a hint for the mutation on a timeout.
- * Used by BatchlogManager and HintedHandOffManager.
+ * Send a non-mutation message to a given endpoint. This method specifies a callback
+ * which is invoked with the actual response.
- * Also holds the message (only mutation messages) to determine if it
- * needs to trigger a hint (uses StorageProxy for that).
+ *
+ * @param message message to be sent.
+ * @param to endpoint to which the message needs to be sent
+ * @param cb callback interface which is used to pass the responses or
+ * suggest that a timeout occurred to the invoker of the send().
- * suggest that a timeout occurred to the invoker of the send().
+ * @param timeout the timeout used for expiration
+ * @return an reference to message id used to match with the result
*/
- public void sendUnhintableMutation(RowMutation mutation, InetAddress to, IMessageCallback cb)
+ public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout)
{
- String id = nextId();
- callbacks.put(id, new CallbackInfo(to, cb, WriteResponse.serializer), DatabaseDescriptor.getWriteRpcTimeout());
- sendOneWay(mutation.createMessage(), id, to);
+ int id = addCallback(cb, message, to, timeout);
+ sendOneWay(message, id, to);
+ return id;
}
/**
@@@ -596,14 -577,24 +605,16 @@@
*
* @param message message to be sent.
* @param to endpoint to which the message needs to be sent
- * @param cb callback interface which is used to pass the responses or
+ * @param handler callback interface which is used to pass the responses or
* suggest that a timeout occurred to the invoker of the send().
- * suggest that a timeout occurred to the invoker of the send().
- * @param timeout the timeout used for expiration
* @return an reference to message id used to match with the result
*/
- public int sendRR(MessageOut<? extends IMutation> message, InetAddress to, AbstractWriteResponseHandler handler)
- public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb, long timeout)
++ public int sendRR(MessageOut<? extends IMutation> message,
++ InetAddress to,
++ AbstractWriteResponseHandler handler,
++ boolean allowHints)
{
- int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel);
- String id = addCallback(cb, message, to, timeout);
-
- if (cb instanceof AbstractWriteResponseHandler)
- {
- PBSPredictor.instance().startWriteOperation(id);
- }
- else if (cb instanceof ReadCallback)
- {
- PBSPredictor.instance().startReadOperation(id);
- }
-
++ int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel, allowHints);
sendOneWay(message, id, to);
return id;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index be7b668,0000000..987ec15
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@@ -1,46 -1,0 +1,54 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.service.StorageProxy;
+
+public class WriteCallbackInfo extends CallbackInfo
+{
+ public final MessageOut sentMessage;
+ private final ConsistencyLevel consistencyLevel;
++ private final boolean allowHints;
+
- public WriteCallbackInfo(InetAddress target, IAsyncCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel)
++ public WriteCallbackInfo(InetAddress target,
++ IAsyncCallback callback,
++ MessageOut message,
++ IVersionedSerializer<?> serializer,
++ ConsistencyLevel consistencyLevel,
++ boolean allowHints)
+ {
+ super(target, callback, serializer);
+ assert message != null;
+ this.sentMessage = message;
+ this.consistencyLevel = consistencyLevel;
++ this.allowHints = allowHints;
+ }
+
+ public boolean shouldHint()
+ {
- return sentMessage.verb != MessagingService.Verb.COUNTER_MUTATION
- && consistencyLevel != ConsistencyLevel.ANY
- && StorageProxy.shouldHint(target);
++ return allowHints
++ && sentMessage.verb != MessagingService.Verb.COUNTER_MUTATION
++ && consistencyLevel != ConsistencyLevel.ANY
++ && StorageProxy.shouldHint(target);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 033ce8e,7ef3d72..fc6ee3a
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -917,29 -616,35 +917,34 @@@ public class StorageProxy implements St
Iterator<InetAddress> iter = targets.iterator();
InetAddress target = iter.next();
- // direct writes to local DC or old Cassandra versions
- if (localDC || MessagingService.instance().getVersion(target) < MessagingService.VERSION_12)
+ // Add the other destinations of the same message as a FORWARD_HEADER entry
+ DataOutputBuffer out = new DataOutputBuffer();
+ try
{
- // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid
- // creating a second iterator since we already have a perfectly good one
- MessagingService.instance().sendRR(message, target, handler);
+ out.writeInt(targets.size() - 1);
while (iter.hasNext())
{
- target = iter.next();
- MessagingService.instance().sendRR(message, target, handler);
+ InetAddress destination = iter.next();
+ CompactEndpointSerializationHelper.serialize(destination, out);
- int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout(), handler.consistencyLevel);
++ int id = MessagingService.instance().addCallback(handler,
++ message,
++ destination,
++ message.getTimeout(),
++ handler.consistencyLevel,
++ true);
+ out.writeInt(id);
+ logger.trace("Adding FWD message to {}@{}", id, destination);
}
- return;
+ message = message.withParameter(RowMutation.FORWARD_TO, out.getData());
+ // send the combined message + forward headers
+ int id = MessagingService.instance().sendRR(message, target, handler);
+ logger.trace("Sending message to {}@{}", id, target);
}
-
- // Add all the other destinations of the same message as a FORWARD_HEADER entry
- FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- dos.writeInt(targets.size() - 1);
- while (iter.hasNext())
+ catch (IOException e)
{
- InetAddress destination = iter.next();
- CompactEndpointSerializationHelper.serialize(destination, dos);
- String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
- dos.writeUTF(id);
+ // DataOutputBuffer is in-memory, doesn't throw IOException
+ throw new AssertionError(e);
}
- message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
- // send the combined message + forward headers
- Tracing.trace("Enqueuing message to {}", target);
- MessagingService.instance().sendRR(message, target, handler);
}
private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)