You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2012/06/22 06:29:47 UTC
git commit: parallelize fetching rows for low-cardinality indexes
patch by David Alves; reviewed by Vijay for CASSANDRA-1337
Updated Branches:
refs/heads/trunk ecda83471 -> 9cf915fd4
parallelize fetching rows for low-cardinality indexes
patch by David Alves; reviewed by Vijay for CASSANDRA-1337
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9cf915fd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9cf915fd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9cf915fd
Branch: refs/heads/trunk
Commit: 9cf915fd464b6f8a6aa9d54a762ad8796872681a
Parents: ecda834
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Thu Jun 21 21:20:09 2012 -0700
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Thu Jun 21 21:20:09 2012 -0700
----------------------------------------------------------------------
.../org/apache/cassandra/service/StorageProxy.java | 66 ++++++++++-----
1 file changed, 45 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cf915fd/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 92a3256..4353a8f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -845,6 +845,22 @@ public class StorageProxy implements StorageProxyMBean
int columnsCount = 0;
rows = new ArrayList<Row>();
List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range);
+
+ // get the cardinality of this index based on row count
+ // use this info to decide how many scans to do in parallel
+ long estimatedKeys = Table.open(command.keyspace).getColumnFamilyStore(command.column_family)
+ .estimateKeys();
+ int concurrencyFactor = (int) command.maxResults / ((int) estimatedKeys + 1);
+
+ if (concurrencyFactor <= 0)
+ concurrencyFactor = 1;
+
+ if (concurrencyFactor > ranges.size())
+ concurrencyFactor = ranges.size();
+
+ // parallel scan handlers
+ List<ReadCallback<RangeSliceReply, Iterable<Row>>> scanHandlers = new ArrayList<ReadCallback<RangeSliceReply, Iterable<Row>>>(concurrencyFactor);
+
for (AbstractBounds<RowPosition> range : ranges)
{
RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
@@ -895,32 +911,40 @@ public class StorageProxy implements StorageProxyMBean
logger.debug("reading " + nodeCmd + " from " + endpoint);
}
- try
+ scanHandlers.add(handler);
+ if (scanHandlers.size() >= concurrencyFactor)
{
- for (Row row : handler.get())
+ for (ReadCallback<RangeSliceReply, Iterable<Row>> scanHandler : scanHandlers)
{
- rows.add(row);
- columnsCount += row.getLiveColumnCount();
- logger.debug("range slices read {}", row.key);
+ try
+ {
+ for (Row row : scanHandler.get())
+ {
+ rows.add(row);
+ columnsCount += row.getLiveColumnCount();
+ logger.debug("range slices read {}", row.key);
+ }
+ FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getReadRpcTimeout());
+ }
+ catch (TimeoutException ex)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Range slice timeout: {}", ex.toString());
+ throw ex;
+ }
+ catch (DigestMismatchException e)
+ {
+ throw new AssertionError(e); // no digests in range slices yet
+ }
+
+ // if we're done, great, otherwise, move to the next range
+ int count = nodeCmd.maxIsColumns ? columnsCount : rows.size();
+ if (count >= nodeCmd.maxResults)
+ break;
}
- FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
- }
- catch (TimeoutException ex)
- {
- if (logger.isDebugEnabled())
- logger.debug("Range slice timeout: {}", ex.toString());
- throw ex;
- }
- catch (DigestMismatchException e)
- {
- throw new AssertionError(e); // no digests in range slices yet
+ scanHandlers.clear(); //go back for more
}
}
-
- // if we're done, great, otherwise, move to the next range
- int count = nodeCmd.maxIsColumns ? columnsCount : rows.size();
- if (count >= nodeCmd.maxResults)
- break;
}
}
finally