You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/08/16 22:43:12 UTC

[1/2] git commit: run local range scans on the read stage; patch by jbellis; reviewed by vijay for CASSANDRA-3687

Updated Branches:
  refs/heads/trunk fe784f58e -> 5577ff626


run local range scans on the read stage
patch by jbellis; reviewed by vijay for CASSANDRA-3687


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5577ff62
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5577ff62
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5577ff62

Branch: refs/heads/trunk
Commit: 5577ff626bb38d419a3540e0c0ccb1a9d8b8680f
Parents: 29fed1f
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Aug 16 15:43:02 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Aug 16 15:43:02 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/service/AbstractRowResolver.java     |   11 --
 .../org/apache/cassandra/service/ReadCallback.java |   27 ++---
 .../org/apache/cassandra/service/StorageProxy.java |   91 ++++++++-------
 4 files changed, 59 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5577ff62/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5a2848d..75de54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-dev
+ * run local range scans on the read stage (CASSANDRA-3687)
  * clean up ioexceptions (CASSANDRA-2116)
  * Introduce new json format with row level deletion (CASSANDRA-4054)
  * remove redundant "name" column from schema_keyspaces (CASSANDRA-4433)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5577ff62/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
index b1647a2..beaf73c 100644
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
@@ -51,17 +51,6 @@ public abstract class AbstractRowResolver implements IResponseResolver<ReadRespo
         replies.add(message);
     }
 
-    /** hack so local reads don't force de/serialization of an extra real Message */
-    public void injectPreProcessed(ReadResponse result)
-    {
-        MessageIn<ReadResponse> message = MessageIn.create(FBUtilities.getBroadcastAddress(),
-                                                           result,
-                                                           Collections.<String, byte[]>emptyMap(),
-                                                           MessagingService.Verb.INTERNAL_RESPONSE,
-                                                           MessagingService.current_version);
-        replies.add(message);
-    }
-
     public Iterable<MessageIn<ReadResponse>> getMessages()
     {
         return replies;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5577ff62/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index a3d273c..bfd0044 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -165,32 +166,20 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
 
     /**
      * @return true if the message counts towards the blockfor threshold
-     * TODO turn the Message into a response so we don't need two versions of this method
      */
     protected boolean waitingFor(MessageIn message)
     {
         return true;
     }
 
-    /**
-     * @return true if the response counts towards the blockfor threshold
-     */
-    protected boolean waitingFor(ReadResponse response)
+    public void response(TMessage result)
     {
-        return true;
-    }
-
-    public void response(ReadResponse result)
-    {
-        ((RowDigestResolver) resolver).injectPreProcessed(result);
-        int n = waitingFor(result)
-              ? received.incrementAndGet()
-              : received.get();
-        if (n >= blockfor && resolver.isDataPresent())
-        {
-            condition.signal();
-            maybeResolveForRepair();
-        }
+        MessageIn<TMessage> message = MessageIn.create(FBUtilities.getBroadcastAddress(),
+                                                       result,
+                                                       Collections.<String, byte[]>emptyMap(),
+                                                       MessagingService.Verb.INTERNAL_RESPONSE,
+                                                       MessagingService.current_version);
+        response(message);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5577ff62/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 9d55739..1fb84cd 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -831,6 +831,30 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
+    static class LocalRangeSliceRunnable extends DroppableRunnable // runs a range slice against this node and hands the reply to handler
+    {
+        private final RangeSliceCommand command;
+        private final ReadCallback<RangeSliceReply, Iterable<Row>> handler;
+        private final long start = System.currentTimeMillis(); // captured at construction, so the latency below includes time spent queued
+
+        LocalRangeSliceRunnable(RangeSliceCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler)
+        {
+            super(MessagingService.Verb.READ);
+            this.command = command;
+            this.handler = handler;
+        }
+
+        protected void runMayThrow() throws ExecutionException, InterruptedException
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("LocalRangeSliceRunnable reading " + command);
+
+            RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
+            MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start); // record local latency like a remote reply
+            handler.response(result);
+        }
+    }
+
     static <TMessage, TResolved> ReadCallback<TMessage, TResolved> getReadCallback(IResponseResolver<TMessage, TResolved> resolver, IReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> endpoints)
     {
         if (consistencyLevel == ConsistencyLevel.LOCAL_QUORUM || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
@@ -868,33 +892,18 @@ public class StorageProxy implements StorageProxyMBean
                 List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(nodeCmd.keyspace, range.right);
                 DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
 
-                if (consistency_level == ConsistencyLevel.ONE && !liveEndpoints.isEmpty() && liveEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
+                // collect replies and resolve according to consistency level
+                RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace);
+                ReadCallback<RangeSliceReply, Iterable<Row>> handler = getReadCallback(resolver, nodeCmd, consistency_level, liveEndpoints);
+                handler.assureSufficientLiveNodes();
+                resolver.setSources(handler.endpoints);
+                if (handler.endpoints.size() == 1 && handler.endpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
                 {
-                    if (logger.isDebugEnabled())
-                        logger.debug("local range slice");
-
-                    try
-                    {
-                        rows.addAll(RangeSliceVerbHandler.executeLocally(nodeCmd));
-                        for (Row row : rows)
-                            columnsCount += row.getLiveColumnCount();
-                    }
-                    catch (ExecutionException e)
-                    {
-                        throw new RuntimeException(e.getCause());
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError(e);
-                    }
+                    logger.debug("reading data locally");
+                    StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
                 }
                 else
                 {
-                    // collect replies and resolve according to consistency level
-                    RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace);
-                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = getReadCallback(resolver, nodeCmd, consistency_level, liveEndpoints);
-                    handler.assureSufficientLiveNodes();
-                    resolver.setSources(handler.endpoints);
                     MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
                     for (InetAddress endpoint : handler.endpoints)
                     {
@@ -902,27 +911,27 @@ public class StorageProxy implements StorageProxyMBean
                         if (logger.isDebugEnabled())
                             logger.debug("reading " + nodeCmd + " from " + endpoint);
                     }
+                }
 
-                    try
-                    {
-                        for (Row row : handler.get())
-                        {
-                            rows.add(row);
-                            columnsCount += row.getLiveColumnCount();
-                            logger.debug("range slices read {}", row.key);
-                        }
-                        FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
-                    }
-                    catch (TimeoutException ex)
-                    {
-                        if (logger.isDebugEnabled())
-                            logger.debug("Range slice timeout: {}", ex.toString());
-                        throw ex;
-                    }
-                    catch (DigestMismatchException e)
+                try
+                {
+                    for (Row row : handler.get())
                     {
-                        throw new AssertionError(e); // no digests in range slices yet
+                        rows.add(row);
+                        columnsCount += row.getLiveColumnCount();
+                        logger.debug("range slices read {}", row.key);
                     }
+                    FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
+                }
+                catch (TimeoutException ex)
+                {
+                    if (logger.isDebugEnabled())
+                        logger.debug("Range slice timeout: {}", ex.toString());
+                    throw ex;
+                }
+                catch (DigestMismatchException e)
+                {
+                    throw new AssertionError(e); // no digests in range slices yet
                 }
 
                 // if we're done, great, otherwise, move to the next range