You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/01/09 18:22:45 UTC
[1/3] cassandra git commit: Fix DISTINCT queries w/ limits/paging and
tombstoned partitions
Repository: cassandra
Updated Branches:
refs/heads/trunk d1a552dd7 -> 1657b4fbf
Fix DISTINCT queries w/ limits/paging and tombstoned partitions
Patch by Tyler Hobbs; reviewed by Sylvain Lebresne for CASSANDRA-8490
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dd62f7bf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dd62f7bf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dd62f7bf
Branch: refs/heads/trunk
Commit: dd62f7bf7977dd40eedb1c81ab7900b778f84540
Parents: ed54e80
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Jan 9 11:14:54 2015 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Jan 9 11:14:54 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../cassandra/cql3/statements/SelectStatement.java | 6 +++++-
.../org/apache/cassandra/db/AbstractRangeCommand.java | 13 +++++++++++++
.../org/apache/cassandra/db/ColumnFamilyStore.java | 4 +++-
src/java/org/apache/cassandra/db/DataRange.java | 12 ++++++++++++
.../org/apache/cassandra/db/filter/ExtendedFilter.java | 6 ++++++
.../apache/cassandra/db/filter/SliceQueryFilter.java | 6 ++++++
.../org/apache/cassandra/service/StorageProxy.java | 13 +++++++------
8 files changed, 54 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index adb374a..0c7e9a2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.0.12:
+ * Fix DISTINCT queries with LIMITs or paging when some partitions
+ contain only tombstones (CASSANDRA-8490)
* Introduce background cache refreshing to permissions cache
(CASSANDRA-8194)
* Fix race condition in StreamTransferTask that could lead to
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index f08f6b8..19615b6 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -450,7 +450,11 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
// For distinct, we only care about fetching the beginning of each partition. If we don't have
// static columns, we in fact only care about the first cell, so we query only that (we don't "group").
// If we do have static columns, we do need to fetch the first full group (to have the static columns values).
- return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, selectsStaticColumns ? toGroup : -1);
+
+ // See the comments on IGNORE_TOMBSTONED_PARTITIONS and CASSANDRA-8490 for why we use a special value for
+ // DISTINCT queries on the partition key only.
+ toGroup = selectsStaticColumns ? toGroup : SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
+ return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, toGroup);
}
else if (isColumnRange())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
index 45302e2..4ddcb8d 100644
--- a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
+++ b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
@@ -57,6 +57,19 @@ public abstract class AbstractRangeCommand implements IReadCommand
public abstract int limit();
public abstract boolean countCQL3Rows();
+
+ /**
+ * Returns true if tombstoned partitions should not be included in results or count towards the limit.
+ * See CASSANDRA-8490 for more details on why this is needed (and done this way).
+ * */
+ public boolean ignoredTombstonedPartitions()
+ {
+ if (!(predicate instanceof SliceQueryFilter))
+ return false;
+
+ return ((SliceQueryFilter) predicate).compositesToGroup == SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
+ }
+
public abstract List<Row> executeLocally();
public long getTimeout()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 7bd2a59..e936473 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1749,6 +1749,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
List<Row> rows = new ArrayList<Row>();
int columnsCount = 0;
int total = 0, matched = 0;
+ boolean ignoreTombstonedPartitions = filter.ignoreTombstonedPartitions();
try
{
@@ -1784,7 +1785,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
rows.add(new Row(rawRow.key, data));
- matched++;
+ if (!ignoreTombstonedPartitions || !data.hasOnlyTombstones(filter.timestamp))
+ matched++;
if (data != null)
columnsCount += filter.lastCounted(data);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index b8b8daf..774a3aa 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -87,6 +87,18 @@ public class DataRange
return keyRange.right;
}
+ /**
+ * Returns true if tombstoned partitions should not be included in results or count towards the limit.
+ * See CASSANDRA-8490 for more details on why this is needed (and done this way).
+ * */
+ public boolean ignoredTombstonedPartitions()
+ {
+ if (!(columnFilter instanceof SliceQueryFilter))
+ return false;
+
+ return ((SliceQueryFilter) columnFilter).compositesToGroup == SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
+ }
+
// Whether the bounds of this DataRange actually wraps around.
public boolean isWrapAround()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 5c3662b..82e889d 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -127,6 +127,12 @@ public abstract class ExtendedFilter
*/
public abstract ColumnFamily prune(DecoratedKey key, ColumnFamily data);
+ /** Returns true if tombstoned partitions should not be included in results or count towards the limit, false otherwise. */
+ public boolean ignoreTombstonedPartitions()
+ {
+ return dataRange.ignoredTombstonedPartitions();
+ }
+
/**
* @return true if the provided data satisfies all the expressions from
* the clause of this filter.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 58a0303..858578f 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -42,6 +42,12 @@ public class SliceQueryFilter implements IDiskAtomFilter
private static final Logger logger = LoggerFactory.getLogger(SliceQueryFilter.class);
public static final Serializer serializer = new Serializer();
+ /**
+ * A special value for compositesToGroup that indicates that tombstoned partitions should not be included in results
+ * or count towards the limit. See CASSANDRA-8490 for more details on why this is needed (and done this way).
+ **/
+ public static final int IGNORE_TOMBSTONED_PARTITIONS = -2;
+
public final ColumnSlice[] slices;
public final boolean reversed;
public volatile int count;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 1e1a2a3..45af1c8 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1499,7 +1499,8 @@ public class StorageProxy implements StorageProxyMBean
// now scan until we have enough results
try
{
- int cql3RowCount = 0;
+ int liveRowCount = 0;
+ boolean countLiveRows = command.countCQL3Rows() || command.ignoredTombstonedPartitions();
rows = new ArrayList<>();
// when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
@@ -1594,8 +1595,8 @@ public class StorageProxy implements StorageProxyMBean
for (Row row : handler.get())
{
rows.add(row);
- if (nodeCmd.countCQL3Rows())
- cql3RowCount += row.getLiveCount(command.predicate, command.timestamp);
+ if (countLiveRows)
+ liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
}
FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
}
@@ -1636,7 +1637,7 @@ public class StorageProxy implements StorageProxyMBean
}
// if we're done, great, otherwise, move to the next range
- int count = nodeCmd.countCQL3Rows() ? cql3RowCount : rows.size();
+ int count = countLiveRows ? liveRowCount : rows.size();
if (count >= nodeCmd.limit())
break;
}
@@ -1652,8 +1653,8 @@ public class StorageProxy implements StorageProxyMBean
private static List<Row> trim(AbstractRangeCommand command, List<Row> rows)
{
- // When maxIsColumns, we let the caller trim the result.
- if (command.countCQL3Rows())
+ // for CQL3 queries, let the caller trim the results
+ if (command.countCQL3Rows() || command.ignoredTombstonedPartitions())
return rows;
else
return rows.size() > command.limit() ? rows.subList(0, command.limit()) : rows;
[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by ty...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1657b4fb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1657b4fb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1657b4fb
Branch: refs/heads/trunk
Commit: 1657b4fbf9d7eae1b7a1d829de882d2a86ae14c8
Parents: d1a552d 7f62e29
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Jan 9 11:22:33 2015 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Jan 9 11:22:33 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../cassandra/cql3/statements/SelectStatement.java | 6 +++++-
.../apache/cassandra/db/AbstractRangeCommand.java | 13 +++++++++++++
.../org/apache/cassandra/db/ColumnFamilyStore.java | 4 +++-
src/java/org/apache/cassandra/db/DataRange.java | 12 ++++++++++++
.../apache/cassandra/db/filter/ExtendedFilter.java | 6 ++++++
.../apache/cassandra/db/filter/SliceQueryFilter.java | 6 ++++++
.../org/apache/cassandra/service/StorageProxy.java | 15 ++++++++-------
8 files changed, 55 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1657b4fb/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1657b4fb/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index f06055a,92a9579..de8e004
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -348,12 -432,16 +348,16 @@@ public class SelectStatement implement
// For distinct, we only care about fetching the beginning of each partition. If we don't have
// static columns, we in fact only care about the first cell, so we query only that (we don't "group").
// If we do have static columns, we do need to fetch the first full group (to have the static columns values).
- return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, selection.containsStaticColumns() ? toGroup : -1);
+
+ // See the comments on IGNORE_TOMBSTONED_PARTITIONS and CASSANDRA-8490 for why we use a special value for
+ // DISTINCT queries on the partition key only.
- toGroup = selectsStaticColumns ? toGroup : SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
++ toGroup = selection.containsStaticColumns() ? toGroup : SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
+ return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, toGroup);
}
- else if (isColumnRange())
+ else if (restrictions.isColumnRange())
{
- List<Composite> startBounds = getRequestedBound(Bound.START, options);
- List<Composite> endBounds = getRequestedBound(Bound.END, options);
+ List<Composite> startBounds = restrictions.getClusteringColumnsBoundsAsComposites(Bound.START, options);
+ List<Composite> endBounds = restrictions.getClusteringColumnsBoundsAsComposites(Bound.END, options);
assert startBounds.size() == endBounds.size();
// Handles fetching static columns. Note that for 2i, the filter is just used to restrict
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1657b4fb/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1657b4fb/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1657b4fb/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1657b4fb/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
[2/3] cassandra git commit: Merge branch 'cassandra-2.0' into
cassandra-2.1
Posted by ty...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/service/StorageProxy.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f62e292
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f62e292
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f62e292
Branch: refs/heads/trunk
Commit: 7f62e292867bb6159592bfc8b0423f89f518a2b5
Parents: 14b2d7a dd62f7b
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Jan 9 11:19:37 2015 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Jan 9 11:19:37 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../cassandra/cql3/statements/SelectStatement.java | 6 +++++-
.../apache/cassandra/db/AbstractRangeCommand.java | 13 +++++++++++++
.../org/apache/cassandra/db/ColumnFamilyStore.java | 4 +++-
src/java/org/apache/cassandra/db/DataRange.java | 12 ++++++++++++
.../apache/cassandra/db/filter/ExtendedFilter.java | 6 ++++++
.../apache/cassandra/db/filter/SliceQueryFilter.java | 6 ++++++
.../org/apache/cassandra/service/StorageProxy.java | 15 ++++++++-------
8 files changed, 55 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2028633,0c7e9a2..abe3fce
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,56 -1,6 +1,58 @@@
-2.0.12:
+2.1.3
+ * Don't reuse the same cleanup strategy for all sstables (CASSANDRA-8537)
+ * Fix case-sensitivity of index name on CREATE and DROP INDEX
+ statements (CASSANDRA-8365)
+ * Better detection/logging for corruption in compressed sstables (CASSANDRA-8192)
+ * Use the correct repairedAt value when closing writer (CASSANDRA-8570)
+ * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
+ * Properly calculate expected write size during compaction (CASSANDRA-8532)
+ * Invalidate affected prepared statements when a table's columns
+ are altered (CASSANDRA-7910)
+ * Stress - user defined writes should populate sequentially (CASSANDRA-8524)
+ * Fix regression in SSTableRewriter causing some rows to become unreadable
+ during compaction (CASSANDRA-8429)
+ * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
+ * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
+ is disabled (CASSANDRA-8288)
+ * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
+ * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
+ * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
+ * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
+ * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
+ * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
+ * Disable mmap on Windows (CASSANDRA-6993)
+ * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
+ * Add auth support to cassandra-stress (CASSANDRA-7985)
+ * Fix ArrayIndexOutOfBoundsException when generating error message
+ for some CQL syntax errors (CASSANDRA-8455)
+ * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
+ * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
+ * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
+ * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
+ * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
+ * Remove tmplink files for offline compactions (CASSANDRA-8321)
+ * Reduce maxHintsInProgress (CASSANDRA-8415)
+ * BTree updates may call provided update function twice (CASSANDRA-8018)
+ * Release sstable references after anticompaction (CASSANDRA-8386)
+ * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
+ * Fix high size calculations for prepared statements (CASSANDRA-8231)
+ * Centralize shared executors (CASSANDRA-8055)
+ * Fix filtering for CONTAINS (KEY) relations on frozen collection
+ clustering columns when the query is restricted to a single
+ partition (CASSANDRA-8203)
+ * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
+ * Add more log info if readMeter is null (CASSANDRA-8238)
+ * add check of the system wall clock time at startup (CASSANDRA-8305)
+ * Support for frozen collections (CASSANDRA-7859)
+ * Fix overflow on histogram computation (CASSANDRA-8028)
+ * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
+ * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
+ * Improve JBOD disk utilization (CASSANDRA-7386)
+ * Log failed host when preparing incremental repair (CASSANDRA-8228)
+ * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
+Merged from 2.0:
+ * Fix DISTINCT queries with LIMITs or paging when some partitions
+ contain only tombstones (CASSANDRA-8490)
* Introduce background cache refreshing to permissions cache
(CASSANDRA-8194)
* Fix race condition in StreamTransferTask that could lead to
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 487c6f0,858578f..7508c4e
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@@ -44,7 -40,14 +44,13 @@@ import org.apache.cassandra.tracing.Tra
public class SliceQueryFilter implements IDiskAtomFilter
{
private static final Logger logger = LoggerFactory.getLogger(SliceQueryFilter.class);
- public static final Serializer serializer = new Serializer();
+ /**
+ * A special value for compositesToGroup that indicates that tombstoned partitions should not be included in results
+ * or count towards the limit. See CASSANDRA-8490 for more details on why this is needed (and done this way).
+ **/
+ public static final int IGNORE_TOMBSTONED_PARTITIONS = -2;
+
public final ColumnSlice[] slices;
public final boolean reversed;
public volatile int count;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 780df21,45af1c8..72577a6
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1631,141 -1517,109 +1632,141 @@@ public class StorageProxy implements St
List<InetAddress> nextFilteredEndpoints = null;
while (i < ranges.size())
{
- AbstractBounds<RowPosition> range = nextRange == null
- ? ranges.get(i)
- : nextRange;
- List<InetAddress> liveEndpoints = nextEndpoints == null
- ? getLiveSortedEndpoints(keyspace, range.right)
- : nextEndpoints;
- List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
- ? consistency_level.filterForQuery(keyspace, liveEndpoints)
- : nextFilteredEndpoints;
- ++i;
-
- // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
- // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
- // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
- while (i < ranges.size())
+ List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor);
+ int concurrentFetchStartingIndex = i;
+ int concurrentRequests = 0;
+ while ((i - concurrentFetchStartingIndex) < concurrencyFactor)
{
- nextRange = ranges.get(i);
- nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
- nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
-
- /*
- * If the current range right is the min token, we should stop merging because CFS.getRangeSlice
- * don't know how to deal with a wrapping range.
- * Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
- * the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
- * wire compatibility, so It's likely easier not to bother;
- */
- if (range.right.isMinimum())
- break;
-
- List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
-
- // Check if there is enough endpoint for the merge to be possible.
- if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
- break;
-
- List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
-
- // Estimate whether merging will be a win or not
- if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
- break;
-
- // If we get there, merge this range and the next one
- range = range.withNewRight(nextRange.right);
- liveEndpoints = merged;
- filteredEndpoints = filteredMerged;
+ AbstractBounds<RowPosition> range = nextRange == null
+ ? ranges.get(i)
+ : nextRange;
+ List<InetAddress> liveEndpoints = nextEndpoints == null
+ ? getLiveSortedEndpoints(keyspace, range.right)
+ : nextEndpoints;
+ List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
+ ? consistency_level.filterForQuery(keyspace, liveEndpoints)
+ : nextFilteredEndpoints;
++i;
- }
+ ++concurrentRequests;
- AbstractRangeCommand nodeCmd = command.forSubRange(range);
-
- // collect replies and resolve according to consistency level
- RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
- List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
- ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
- handler.assureSufficientLiveNodes();
- resolver.setSources(filteredEndpoints);
- if (filteredEndpoints.size() == 1
- && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
- && OPTIMIZE_LOCAL_REQUESTS)
- {
- StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
- }
- else
- {
- MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
- for (InetAddress endpoint : filteredEndpoints)
+ // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
+ // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
+ // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
+ while (i < ranges.size())
{
- Tracing.trace("Enqueuing request to {}", endpoint);
- MessagingService.instance().sendRR(message, endpoint, handler);
+ nextRange = ranges.get(i);
+ nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
+ nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
+
+ // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
+ // doesn't know how to deal with a wrapping range.
+ // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwrap
+ // the range if necessary and deal with it. However, we can't start sending wrapped ranges without breaking
+ // wire compatibility, so it's likely easier not to bother.
+ if (range.right.isMinimum())
+ break;
+
+ List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
+
+ // Check if there are enough endpoints for the merge to be possible.
+ if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
+ break;
+
+ List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
+
+ // Estimate whether merging will be a win or not
+ if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
+ break;
+
+ // If we get there, merge this range and the next one
+ range = range.withNewRight(nextRange.right);
+ liveEndpoints = merged;
+ filteredEndpoints = filteredMerged;
+ ++i;
}
- }
- try
- {
- for (Row row : handler.get())
+ AbstractRangeCommand nodeCmd = command.forSubRange(range);
+
+ // collect replies and resolve according to consistency level
+ RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
+ List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
+ ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
+ handler.assureSufficientLiveNodes();
+ resolver.setSources(filteredEndpoints);
+ if (filteredEndpoints.size() == 1
+ && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
+ && OPTIMIZE_LOCAL_REQUESTS)
{
- rows.add(row);
- if (countLiveRows)
- liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
+ StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get());
}
- FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
+ else
+ {
+ MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
+ for (InetAddress endpoint : filteredEndpoints)
+ {
+ Tracing.trace("Enqueuing request to {}", endpoint);
+ MessagingService.instance().sendRR(message, endpoint, handler);
+ }
+ }
+ scanHandlers.add(Pair.create(nodeCmd, handler));
}
- catch (ReadTimeoutException ex)
+ Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex);
+
+ List<AsyncOneResponse> repairResponses = new ArrayList<>();
+ for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
{
- // we timed out waiting for responses
- int blockFor = consistency_level.blockFor(keyspace);
- int responseCount = resolver.responses.size();
- String gotData = responseCount > 0
- ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
- : "";
+ AbstractRangeCommand nodeCmd = cmdPairHandler.left;
+ ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
+ RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
- if (Tracing.isTracing())
+ try
{
- Tracing.trace("Timed out; received {} of {} responses{} for range {} of {}",
- new Object[]{ responseCount, blockFor, gotData, i, ranges.size() });
+ for (Row row : handler.get())
+ {
+ rows.add(row);
- if (nodeCmd.countCQL3Rows())
- cql3RowCount += row.getLiveCount(command.predicate, command.timestamp);
++ if (countLiveRows)
++ liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
+ }
+ repairResponses.addAll(resolver.repairResults);
}
- else if (logger.isDebugEnabled())
+ catch (ReadTimeoutException ex)
{
- logger.debug("Range slice timeout; received {} of {} responses{} for range {} of {}",
- responseCount, blockFor, gotData, i, ranges.size());
+ // we timed out waiting for responses
+ int blockFor = consistency_level.blockFor(keyspace);
+ int responseCount = resolver.responses.size();
+ String gotData = responseCount > 0
+ ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
+ : "";
+
+ if (Tracing.isTracing())
+ {
+ Tracing.trace("Timed out; received {} of {} responses{} for range {} of {}",
+ new Object[]{ responseCount, blockFor, gotData, i, ranges.size() });
+ }
+ else if (logger.isDebugEnabled())
+ {
+ logger.debug("Range slice timeout; received {} of {} responses{} for range {} of {}",
+ responseCount, blockFor, gotData, i, ranges.size());
+ }
+ throw ex;
}
- throw ex;
+ catch (DigestMismatchException e)
+ {
+ throw new AssertionError(e); // no digests in range slices yet
+ }
+
+ // if we're done, great, otherwise, move to the next range
- int count = nodeCmd.countCQL3Rows() ? cql3RowCount : rows.size();
++ int count = countLiveRows ? liveRowCount : rows.size();
+ if (count >= nodeCmd.limit())
+ {
+ haveSufficientRows = true;
+ break;
+ }
+ }
+
+ try
+ {
+ FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout());
}
catch (TimeoutException ex)
{
@@@ -1777,31 -1631,15 +1778,31 @@@
logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor);
throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true);
}
- catch (DigestMismatchException e)
+
+ if (haveSufficientRows)
+ return trim(command, rows);
+
+ // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor
+ // based on the results we've seen so far (as long as we still have ranges left to query)
+ if (i < ranges.size())
{
- float fetchedRows = command.countCQL3Rows() ? cql3RowCount : rows.size();
- throw new AssertionError(e); // no digests in range slices yet
++ float fetchedRows = countLiveRows ? liveRowCount : rows.size();
+ float remainingRows = command.limit() - fetchedRows;
+ float actualRowsPerRange;
+ if (fetchedRows == 0.0)
+ {
+ // we haven't actually gotten any results, so query all remaining ranges at once
+ actualRowsPerRange = 0.0f;
+ concurrencyFactor = ranges.size() - i;
+ }
+ else
+ {
+ actualRowsPerRange = i / fetchedRows;
+ concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange)));
+ }
+ logger.debug("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
+ actualRowsPerRange, (int) remainingRows, concurrencyFactor);
}
-
- // if we're done, great, otherwise, move to the next range
- int count = countLiveRows ? liveRowCount : rows.size();
- if (count >= nodeCmd.limit())
- break;
}
}
finally