You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/01/20 22:59:48 UTC

[1/2] cassandra git commit: Clean up ARE sendXRequests() methods and ReadCommand#setDigestQuery()

Repository: cassandra
Updated Branches:
  refs/heads/trunk a792a7bae -> 9c6cf3cdf


Clean up ARE sendXRequests() methods and ReadCommand#setDigestQuery()

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-8647


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2d140ff
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2d140ff
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2d140ff

Branch: refs/heads/trunk
Commit: e2d140fff752e757f40c812dcd1b7bed3ea5fed2
Parents: 2445d4d
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Jan 21 00:52:53 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Jan 21 00:54:34 2015 +0300

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ReadCommand.java    |  3 +-
 .../db/RetriedSliceFromReadCommand.java         |  4 +-
 .../cassandra/db/SliceByNamesReadCommand.java   |  8 +--
 .../cassandra/db/SliceFromReadCommand.java      |  8 +--
 .../cassandra/service/AbstractReadExecutor.java | 56 +++++++++-----------
 5 files changed, 31 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 299693e..dedff6f 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -89,9 +89,10 @@ public abstract class ReadCommand implements IReadCommand, Pageable
         return isDigestQuery;
     }
 
-    public void setDigestQuery(boolean isDigestQuery)
+    public ReadCommand setIsDigestQuery(boolean isDigestQuery)
     {
         this.isDigestQuery = isDigestQuery;
+        return this;
     }
 
     public String getColumnFamilyName()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
index fe54917..41f5a50 100644
--- a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
@@ -38,9 +38,7 @@ public class RetriedSliceFromReadCommand extends SliceFromReadCommand
     @Override
     public ReadCommand copy()
     {
-        ReadCommand readCommand = new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, filter, originalCount);
-        readCommand.setDigestQuery(isDigestQuery());
-        return readCommand;
+        return new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, filter, originalCount).setIsDigestQuery(isDigestQuery());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index b1829f3..22f795e 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -44,9 +44,7 @@ public class SliceByNamesReadCommand extends ReadCommand
 
     public ReadCommand copy()
     {
-        ReadCommand readCommand= new SliceByNamesReadCommand(ksName, key, cfName, timestamp, filter);
-        readCommand.setDigestQuery(isDigestQuery());
-        return readCommand;
+        return new SliceByNamesReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery());
     }
 
     public Row getRow(Keyspace keyspace)
@@ -97,9 +95,7 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
         long timestamp = in.readLong();
         CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
         NamesQueryFilter filter = metadata.comparator.namesQueryFilterSerializer().deserialize(in, version);
-        ReadCommand command = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter);
-        command.setDigestQuery(isDigest);
-        return command;
+        return new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest);
     }
 
     public long serializedSize(ReadCommand cmd, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index f06b9dc..2259f22 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -48,9 +48,7 @@ public class SliceFromReadCommand extends ReadCommand
 
     public ReadCommand copy()
     {
-        ReadCommand readCommand = new SliceFromReadCommand(ksName, key, cfName, timestamp, filter);
-        readCommand.setDigestQuery(isDigestQuery());
-        return readCommand;
+        return new SliceFromReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery());
     }
 
     public Row getRow(Keyspace keyspace)
@@ -151,9 +149,7 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
         long timestamp = in.readLong();
         CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
         SliceQueryFilter filter = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version);
-        ReadCommand command = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter);
-        command.setDigestQuery(isDigest);
-        return command;
+        return new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest);
     }
 
     public long serializedSize(ReadCommand cmd, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 2c3261f..0546e27 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -77,43 +77,38 @@ public abstract class AbstractReadExecutor
 
     protected void makeDataRequests(Iterable<InetAddress> endpoints)
     {
-        boolean readLocal = false;
-        for (InetAddress endpoint : endpoints)
-        {
-            if (isLocalRequest(endpoint))
-            {
-                readLocal = true;
-            }
-            else
-            {
-                logger.trace("reading data from {}", endpoint);
-                MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
-            }
-        }
-        if (readLocal)
-        {
-            logger.trace("reading data locally");
-            StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
-        }
+        makeRequests(command, endpoints);
     }
 
     protected void makeDigestRequests(Iterable<InetAddress> endpoints)
     {
-        ReadCommand digestCommand = command.copy();
-        digestCommand.setDigestQuery(true);
-        MessageOut<?> message = digestCommand.createMessage();
+        makeRequests(command.copy().setIsDigestQuery(true), endpoints);
+    }
+
+    private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoints)
+    {
+        MessageOut<ReadCommand> message = null;
+        boolean hasLocalEndpoint = false;
+
         for (InetAddress endpoint : endpoints)
         {
             if (isLocalRequest(endpoint))
             {
-                logger.trace("reading digest locally");
-                StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
-            }
-            else
-            {
-                logger.trace("reading digest from {}", endpoint);
-                MessagingService.instance().sendRR(message, endpoint, handler);
+                hasLocalEndpoint = true;
+                continue;
             }
+
+            logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
+            if (message == null)
+                message = readCommand.createMessage();
+            MessagingService.instance().sendRR(message, endpoint, handler);
+        }
+
+        // We delay the local (potentially blocking) read till the end to avoid stalling remote requests.
+        if (hasLocalEndpoint)
+        {
+            logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
+            StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
         }
     }
 
@@ -278,10 +273,7 @@ public abstract class AbstractReadExecutor
                 // Could be waiting on the data, or on enough digests.
                 ReadCommand retryCommand = command;
                 if (resolver.getData() != null)
-                {
-                    retryCommand = command.copy();
-                    retryCommand.setDigestQuery(true);
-                }
+                    retryCommand = command.copy().setIsDigestQuery(true);
 
                 InetAddress extraReplica = Iterables.getLast(targetReplicas);
                 logger.trace("speculating read retry on {}", extraReplica);


[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/service/AbstractReadExecutor.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c6cf3cd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c6cf3cd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c6cf3cd

Branch: refs/heads/trunk
Commit: 9c6cf3cdf4267b4aebb0f432d0776314f3c40c8c
Parents: a792a7b e2d140f
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Jan 21 00:59:40 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Jan 21 00:59:40 2015 +0300

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ReadCommand.java    |  3 +-
 .../db/RetriedSliceFromReadCommand.java         |  4 +-
 .../cassandra/db/SliceByNamesReadCommand.java   |  8 +--
 .../cassandra/db/SliceFromReadCommand.java      |  8 +--
 .../cassandra/service/AbstractReadExecutor.java | 56 +++++++++-----------
 5 files changed, 31 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c6cf3cd/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index d76a2cc,0546e27..ec96d81
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@@ -107,14 -94,21 +95,21 @@@ public abstract class AbstractReadExecu
          {
              if (isLocalRequest(endpoint))
              {
-                 logger.trace("reading digest locally");
-                 StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
-             }
-             else
-             {
-                 logger.trace("reading digest from {}", endpoint);
-                 MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
+                 hasLocalEndpoint = true;
+                 continue;
              }
+ 
+             logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
+             if (message == null)
+                 message = readCommand.createMessage();
 -            MessagingService.instance().sendRR(message, endpoint, handler);
++            MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
+         }
+ 
+         // We delay the local (potentially blocking) read till the end to avoid stalling remote requests.
+         if (hasLocalEndpoint)
+         {
+             logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
+             StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
          }
      }