You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2012/06/22 06:29:47 UTC

git commit: parallelize fetching rows for low-cardinality indexes; patch by David Alves; reviewed by Vijay for CASSANDRA-1337

Updated Branches:
  refs/heads/trunk ecda83471 -> 9cf915fd4


parallelize fetching rows for low-cardinality indexes
patch by David Alves; reviewed by Vijay for CASSANDRA-1337


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9cf915fd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9cf915fd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9cf915fd

Branch: refs/heads/trunk
Commit: 9cf915fd464b6f8a6aa9d54a762ad8796872681a
Parents: ecda834
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Thu Jun 21 21:20:09 2012 -0700
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Thu Jun 21 21:20:09 2012 -0700

----------------------------------------------------------------------
 .../org/apache/cassandra/service/StorageProxy.java |   66 ++++++++++-----
 1 files changed, 45 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cf915fd/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 92a3256..4353a8f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -845,6 +845,22 @@ public class StorageProxy implements StorageProxyMBean
             int columnsCount = 0;
             rows = new ArrayList<Row>();
             List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range);
+            
+            // get the cardinality of this index based on row count
+            // use this info to decide how many scans to do in parallel
+            long estimatedKeys = Table.open(command.keyspace).getColumnFamilyStore(command.column_family)
+                    .estimateKeys();
+            int concurrencyFactor = (int) command.maxResults / ((int) estimatedKeys + 1);
+
+            if (concurrencyFactor <= 0)
+                concurrencyFactor = 1;
+
+            if (concurrencyFactor > ranges.size())
+                concurrencyFactor = ranges.size();
+            
+            // parallel scan handlers
+            List<ReadCallback<RangeSliceReply, Iterable<Row>>> scanHandlers = new ArrayList<ReadCallback<RangeSliceReply, Iterable<Row>>>(concurrencyFactor);
+            
             for (AbstractBounds<RowPosition> range : ranges)
             {
                 RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
@@ -895,32 +911,40 @@ public class StorageProxy implements StorageProxyMBean
                             logger.debug("reading " + nodeCmd + " from " + endpoint);
                     }
 
-                    try
+                    scanHandlers.add(handler);
+                    if (scanHandlers.size() >= concurrencyFactor)
                     {
-                        for (Row row : handler.get())
+                        for (ReadCallback<RangeSliceReply, Iterable<Row>> scanHandler : scanHandlers)
                         {
-                            rows.add(row);
-                            columnsCount += row.getLiveColumnCount();
-                            logger.debug("range slices read {}", row.key);
+                            try
+                            {
+                                for (Row row : scanHandler.get())
+                                {
+                                    rows.add(row);
+                                    columnsCount += row.getLiveColumnCount();
+                                    logger.debug("range slices read {}", row.key);
+                                }
+                                FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getReadRpcTimeout());
+                            }
+                            catch (TimeoutException ex)
+                            {
+                                if (logger.isDebugEnabled())
+                                    logger.debug("Range slice timeout: {}", ex.toString());
+                                throw ex;
+                            }
+                            catch (DigestMismatchException e)
+                            {
+                                throw new AssertionError(e); // no digests in range slices yet
+                            }
+
+                            // if we're done, great, otherwise, move to the next range
+                            int count = nodeCmd.maxIsColumns ? columnsCount : rows.size();
+                            if (count >= nodeCmd.maxResults)
+                                break;
                         }
-                        FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
-                    }
-                    catch (TimeoutException ex)
-                    {
-                        if (logger.isDebugEnabled())
-                            logger.debug("Range slice timeout: {}", ex.toString());
-                        throw ex;
-                    }
-                    catch (DigestMismatchException e)
-                    {
-                        throw new AssertionError(e); // no digests in range slices yet
+                        scanHandlers.clear(); //go back for more
                     }
                 }
-
-                // if we're done, great, otherwise, move to the next range
-                int count = nodeCmd.maxIsColumns ? columnsCount : rows.size();
-                if (count >= nodeCmd.maxResults)
-                    break;
             }
         }
         finally