You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/07 23:47:52 UTC
git commit: fix merge from #6132
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-6154
Updated Branches:
refs/heads/trunk a57981650 -> 53f7c328a
fix merge from #6132
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-6154
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53f7c328
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53f7c328
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53f7c328
Branch: refs/heads/trunk
Commit: 53f7c328ae4aca0affaed4f0c73678011a4e152a
Parents: a579816
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Oct 7 16:47:41 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Oct 7 16:47:47 2013 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/net/MessagingService.java | 15 ++++++++++-----
.../org/apache/cassandra/service/StorageProxy.java | 10 ++++------
2 files changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f7c328/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 1776361..ff8a2c7 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -563,14 +563,16 @@ public final class MessagingService implements MessagingServiceMBean
return idGen.incrementAndGet();
}
- /*
- * @see #sendRR(Message message, InetAddress to, IAsyncCallback cb, long timeout)
- */
public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb)
{
return sendRR(message, to, cb, message.getTimeout());
}
+ public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout)
+ {
+ return sendRR(message, to, cb, timeout, null);
+ }
+
/**
* Send a message to a given endpoint. This method specifies a callback
* which is invoked with the actual response.
@@ -584,11 +586,14 @@ public final class MessagingService implements MessagingServiceMBean
* suggest that a timeout occurred to the invoker of the send().
* suggest that a timeout occurred to the invoker of the send().
* @param timeout the timeout used for expiration
+ * @param consistencyLevel the consistency level, for mutations; must be null otherwise
* @return an reference to message id used to match with the result
*/
- public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout)
+ public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout, ConsistencyLevel consistencyLevel)
{
- int id = addCallback(cb, message, to, timeout);
+ int id = consistencyLevel == null
+ ? addCallback(cb, message, to, timeout)
+ : addCallback(cb, message, to, timeout, consistencyLevel);
sendOneWay(message, id, to);
return id;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f7c328/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 157079f..102d8b5 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -681,7 +681,7 @@ public class StorageProxy implements StorageProxyMBean
{
MessageOut<RowMutation> message = rm.createMessage();
for (InetAddress target : endpoints)
- MessagingService.instance().sendRR(message, target, handler);
+ MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), ConsistencyLevel.ONE);
}
}
@@ -866,7 +866,7 @@ public class StorageProxy implements StorageProxyMBean
// (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
if (localDataCenter.equals(dc))
{
- MessagingService.instance().sendRR(message, destination, responseHandler);
+ MessagingService.instance().sendRR(message, destination, responseHandler, message.getTimeout(), consistency_level);
}
else
{
@@ -982,8 +982,7 @@ public class StorageProxy implements StorageProxyMBean
}
message = message.withParameter(RowMutation.FORWARD_TO, out.getData());
// send the combined message + forward headers
- int id = MessagingService.instance().addCallback(handler, message, target, message.getTimeout(), handler.consistencyLevel);
- MessagingService.instance().sendOneWay(message, id, target);
+ int id = MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), handler.consistencyLevel);
logger.trace("Sending message to {}@{}", id, target);
}
catch (IOException e)
@@ -1044,8 +1043,7 @@ public class StorageProxy implements StorageProxyMBean
Tracing.trace("Enqueuing counter update to {}", endpoint);
MessageOut<CounterMutation> message = cm.makeMutationMessage();
- int id = MessagingService.instance().addCallback(responseHandler, message, endpoint, message.getTimeout(), cm.consistency());
- MessagingService.instance().sendOneWay(message, id, endpoint);
+ MessagingService.instance().sendRR(message, endpoint, responseHandler, message.getTimeout(), cm.consistency());
return responseHandler;
}
}