You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by go...@apache.org on 2009/11/14 22:05:57 UTC

svn commit: r836264 - /incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Author: goffinet
Date: Sat Nov 14 21:05:56 2009
New Revision: 836264

URL: http://svn.apache.org/viewvc?rev=836264&view=rev
Log:
Fixed multi-get to submit localCommands to the StorageService.readStage when a node gets commands to process, instead of running them serially.
Patch by goffinet; minor cleanup and review by jbellis for CASSANDRA-555.

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=836264&r1=836263&r2=836264&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Sat Nov 14 21:05:56 2009
@@ -21,6 +21,8 @@
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
 import java.lang.management.ManagementFactory;
 
 import org.apache.commons.lang.StringUtils;
@@ -37,6 +39,7 @@
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.concurrent.StageManager;
 
 import org.apache.log4j.Logger;
 
@@ -253,7 +256,7 @@
     private static List<Row> weakReadRemote(List<ReadCommand> commands) throws IOException, UnavailableException
     {
         if (logger.isDebugEnabled())
-            logger.debug("weakreadlocal reading " + StringUtils.join(commands, ", "));
+            logger.debug("weakreadremote reading " + StringUtils.join(commands, ", "));
 
         List<Row> rows = new ArrayList<Row>();
         List<IAsyncResult> iars = new ArrayList<IAsyncResult>();
@@ -449,29 +452,26 @@
     private static List<Row> weakReadLocal(List<ReadCommand> commands) throws IOException
     {
         List<Row> rows = new ArrayList<Row>();
+        List<Future<Object>> futures = new ArrayList<Future<Object>>();
+
         for (ReadCommand command: commands)
         {
-            List<InetAddress> endpoints = StorageService.instance().getLiveNaturalEndpoints(command.key);
-            /* Remove the local storage endpoint from the list. */
-            endpoints.remove(FBUtilities.getLocalAddress());
-            // TODO: throw a thrift exception if we do not have N nodes
-
-            if (logger.isDebugEnabled())
-                logger.debug("weakreadlocal reading " + command);
-
-            Table table = Table.open(command.table);
-            Row row = command.getRow(table);
-            if (row != null)
-                rows.add(row);
-            /*
-            * Do the consistency checks in the background and return the
-            * non NULL row.
-            */
-            if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
-                StorageService.instance().doConsistencyCheck(row, endpoints, command);
-
+            Callable<Object> callable = new weakReadLocalCallable(command);
+            futures.add(StageManager.getStage(StorageService.readStage_).execute(callable));
+        }
+        for (Future<Object> future : futures)
+        {
+            Row row;
+            try
+            {
+                row = (Row) future.get();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+            rows.add(row);
         }
-
         return rows;
     }
 
@@ -608,4 +608,34 @@
     {
         return writeStats.size();
     }
+
+    static class weakReadLocalCallable implements Callable<Object>
+    {
+        private ReadCommand command;
+
+        weakReadLocalCallable(ReadCommand command)
+        {
+            this.command = command;
+        }
+
+        public Object call() throws IOException
+        {
+            List<InetAddress> endpoints = StorageService.instance().getLiveNaturalEndpoints(command.key);
+            /* Remove the local storage endpoint from the list. */
+            endpoints.remove(FBUtilities.getLocalAddress());
+            // TODO: throw a thrift exception if we do not have N nodes
+
+            if (logger.isDebugEnabled())
+                logger.debug("weakreadlocal reading " + command);
+
+            Table table = Table.open(command.table);
+            Row row = command.getRow(table);
+
+            // Do the consistency checks in the background and return the non NULL row
+            if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+                StorageService.instance().doConsistencyCheck(row, endpoints, command);
+
+            return row;
+        }
+    }
 }