You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/09 00:29:45 UTC

[4/5] git commit: rewrite sendRR API to fix regressions from #6132 patch by jbellis; reviewed by brandonwilliams for CASSANDRA-6165

rewrite sendRR API to fix regressions from #6132
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-6165


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/111c74ef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/111c74ef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/111c74ef

Branch: refs/heads/cassandra-2.0
Commit: 111c74ef94d4d75c19611ca3e77d7c54552810f0
Parents: 471c2e6
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Oct 8 17:21:31 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Oct 8 17:21:31 2013 -0500

----------------------------------------------------------------------
 .../apache/cassandra/net/MessagingService.java  | 28 +++++++++++++++-----
 .../service/AbstractWriteResponseHandler.java   |  2 +-
 .../apache/cassandra/service/StorageProxy.java  |  9 +++----
 3 files changed, 26 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/111c74ef/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index ca3845b..ad15dd2 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -546,7 +546,7 @@ public final class MessagingService implements MessagingServiceMBean
         return messageId;
     }
 
-    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
+    public int addCallback(IAsyncCallback cb, MessageOut<? extends IMutation> message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
     {
         assert message.verb == Verb.MUTATION;
         int messageId = nextId();
@@ -562,21 +562,17 @@ public final class MessagingService implements MessagingServiceMBean
         return idGen.incrementAndGet();
     }
 
-    /*
-     * @see #sendRR(Message message, InetAddress to, IAsyncCallback cb, long timeout)
-     */
     public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb)
     {
         return sendRR(message, to, cb, message.getTimeout());
     }
 
     /**
-     * Send a message to a given endpoint. This method specifies a callback
+     * Send a non-mutation message to a given endpoint. This method specifies a callback
      * which is invoked with the actual response.
      * Also holds the message (only mutation messages) to determine if it
      * needs to trigger a hint (uses StorageProxy for that).
      *
-     *
      * @param message message to be sent.
      * @param to      endpoint to which the message needs to be sent
      * @param cb      callback interface which is used to pass the responses or
@@ -592,6 +588,26 @@ public final class MessagingService implements MessagingServiceMBean
         return id;
     }
 
+    /**
+     * Send a mutation message to a given endpoint. This method specifies a callback
+     * which is invoked with the actual response.
+     * Also holds the mutation message to determine if it
+     * needs to trigger a hint (uses StorageProxy for that).
+     *
+     * @param message message to be sent.
+     * @param to      endpoint to which the message needs to be sent
+     * @param handler callback interface which is used to pass the responses or
+     *                suggest that a timeout occurred to the invoker of the send().
+     *                The handler's consistency level is passed along to addCallback.
+     * @return a reference to the message id used to match with the result
+     */
+    public int sendRR(MessageOut<? extends IMutation> message, InetAddress to, AbstractWriteResponseHandler handler)
+    {
+        int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel);
+        sendOneWay(message, id, to);
+        return id;
+    }
+
     public void sendOneWay(MessageOut message, InetAddress to)
     {
         sendOneWay(message, nextId(), to);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/111c74ef/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 95f9ac4..9f336eb 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -38,7 +38,7 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
     protected final Keyspace keyspace;
     protected final long start;
     protected final Collection<InetAddress> naturalEndpoints;
-    protected final ConsistencyLevel consistencyLevel;
+    public final ConsistencyLevel consistencyLevel;
     protected final Runnable callback;
     protected final Collection<InetAddress> pendingEndpoints;
     private final WriteType writeType;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/111c74ef/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 3def513..3c68121 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -958,7 +958,7 @@ public class StorageProxy implements StorageProxyMBean
         StorageMetrics.totalHints.inc();
     }
 
-    private static void sendMessagesToNonlocalDC(MessageOut message, Collection<InetAddress> targets, AbstractWriteResponseHandler handler)
+    private static void sendMessagesToNonlocalDC(MessageOut<? extends IMutation> message, Collection<InetAddress> targets, AbstractWriteResponseHandler handler)
     {
         Iterator<InetAddress> iter = targets.iterator();
         InetAddress target = iter.next();
@@ -978,8 +978,7 @@ public class StorageProxy implements StorageProxyMBean
             }
             message = message.withParameter(RowMutation.FORWARD_TO, out.getData());
             // send the combined message + forward headers
-            int id = MessagingService.instance().addCallback(handler, message, target, message.getTimeout(), handler.consistencyLevel);
-            MessagingService.instance().sendOneWay(message, id, target);
+            int id = MessagingService.instance().sendRR(message, target, handler);
             logger.trace("Sending message to {}@{}", id, target);
         }
         catch (IOException e)
@@ -1039,9 +1038,7 @@ public class StorageProxy implements StorageProxyMBean
             AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.COUNTER);
 
             Tracing.trace("Enqueuing counter update to {}", endpoint);
-            MessageOut<CounterMutation> message = cm.makeMutationMessage();
-            int id = MessagingService.instance().addCallback(responseHandler, message, endpoint, message.getTimeout(), cm.consistency());
-            MessagingService.instance().sendOneWay(message, id, endpoint);
+            MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler);
             return responseHandler;
         }
     }