You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/09 00:29:44 UTC
[3/5] git commit: rewrite sendRR API to fix regressions from #6132
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-6165
rewrite sendRR API to fix regressions from #6132
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-6165
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/111c74ef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/111c74ef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/111c74ef
Branch: refs/heads/trunk
Commit: 111c74ef94d4d75c19611ca3e77d7c54552810f0
Parents: 471c2e6
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Oct 8 17:21:31 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Oct 8 17:21:31 2013 -0500
----------------------------------------------------------------------
.../apache/cassandra/net/MessagingService.java | 28 +++++++++++++++-----
.../service/AbstractWriteResponseHandler.java | 2 +-
.../apache/cassandra/service/StorageProxy.java | 9 +++----
3 files changed, 26 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/111c74ef/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index ca3845b..ad15dd2 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -546,7 +546,7 @@ public final class MessagingService implements MessagingServiceMBean
return messageId;
}
- public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
+ public int addCallback(IAsyncCallback cb, MessageOut<? extends IMutation> message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
{
assert message.verb == Verb.MUTATION;
int messageId = nextId();
@@ -562,21 +562,17 @@ public final class MessagingService implements MessagingServiceMBean
return idGen.incrementAndGet();
}
- /*
- * @see #sendRR(Message message, InetAddress to, IAsyncCallback cb, long timeout)
- */
public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb)
{
return sendRR(message, to, cb, message.getTimeout());
}
/**
- * Send a message to a given endpoint. This method specifies a callback
+ * Send a non-mutation message to a given endpoint. This method specifies a callback
* which is invoked with the actual response.
* Also holds the message (only mutation messages) to determine if it
* needs to trigger a hint (uses StorageProxy for that).
*
- *
* @param message message to be sent.
* @param to endpoint to which the message needs to be sent
* @param cb callback interface which is used to pass the responses or
@@ -592,6 +588,26 @@ public final class MessagingService implements MessagingServiceMBean
return id;
}
+ /**
+ * Send a mutation message to a given endpoint. This method specifies a callback
+ * which is invoked with the actual response.
+ * Also holds the message (only mutation messages) to determine if it
+ * needs to trigger a hint (uses StorageProxy for that).
+ *
+ * @param message message to be sent.
+ * @param to endpoint to which the message needs to be sent
+ * @param handler callback interface which is used to pass the responses or
+ * suggest that a timeout occurred to the invoker of the send().
+ * The handler's consistency level is used when registering the callback.
+ * @return a reference to the message id used to match with the result
+ */
+ public int sendRR(MessageOut<? extends IMutation> message, InetAddress to, AbstractWriteResponseHandler handler)
+ {
+ int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel);
+ sendOneWay(message, id, to);
+ return id;
+ }
+
public void sendOneWay(MessageOut message, InetAddress to)
{
sendOneWay(message, nextId(), to);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/111c74ef/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 95f9ac4..9f336eb 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -38,7 +38,7 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
protected final Keyspace keyspace;
protected final long start;
protected final Collection<InetAddress> naturalEndpoints;
- protected final ConsistencyLevel consistencyLevel;
+ public final ConsistencyLevel consistencyLevel;
protected final Runnable callback;
protected final Collection<InetAddress> pendingEndpoints;
private final WriteType writeType;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/111c74ef/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 3def513..3c68121 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -958,7 +958,7 @@ public class StorageProxy implements StorageProxyMBean
StorageMetrics.totalHints.inc();
}
- private static void sendMessagesToNonlocalDC(MessageOut message, Collection<InetAddress> targets, AbstractWriteResponseHandler handler)
+ private static void sendMessagesToNonlocalDC(MessageOut<? extends IMutation> message, Collection<InetAddress> targets, AbstractWriteResponseHandler handler)
{
Iterator<InetAddress> iter = targets.iterator();
InetAddress target = iter.next();
@@ -978,8 +978,7 @@ public class StorageProxy implements StorageProxyMBean
}
message = message.withParameter(RowMutation.FORWARD_TO, out.getData());
// send the combined message + forward headers
- int id = MessagingService.instance().addCallback(handler, message, target, message.getTimeout(), handler.consistencyLevel);
- MessagingService.instance().sendOneWay(message, id, target);
+ int id = MessagingService.instance().sendRR(message, target, handler);
logger.trace("Sending message to {}@{}", id, target);
}
catch (IOException e)
@@ -1039,9 +1038,7 @@ public class StorageProxy implements StorageProxyMBean
AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.COUNTER);
Tracing.trace("Enqueuing counter update to {}", endpoint);
- MessageOut<CounterMutation> message = cm.makeMutationMessage();
- int id = MessagingService.instance().addCallback(responseHandler, message, endpoint, message.getTimeout(), cm.consistency());
- MessagingService.instance().sendOneWay(message, id, endpoint);
+ MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler);
return responseHandler;
}
}