Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/03/16 10:52:18 UTC
[01/15] cassandra git commit: Don't do defragmentation if doing reads from repaired sstables.
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 561000aa3 -> 0c53c3a9e
refs/heads/cassandra-2.2 a48b836b1 -> 5182376b9
refs/heads/cassandra-3.0 ec0092b7c -> dcc57d0bb
refs/heads/cassandra-3.5 caad32750 -> fc6b2b3d4
refs/heads/trunk 3d4aa594f -> df58c1f4d
Don't do defragmentation if doing reads from repaired sstables.
Patch by marcuse; reviewed by Sylvain Lebresne for CASSANDRA-10342
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c53c3a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c53c3a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c53c3a9
Branch: refs/heads/cassandra-2.1
Commit: 0c53c3a9e5bcb04b951ddec0f22eca894dc3013f
Parents: 561000a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Sep 16 07:52:19 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:44:19 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/CollationController.java | 6 ++++--
2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c53c3a9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4b505f8..04c9204 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.14
+ * Don't do defragmentation if reading from repaired sstables (CASSANDRA-10342)
* Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286)
* Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302)
* COPY FROM on large datasets: fix progress report and debug performance (CASSANDRA-11053)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c53c3a9/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index 5c6a3db..8a966bc 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -108,7 +108,7 @@ public class CollationController
/* add the SSTables on disk */
Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
-
+ boolean onlyUnrepaired = true;
// read sorted sstables
for (SSTableReader sstable : view.sstables)
{
@@ -122,7 +122,8 @@ public class CollationController
reduceNameFilter(reducedFilter, container, currentMaxTs);
if (((NamesQueryFilter) reducedFilter.filter).columns.isEmpty())
break;
-
+ if (sstable.isRepaired())
+ onlyUnrepaired = false;
Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
sstable.incrementReadCount();
OnDiskAtomIterator iter = reducedFilter.getSSTableColumnIterator(sstable);
@@ -149,6 +150,7 @@ public class CollationController
// "hoist up" the requested data into a more recent sstable
if (sstablesIterated > cfs.getMinimumCompactionThreshold()
+ && onlyUnrepaired
&& !cfs.isAutoCompactionDisabled()
&& cfs.getCompactionStrategy().shouldDefragment())
{
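----------------------------------------------------------------------
The whole patch comes down to one extra condition on the "hoist up" path
above: remember whether any sstable touched by the read was repaired, and
only defragment when none was. Hoisting rewrites the row through the
memtable, so it would be flushed back out into unrepaired sstables, making
already-repaired data look unrepaired to incremental repair. Below is a
minimal standalone sketch of that guard; SSTable and TableStore are
simplified stand-ins for illustration, not the real Cassandra classes.

import java.util.List;

final class DefragGuardSketch
{
    // Stand-in for the one SSTableReader method the guard needs.
    interface SSTable { boolean isRepaired(); }

    // Stand-in for the ColumnFamilyStore accessors consulted by the guard.
    interface TableStore
    {
        int getMinimumCompactionThreshold();
        boolean isAutoCompactionDisabled();
        boolean compactionStrategyShouldDefragment();
    }

    // Mirrors the condition in the CollationController hunk above: only
    // hoist the queried row into a newer sstable when enough sstables were
    // read and every one of them was unrepaired.
    static boolean shouldDefragment(TableStore cfs, List<SSTable> sstablesRead)
    {
        boolean onlyUnrepaired = true;
        for (SSTable sstable : sstablesRead)
            if (sstable.isRepaired())
                onlyUnrepaired = false; // the bookkeeping added by CASSANDRA-10342

        return sstablesRead.size() > cfs.getMinimumCompactionThreshold()
            && onlyUnrepaired           // the new condition
            && !cfs.isAutoCompactionDisabled()
            && cfs.compactionStrategyShouldDefragment();
    }
}
----------------------------------------------------------------------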
[06/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5182376b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5182376b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5182376b
Branch: refs/heads/trunk
Commit: 5182376b9a6eb3f51c5c1ee2ab63076723660798
Parents: a48b836 0c53c3a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 16 10:47:31 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:47:31 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/CollationController.java | 6 ++++--
2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5182376b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a69164e,04c9204..dca4f8a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,35 -1,5 +1,36 @@@
-2.1.14
+2.2.6
+ * Fix bloom filter sizing with LCS (CASSANDRA-11344)
+ * (cqlsh) Fix error when result is 0 rows with EXPAND ON (CASSANDRA-11092)
+ * Fix intra-node serialization issue for multicolumn-restrictions (CASSANDRA-11196)
+ * Non-obsoleting compaction operations over compressed files can impose rate limit on normal reads (CASSANDRA-11301)
+ * Add missing newline at end of bin/cqlsh (CASSANDRA-11325)
+ * Fix AE in nodetool cfstats (backport CASSANDRA-10859) (CASSANDRA-11297)
+ * Unresolved hostname leads to replace being ignored (CASSANDRA-11210)
+ * Fix filtering on non-primary key columns for thrift static column families
+ (CASSANDRA-6377)
+ * Only log yaml config once, at startup (CASSANDRA-11217)
+ * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
+ * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
+ * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
+ * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
+ * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)
+ * (cqlsh) cqlsh cannot be called through symlink (CASSANDRA-11037)
+ * fix ohc and java-driver pom dependencies in build.xml (CASSANDRA-10793)
+ * Protect from keyspace dropped during repair (CASSANDRA-11065)
+ * Handle adding fields to a UDT in SELECT JSON and toJson() (CASSANDRA-11146)
+ * Better error message for cleanup (CASSANDRA-10991)
+ * cqlsh pg-style-strings broken if line ends with ';' (CASSANDRA-11123)
+ * Use cloned TokenMetadata in size estimates to avoid race against membership check
+ (CASSANDRA-10736)
+ * Always persist upsampled index summaries (CASSANDRA-10512)
+ * (cqlsh) Fix inconsistent auto-complete (CASSANDRA-10733)
+ * Make SELECT JSON and toJson() threadsafe (CASSANDRA-11048)
+ * Fix SELECT on tuple relations for mixed ASC/DESC clustering order (CASSANDRA-7281)
+ * (cqlsh) Support utf-8/cp65001 encoding on Windows (CASSANDRA-11030)
+ * Fix paging on DISTINCT queries repeats result when first row in partition changes
+ (CASSANDRA-10010)
+Merged from 2.1:
+ * Don't do defragmentation if reading from repaired sstables (CASSANDRA-10342)
* Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286)
* Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302)
* COPY FROM on large datasets: fix progress report and debug performance (CASSANDRA-11053)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5182376b/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
[07/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5182376b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5182376b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5182376b
Branch: refs/heads/cassandra-2.2
Commit: 5182376b9a6eb3f51c5c1ee2ab63076723660798
Parents: a48b836 0c53c3a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 16 10:47:31 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:47:31 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/CollationController.java | 6 ++++--
2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[15/15] cassandra git commit: Merge branch 'cassandra-3.5' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-3.5' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/df58c1f4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/df58c1f4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/df58c1f4
Branch: refs/heads/trunk
Commit: df58c1f4d1cac989bc6af3390db0126d4e463365
Parents: 3d4aa59 fc6b2b3
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 16 10:49:48 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:49:48 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/SinglePartitionReadCommand.java | 5 ++++-
2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/df58c1f4/CHANGES.txt
----------------------------------------------------------------------
[04/15] cassandra git commit: Don't do defragmentation if doing reads from repaired sstables.
Posted by ma...@apache.org.
Don't do defragmentation if doing reads from repaired sstables.
Patch by marcuse; reviewed by Sylvain Lebresne for CASSANDRA-10342
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c53c3a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c53c3a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c53c3a9
Branch: refs/heads/cassandra-3.0
Commit: 0c53c3a9e5bcb04b951ddec0f22eca894dc3013f
Parents: 561000a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Sep 16 07:52:19 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:44:19 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/CollationController.java | 6 ++++--
2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[05/15] cassandra git commit: Don't do defragmentation if doing reads from repaired sstables.
Posted by ma...@apache.org.
Don't do defragmentation if doing reads from repaired sstables.
Patch by marcuse; reviewed by Sylvain Lebresne for CASSANDRA-10342
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c53c3a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c53c3a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c53c3a9
Branch: refs/heads/cassandra-3.5
Commit: 0c53c3a9e5bcb04b951ddec0f22eca894dc3013f
Parents: 561000a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Sep 16 07:52:19 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:44:19 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/CollationController.java | 6 ++++--
2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[02/15] cassandra git commit: Don't do defragmentation if doing reads from repaired sstables.
Posted by ma...@apache.org.
Don't do defragmentation if doing reads from repaired sstables.
Patch by marcuse; reviewed by Sylvain Lebresne for CASSANDRA-10342
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c53c3a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c53c3a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c53c3a9
Branch: refs/heads/cassandra-2.2
Commit: 0c53c3a9e5bcb04b951ddec0f22eca894dc3013f
Parents: 561000a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Sep 16 07:52:19 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:44:19 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/CollationController.java | 6 ++++--
2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[03/15] cassandra git commit: Don't do defragmentation if doing reads from repaired sstables.
Posted by ma...@apache.org.
Don't do defragmentation if doing reads from repaired sstables.
Patch by marcuse; reviewed by Sylvain Lebresne for CASSANDRA-10342
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c53c3a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c53c3a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c53c3a9
Branch: refs/heads/trunk
Commit: 0c53c3a9e5bcb04b951ddec0f22eca894dc3013f
Parents: 561000a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Sep 16 07:52:19 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:44:19 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/CollationController.java | 6 ++++--
2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[13/15] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.5
Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.5
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc6b2b3d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc6b2b3d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc6b2b3d
Branch: refs/heads/cassandra-3.5
Commit: fc6b2b3d4bc18406e9d93bd0bf0a0f4080cf93f2
Parents: caad327 dcc57d0
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 16 10:49:20 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:49:20 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/SinglePartitionReadCommand.java | 5 ++++-
2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc6b2b3d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a71d4c6,87691f9..7acb24d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -91,9 -58,9 +91,10 @@@ Merged from 2.2
* (cqlsh) Support utf-8/cp65001 encoding on Windows (CASSANDRA-11030)
* Fix paging on DISTINCT queries repeats result when first row in partition changes
(CASSANDRA-10010)
+ * (cqlsh) Support timezone conversion using pytz (CASSANDRA-10397)
* cqlsh: change default encoding to UTF-8 (CASSANDRA-11124)
Merged from 2.1:
+ * Don't do defragmentation if reading from repaired sstables (CASSANDRA-10342)
* Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286)
* Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302)
* Don't remove FailureDetector history on removeEndpoint (CASSANDRA-10371)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc6b2b3d/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
[08/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5182376b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5182376b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5182376b
Branch: refs/heads/cassandra-3.0
Commit: 5182376b9a6eb3f51c5c1ee2ab63076723660798
Parents: a48b836 0c53c3a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 16 10:47:31 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:47:31 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/CollationController.java | 6 ++++--
2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[11/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dcc57d0b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dcc57d0b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dcc57d0b
Branch: refs/heads/cassandra-3.5
Commit: dcc57d0bb2761f0b71f6064f4830af9fa140d0cf
Parents: ec0092b 5182376
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 16 10:48:30 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:48:30 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/SinglePartitionReadCommand.java | 5 ++++-
2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc57d0b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 126039a,dca4f8a..87691f9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -58,12 -29,15 +58,13 @@@ Merged from 2.2
* (cqlsh) Support utf-8/cp65001 encoding on Windows (CASSANDRA-11030)
* Fix paging on DISTINCT queries repeats result when first row in partition changes
(CASSANDRA-10010)
+ * cqlsh: change default encoding to UTF-8 (CASSANDRA-11124)
Merged from 2.1:
+ * Don't do defragmentation if reading from repaired sstables (CASSANDRA-10342)
* Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286)
* Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302)
- * COPY FROM on large datasets: fix progress report and debug performance (CASSANDRA-11053)
- * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176)
* Don't remove FailureDetector history on removeEndpoint (CASSANDRA-10371)
* Only notify if repair status changed (CASSANDRA-11172)
- * Add partition key to TombstoneOverwhelmingException error message (CASSANDRA-10888)
* Use logback setting for 'cassandra -v' command (CASSANDRA-10767)
* Fix sstableloader to unthrottle streaming by default (CASSANDRA-9714)
* Fix incorrect warning in 'nodetool status' (CASSANDRA-10176)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc57d0b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index de4c9c7,0000000..14923b9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -1,988 -1,0 +1,991 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.cache.IRowCacheEntry;
+import org.apache.cassandra.cache.RowCacheKey;
+import org.apache.cassandra.cache.RowCacheSentinel;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.pager.*;
+import org.apache.cassandra.thrift.ThriftResultsMerger;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+
+
+/**
+ * A read command that selects a (part of a) single partition.
+ */
+public class SinglePartitionReadCommand extends ReadCommand
+{
+ protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
+
+ private final DecoratedKey partitionKey;
+ private final ClusteringIndexFilter clusteringIndexFilter;
+
+ private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
+
+ public SinglePartitionReadCommand(boolean isDigest,
+ int digestVersion,
+ boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter)
+ {
+ super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+ assert partitionKey.getPartitioner() == metadata.partitioner;
+ this.partitionKey = partitionKey;
+ this.clusteringIndexFilter = clusteringIndexFilter;
+ }
+
+ /**
+ * Creates a new read command on a single partition.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param columnFilter the column filter to use for the query.
+ * @param rowFilter the row filter to use for the query.
+ * @param limits the limits to use for the query.
+ * @param partitionKey the partition key for the partition to query.
+ * @param clusteringIndexFilter the clustering index filter to use for the query.
+ *
+ * @return a newly created read command.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter)
+ {
+ return create(false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ }
+
+ /**
+ * Creates a new read command on a single partition for thrift.
+ *
+ * @param isForThrift whether the query is for thrift or not.
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param columnFilter the column filter to use for the query.
+ * @param rowFilter the row filter to use for the query.
+ * @param limits the limits to use for the query.
+ * @param partitionKey the partition key for the partition to query.
+ * @param clusteringIndexFilter the clustering index filter to use for the query.
+ *
+ * @return a newly created read command.
+ */
+ public static SinglePartitionReadCommand create(boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter)
+ {
+ return new SinglePartitionReadCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ }
+
+ /**
+ * Creates a new read command on a single partition.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param columnFilter the column filter to use for the query.
+ * @param filter the clustering index filter to use for the query.
+ *
+ * @return a newly created read command. The returned command will use no row filter and have no limits.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter)
+ {
+ return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
+ }
+
+ /**
+ * Creates a new read command that queries a single partition in its entirety.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ *
+ * @return a newly created read command that queries all the rows of {@code key}.
+ */
+ public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key)
+ {
+ return SinglePartitionReadCommand.create(metadata, nowInSec, key, Slices.ALL);
+ }
+
+ /**
+ * Creates a new read command that queries a single partition in its entirety.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ *
+ * @return a newly created read command that queries all the rows of {@code key}.
+ */
+ public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key)
+ {
+ return SinglePartitionReadCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
+ }
+
+ /**
+ * Creates a new single partition slice command for the provided single slice.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slice the slice of rows to query.
+ *
+ * @return a newly created read command that queries {@code slice} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slice slice)
+ {
+ return create(metadata, nowInSec, key, Slices.with(metadata.comparator, slice));
+ }
+
+ /**
+ * Creates a new single partition slice command for the provided slices.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slices the slices of rows to query.
+ *
+ * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices)
+ {
+ ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false);
+ return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+ }
+
+ /**
+ * Creates a new single partition slice command for the provided slices.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slices the slices of rows to query.
+ *
+ * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, ByteBuffer key, Slices slices)
+ {
+ return create(metadata, nowInSec, metadata.decorateKey(key), slices);
+ }
+
+ public SinglePartitionReadCommand copy()
+ {
+ return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return partitionKey;
+ }
+
+ public ClusteringIndexFilter clusteringIndexFilter()
+ {
+ return clusteringIndexFilter;
+ }
+
+ public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
+ {
+ return clusteringIndexFilter;
+ }
+
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getReadRpcTimeout();
+ }
+
+ public boolean selectsKey(DecoratedKey key)
+ {
+ if (!this.partitionKey().equals(key))
+ return false;
+
+ return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator());
+ }
+
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
+ if (clustering == Clustering.STATIC_CLUSTERING)
+ return !columnFilter().fetchedColumns().statics.isEmpty();
+
+ if (!clusteringIndexFilter().selects(clustering))
+ return false;
+
+ return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
+ }
+
+ /**
+ * Returns a new command suitable to paging from the last returned row.
+ *
+ * @param lastReturned the last row returned by the previous page. The newly created command
+ * will only query rows that come after this (in query order). This can be {@code null} if this
+ * is the first page.
+ * @param pageSize the size to use for the page to query.
+ *
+ * @return the newly created command.
+ */
+ public SinglePartitionReadCommand forPaging(Clustering lastReturned, int pageSize)
+ {
+ // We shouldn't have set digest yet when reaching that point
+ assert !isDigestQuery();
+ return create(isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits().forPaging(pageSize),
+ partitionKey(),
+ lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
+ }
+
+ public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+ {
+ return StorageProxy.read(Group.one(this), consistency, clientState);
+ }
+
+ public SinglePartitionPager getPager(PagingState pagingState, int protocolVersion)
+ {
+ return getPager(this, pagingState, protocolVersion);
+ }
+
+ private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState, int protocolVersion)
+ {
+ return new SinglePartitionPager(command, pagingState, protocolVersion);
+ }
+
+ protected void recordLatency(TableMetrics metric, long latencyNanos)
+ {
+ metric.readLatency.addNano(latencyNanos);
+ }
+
+ @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
+ protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
+ {
+ UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
+ ? getThroughCache(cfs, orderGroup.baseReadOpOrderGroup())
+ : queryMemtableAndDisk(cfs, orderGroup.baseReadOpOrderGroup());
+ return new SingletonUnfilteredPartitionIterator(partition, isForThrift());
+ }
+
+ /**
+ * Fetch the requested rows from cache if present; if not, read them from disk and cache them.
+ * <p>
+ * If the partition is cached, and the filter given is within its bounds, we return
+ * from cache, otherwise from disk.
+ * <p>
+ * If the partition is not cached, we figure out what filter is "biggest", read
+ * that from disk, then filter the result and either cache that or return it.
+ */
+ private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, OpOrder.Group readOp)
+ {
+ assert !cfs.isIndex(); // CASSANDRA-5732
+ assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name);
+
+ RowCacheKey key = new RowCacheKey(metadata().ksAndCFName, partitionKey());
+
+ // Attempt a sentinel-read-cache sequence. if a write invalidates our sentinel, we'll return our
+ // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862
+ // TODO: don't evict entire partitions on writes (#2864)
+ IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
+ if (cached != null)
+ {
+ if (cached instanceof RowCacheSentinel)
+ {
+ // Some other read is trying to cache the value, just do a normal non-caching read
+ Tracing.trace("Row cache miss (race)");
+ cfs.metric.rowCacheMiss.inc();
+ return queryMemtableAndDisk(cfs, readOp);
+ }
+
+ CachedPartition cachedPartition = (CachedPartition)cached;
+ if (cfs.isFilterFullyCoveredBy(clusteringIndexFilter(), limits(), cachedPartition, nowInSec()))
+ {
+ cfs.metric.rowCacheHit.inc();
+ Tracing.trace("Row cache hit");
+ UnfilteredRowIterator unfilteredRowIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), cachedPartition);
+ cfs.metric.updateSSTableIterated(0);
+ return unfilteredRowIterator;
+ }
+
+ cfs.metric.rowCacheHitOutOfRange.inc();
+ Tracing.trace("Ignoring row cache as cached value could not satisfy query");
+ return queryMemtableAndDisk(cfs, readOp);
+ }
+
+ cfs.metric.rowCacheMiss.inc();
+ Tracing.trace("Row cache miss");
+
+ boolean cacheFullPartitions = metadata().params.caching.cacheAllRows();
+
+ // To be able to cache what we read, what we read must at least cover what the cache holds, that
+ // is the 'rowsToCache' first rows of the partition. We could read those 'rowsToCache' first rows
+ // systematically, but we'd have to "extend" that to whatever is needed for the user query that the
+ // 'rowsToCache' first rows don't cover and it's not trivial with our existing filters. So currently
+ // we settle for caching what we read only if the user query does query the head of the partition since
+ // that's the common case of when we'll be able to use the cache anyway. One exception is if we cache
+ // full partitions, in which case we just always read it all and cache.
+ if (cacheFullPartitions || clusteringIndexFilter().isHeadFilter())
+ {
+ RowCacheSentinel sentinel = new RowCacheSentinel();
+ boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
+ boolean sentinelReplaced = false;
+
+ try
+ {
+ int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
+ @SuppressWarnings("resource") // we close on exception or upon closing the result of this method
+ UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
+ try
+ {
+ // We want to cache only rowsToCache rows
+ CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec()), nowInSec());
+ if (sentinelSuccess && !toCache.isEmpty())
+ {
+ Tracing.trace("Caching {} rows", toCache.rowCount());
+ CacheService.instance.rowCache.replace(key, sentinel, toCache);
+ // Whether or not the previous replace has worked, our sentinel is not in the cache anymore
+ sentinelReplaced = true;
+ }
+
+ // We then re-filter out what this query wants.
+ // Note that in the case where we don't cache full partitions, it's possible that the current query is interested in more
+ // than what we've cached, so we can't just use toCache.
+ UnfilteredRowIterator cacheIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), toCache);
+ if (cacheFullPartitions)
+ {
+ // Everything is guaranteed to be in 'toCache', we're done with 'iter'
+ assert !iter.hasNext();
+ iter.close();
+ return cacheIterator;
+ }
+ return UnfilteredRowIterators.concat(cacheIterator, clusteringIndexFilter().filterNotIndexed(columnFilter(), iter));
+ }
+ catch (RuntimeException | Error e)
+ {
+ iter.close();
+ throw e;
+ }
+ }
+ finally
+ {
+ if (sentinelSuccess && !sentinelReplaced)
+ cfs.invalidateCachedPartition(key);
+ }
+ }
+
+ Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition");
+ return queryMemtableAndDisk(cfs, readOp);
+ }
+
+ /**
+ * Queries both memtable and sstables to fetch the result of this query.
+ * <p>
+ * Please note that this method:
+ * 1) does not check the row cache.
+ * 2) does not apply the query limit, nor the row filter (and so ignore 2ndary indexes).
+ * Those are applied in {@link ReadCommand#executeLocally}.
+ * 3) does not record some of the read metrics (latency, scanned cells histograms) nor
+ * throws TombstoneOverwhelmingException.
+ * It is publicly exposed because there are a few places where that is exactly what we want,
+ * but it should be used only where you know you don't need those things.
+ * <p>
+ * Also note that one must have "started" a {@code OpOrder.Group} on the queried table; to enforce that,
+ * it is required as a parameter, even though it's not explicitly used by the method.
+ */
+ public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, OpOrder.Group readOp)
+ {
+ Tracing.trace("Executing single-partition query on {}", cfs.name);
+
+ boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap();
+ return queryMemtableAndDiskInternal(cfs, copyOnHeap);
+ }
+
+ @Override
+ protected int oldestUnrepairedTombstone()
+ {
+ return oldestUnrepairedTombstone;
+ }
+
+ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
+ {
+ /*
+ * We have 2 main strategies:
+ * 1) We query memtables and sstables simultaneously. This is our most generic strategy and the one we use
+ * unless we have a names filter that we know we can optimize further.
+ * 2) If we have a names filter (so we query specific rows), we can make a bet: that all columns for all queried rows
+ * will have data in the most recent sstable(s), thus saving us from reading older ones. This does imply we
+ * have a way to guarantee we have all the data for what is queried, which is only possible for name queries
+ * and if we have neither collections nor counters (indeed, for a collection, we can't guarantee an older sstable
+ * won't have some elements that weren't in the most recent sstables, and counters are intrinsically a collection
+ * of shards so have the same problem).
+ */
+ if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && queryNeitherCountersNorCollections())
+ return queryMemtableAndSSTablesInTimestampOrder(cfs, copyOnHeap, (ClusteringIndexNamesFilter)clusteringIndexFilter());
+
+ Tracing.trace("Acquiring sstable references");
+ ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
+
+ List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
+ ClusteringIndexFilter filter = clusteringIndexFilter();
+
+ try
+ {
+ for (Memtable memtable : view.memtables)
+ {
+ Partition partition = memtable.getPartition(partitionKey());
+ if (partition == null)
+ continue;
+
+ @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+ UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
+ @SuppressWarnings("resource") // same as above
+ UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
+ iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
+ }
+ /*
+ * We can't eliminate full sstables based on the timestamp of what we've already read like
+ * in collectTimeOrderedData, but we still want to eliminate sstables whose maxTimestamp < mostRecentTombstone
+ * we've read. We still rely on the sstable ordering by maxTimestamp since if
+ * maxTimestamp_s1 > maxTimestamp_s0,
+ * we're guaranteed that s1 cannot have a row tombstone such that
+ * timestamp(tombstone) > maxTimestamp_s0
+ * since we necessarily have
+ * timestamp(tombstone) <= maxTimestamp_s1
+ * In other words, iterating in maxTimestamp order allows us to do our mostRecentPartitionTombstone elimination
+ * in one pass, and minimize the number of sstables for which we read a partition tombstone.
+ */
+ int sstablesIterated = 0;
+ Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+ List<SSTableReader> skippedSSTables = null;
+ long mostRecentPartitionTombstone = Long.MIN_VALUE;
+ long minTimestamp = Long.MAX_VALUE;
+ int nonIntersectingSSTables = 0;
+
+ for (SSTableReader sstable : view.sstables)
+ {
+ minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
+ // if we've already seen a partition tombstone with a timestamp greater
+ // than the most recent update to this sstable, we can skip it
+ if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
+ break;
+
+ if (!shouldInclude(sstable))
+ {
+ nonIntersectingSSTables++;
+ // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
+ if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
+ {
+ if (skippedSSTables == null)
+ skippedSSTables = new ArrayList<>();
+ skippedSSTables.add(sstable);
+ }
+ continue;
+ }
+
+ sstable.incrementReadCount();
+ @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+ UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
+ if (!sstable.isRepaired())
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+
+ iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
+ mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
+ sstablesIterated++;
+ }
+
+ int includedDueToTombstones = 0;
+ // Check for partition tombstones in the skipped sstables
+ if (skippedSSTables != null)
+ {
+ for (SSTableReader sstable : skippedSSTables)
+ {
+ if (sstable.getMaxTimestamp() <= minTimestamp)
+ continue;
+
+ sstable.incrementReadCount();
+ @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is close on exception, or through the closing of the final merged iterator
+ UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
+ if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
+ {
+ iterators.add(iter);
+ if (!sstable.isRepaired())
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+ includedDueToTombstones++;
+ sstablesIterated++;
+ }
+ else
+ {
+ iter.close();
+ }
+ }
+ }
+ if (Tracing.isTracing())
+ Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
+ nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
+
+ cfs.metric.updateSSTableIterated(sstablesIterated);
+
+ if (iterators.isEmpty())
+ return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed());
+
+ Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
+
+ @SuppressWarnings("resource") // Closed through the closing of the result of that method.
+ UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
+ if (!merged.isEmpty())
+ {
+ DecoratedKey key = merged.partitionKey();
+ cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+ }
+
+ return merged;
+ }
+ catch (RuntimeException | Error e)
+ {
+ try
+ {
+ FBUtilities.closeAll(iterators);
+ }
+ catch (Exception suppressed)
+ {
+ e.addSuppressed(suppressed);
+ }
+ throw e;
+ }
+ }
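
The elimination in the loop above rests entirely on the sort by max timestamp; pulled out of context, the invariant looks like this (a standalone sketch with a made-up Source type, not code from the patch):

    import java.util.ArrayList;
    import java.util.List;

    class TimestampElimination
    {
        // Stand-in for a memtable/sstable: its newest write and its partition tombstone.
        static class Source { long maxTimestamp; long partitionTombstone; }

        // Visit sources from newest to oldest; once a seen partition tombstone is newer
        // than everything still unvisited, the remaining sources cannot contribute.
        static List<Source> relevant(List<Source> sortedNewestFirst)
        {
            long mostRecentPartitionTombstone = Long.MIN_VALUE;
            List<Source> kept = new ArrayList<>();
            for (Source s : sortedNewestFirst)
            {
                if (s.maxTimestamp < mostRecentPartitionTombstone)
                    break; // everything after this one is older still
                kept.add(s);
                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, s.partitionTombstone);
            }
            return kept;
        }
    }
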
+
+ private boolean shouldInclude(SSTableReader sstable)
+ {
+ // If some static columns are queried, we should always include the sstable: the clustering values stats of the sstable
+ // don't tell us if the sstable contains static values in particular.
+ // TODO: we could record if a sstable contains any static value at all.
+ if (!columnFilter().fetchedColumns().statics.isEmpty())
+ return true;
+
+ return clusteringIndexFilter().shouldInclude(sstable);
+ }
+
+ private boolean queryNeitherCountersNorCollections()
+ {
+ for (ColumnDefinition column : columnFilter().fetchedColumns())
+ {
+ if (column.type.isCollection() || column.type.isCounter())
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Do a read by querying the memtable(s) first, and then each relevant sstable sequentially, in order of the sstable
+ * max timestamp.
+ *
+ * This is used for names queries in the hope of only having to query the 1 or 2 most recent sstables and then knowing
+ * nothing more recent could be in the older sstables (which we can only guarantee if we know exactly which rows we
+ * query, and if no collections or counters are included).
+ * This method assumes the filter is a {@code ClusteringIndexNamesFilter}.
+ */
+ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, boolean copyOnHeap, ClusteringIndexNamesFilter filter)
+ {
+ Tracing.trace("Acquiring sstable references");
+ ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
+
+ ImmutableBTreePartition result = null;
+
+ Tracing.trace("Merging memtable contents");
+ for (Memtable memtable : view.memtables)
+ {
+ Partition partition = memtable.getPartition(partitionKey());
+ if (partition == null)
+ continue;
+
+ try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition))
+ {
+ if (iter.isEmpty())
+ continue;
+
+ UnfilteredRowIterator clonedFilter = copyOnHeap
+ ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance)
+ : iter;
+ result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, filter, false);
+ }
+ }
+
+ /* add the SSTables on disk */
+ Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+ int sstablesIterated = 0;
-
++ boolean onlyUnrepaired = true;
+ // read sorted sstables
+ for (SSTableReader sstable : view.sstables)
+ {
+ // if we've already seen a partition tombstone with a timestamp greater
+ // than the most recent update to this sstable, we're done, since the rest of the sstables
+ // will also be older
+ if (result != null && sstable.getMaxTimestamp() < result.partitionLevelDeletion().markedForDeleteAt())
+ break;
+
+ long currentMaxTs = sstable.getMaxTimestamp();
+ filter = reduceFilter(filter, result, currentMaxTs);
+ if (filter == null)
+ break;
+
+ if (!shouldInclude(sstable))
+ {
+ // This means that nothing queried by the filter can be in the sstable. One exception is the top-level partition deletion
+ // however: if it is set, it impacts everything and must be included. Getting that top-level partition deletion costs us
+ // some seeks in general however (unless the partition is indexed and is in the key cache), so we first check if the sstable
+ // has any tombstone at all as a shortcut.
+ if (sstable.getSSTableMetadata().maxLocalDeletionTime == Integer.MAX_VALUE)
+ continue; // Means no tombstone at all, we can skip that sstable
+
+ // We need to get the partition deletion and include it if it's live. In any case though, we're done with that sstable.
+ sstable.incrementReadCount();
+ try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()))
+ {
+ if (iter.partitionLevelDeletion().isLive())
+ {
+ sstablesIterated++;
+ result = add(UnfilteredRowIterators.noRowsIterator(iter.metadata(), iter.partitionKey(), Rows.EMPTY_STATIC_ROW, iter.partitionLevelDeletion(), filter.isReversed()), result, filter, sstable.isRepaired());
+ }
+ }
+ continue;
+ }
+
+ Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
+ sstable.incrementReadCount();
+ try (UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift())))
+ {
+ if (iter.isEmpty())
+ continue;
+
++ if (sstable.isRepaired())
++ onlyUnrepaired = false;
+ sstablesIterated++;
+ result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, sstable.isRepaired());
+ }
+ }
+
+ cfs.metric.updateSSTableIterated(sstablesIterated);
+
+ if (result == null || result.isEmpty())
+ return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false);
+
+ DecoratedKey key = result.partitionKey();
+ cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+
+ // "hoist up" the requested data into a more recent sstable
+ if (sstablesIterated > cfs.getMinimumCompactionThreshold()
++ && onlyUnrepaired
+ && !cfs.isAutoCompactionDisabled()
+ && cfs.getCompactionStrategyManager().shouldDefragment())
+ {
+ // !!WARNING!! if we stop copying our data to a heap-managed object,
+ // we will need to track the lifetime of this mutation as well
+ Tracing.trace("Defragmenting requested data");
+
+ try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
+ {
+ final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter));
+ StageManager.getStage(Stage.MUTATION).execute(new Runnable()
+ {
+ public void run()
+ {
+ // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
+ Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
+ }
+ });
+ }
+ }
+
+ return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
+ }
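
The onlyUnrepaired flag threaded through this method is the substance of CASSANDRA-10342: a read that touched a repaired sstable must not be hoisted, or repaired data would be rewritten into an unrepaired sstable. Boiled down to its guard, the condition looks like this (a hedged sketch with hypothetical standalone parameters; the real checks live on ColumnFamilyStore and the compaction strategy):

    class DefragGuard
    {
        // All four conditions must hold before the read path schedules a defragmenting write.
        static boolean shouldDefragment(int sstablesIterated,
                                        int minCompactionThreshold,
                                        boolean onlyUnrepaired,        // added by CASSANDRA-10342
                                        boolean autoCompactionDisabled,
                                        boolean strategyDefragments)
        {
            return sstablesIterated > minCompactionThreshold
                   && onlyUnrepaired               // never turn repaired data back into unrepaired data
                   && !autoCompactionDisabled
                   && strategyDefragments;
        }
    }
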
+
+ private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, ClusteringIndexNamesFilter filter, boolean isRepaired)
+ {
+ if (!isRepaired)
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.stats().minLocalDeletionTime);
+
+ int maxRows = Math.max(filter.requestedRows().size(), 1);
+ if (result == null)
+ return ImmutableBTreePartition.create(iter, maxRows);
+
+ try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(Arrays.asList(iter, result.unfilteredIterator(columnFilter(), Slices.ALL, filter.isReversed())), nowInSec()))
+ {
+ return ImmutableBTreePartition.create(merged, maxRows);
+ }
+ }
+
+ private ClusteringIndexNamesFilter reduceFilter(ClusteringIndexNamesFilter filter, Partition result, long sstableTimestamp)
+ {
+ if (result == null)
+ return filter;
+
+ SearchIterator<Clustering, Row> searchIter = result.searchIterator(columnFilter(), false);
+
+ PartitionColumns columns = columnFilter().fetchedColumns();
+ NavigableSet<Clustering> clusterings = filter.requestedRows();
+
+ // We want to remove rows for which we have values for all requested columns. We have to deal with both static and regular rows.
+ // TODO: we could also remove a selected column if we've found values for every requested row but we'll leave
+ // that for later.
+
+ boolean removeStatic = false;
+ if (!columns.statics.isEmpty())
+ {
+ Row staticRow = searchIter.next(Clustering.STATIC_CLUSTERING);
+ removeStatic = staticRow != null && canRemoveRow(staticRow, columns.statics, sstableTimestamp);
+ }
+
+ NavigableSet<Clustering> toRemove = null;
+ for (Clustering clustering : clusterings)
+ {
+ if (!searchIter.hasNext())
+ break;
+
+ Row row = searchIter.next(clustering);
+ if (row == null || !canRemoveRow(row, columns.regulars, sstableTimestamp))
+ continue;
+
+ if (toRemove == null)
+ toRemove = new TreeSet<>(result.metadata().comparator);
+ toRemove.add(clustering);
+ }
+
+ if (!removeStatic && toRemove == null)
+ return filter;
+
+ // Check if we have everything we need
+ boolean hasNoMoreStatic = columns.statics.isEmpty() || removeStatic;
+ boolean hasNoMoreClusterings = clusterings.isEmpty() || (toRemove != null && toRemove.size() == clusterings.size());
+ if (hasNoMoreStatic && hasNoMoreClusterings)
+ return null;
+
+ if (toRemove != null)
+ {
+ BTreeSet.Builder<Clustering> newClusterings = BTreeSet.builder(result.metadata().comparator);
+ newClusterings.addAll(Sets.difference(clusterings, toRemove));
+ clusterings = newClusterings.build();
+ }
+ return new ClusteringIndexNamesFilter(clusterings, filter.isReversed());
+ }
+
+ private boolean canRemoveRow(Row row, Columns requestedColumns, long sstableTimestamp)
+ {
+ // We can remove a row if it has data that is more recent than the next sstable to consider, for the data that the query
+ // cares about. And the data we care about is 1) the row timestamp (since every query cares if the row exists or not)
+ // and 2) the requested columns.
+ if (row.primaryKeyLivenessInfo().isEmpty() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
+ return false;
+
+ for (ColumnDefinition column : requestedColumns)
+ {
+ Cell cell = row.getCell(column);
+ if (cell == null || cell.timestamp() <= sstableTimestamp)
+ return false;
+ }
+ return true;
+ }
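
The rule in canRemoveRow is plain timestamp arithmetic: a row can be dropped from the filter only if its liveness info and every requested cell are strictly newer than the next sstable's max timestamp. A simplified illustration (it skips the empty-liveness and missing-cell cases the real method also checks):

    class FilterReduction
    {
        static boolean canRemove(long livenessTs, long[] requestedCellTs, long nextSSTableMaxTs)
        {
            if (livenessTs <= nextSSTableMaxTs)
                return false;                  // the next sstable could still hold a newer row timestamp
            for (long ts : requestedCellTs)
                if (ts <= nextSSTableMaxTs)
                    return false;              // a requested value could still be superseded there
            return true;
        }
        // canRemove(20, new long[]{ 18, 15 }, 11) -> true : nothing older can matter
        // canRemove(10, new long[]{ 18, 15 }, 11) -> false: keep querying older sstables
    }
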
+
+ @Override
+ public String toString()
+ {
+ return String.format("Read(%s.%s columns=%s rowFilter=%s limits=%s key=%s filter=%s, nowInSec=%d)",
+ metadata().ksName,
+ metadata().cfName,
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ metadata().getKeyValidator().getString(partitionKey().getKey()),
+ clusteringIndexFilter.toString(metadata()),
+ nowInSec());
+ }
+
+ public MessageOut<ReadCommand> createMessage(int version)
+ {
+ return new MessageOut<>(MessagingService.Verb.READ, this, version < MessagingService.VERSION_30 ? legacyReadCommandSerializer : serializer);
+ }
+
+ protected void appendCQLWhereClause(StringBuilder sb)
+ {
+ sb.append(" WHERE ");
+
+ sb.append(ColumnDefinition.toCQLString(metadata().partitionKeyColumns())).append(" = ");
+ DataRange.appendKeyString(sb, metadata().getKeyValidator(), partitionKey().getKey());
+
+ // We put the row filter first because the clustering index filter can end with "ORDER BY"
+ if (!rowFilter().isEmpty())
+ sb.append(" AND ").append(rowFilter());
+
+ String filterString = clusteringIndexFilter().toCQLString(metadata());
+ if (!filterString.isEmpty())
+ sb.append(" AND ").append(filterString);
+ }
+
+ protected void serializeSelection(DataOutputPlus out, int version) throws IOException
+ {
+ metadata().getKeyValidator().writeValue(partitionKey().getKey(), out);
+ ClusteringIndexFilter.serializer.serialize(clusteringIndexFilter(), out, version);
+ }
+
+ protected long selectionSerializedSize(int version)
+ {
+ return metadata().getKeyValidator().writtenLength(partitionKey().getKey())
+ + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version);
+ }
+
+ /**
+ * Groups multiple single partition read commands.
+ */
+ public static class Group implements ReadQuery
+ {
+ public final List<SinglePartitionReadCommand> commands;
+ private final DataLimits limits;
+ private final int nowInSec;
+
+ public Group(List<SinglePartitionReadCommand> commands, DataLimits limits)
+ {
+ assert !commands.isEmpty();
+ this.commands = commands;
+ this.limits = limits;
+ this.nowInSec = commands.get(0).nowInSec();
+ for (int i = 1; i < commands.size(); i++)
+ assert commands.get(i).nowInSec() == nowInSec;
+ }
+
+ public static Group one(SinglePartitionReadCommand command)
+ {
+ return new Group(Collections.<SinglePartitionReadCommand>singletonList(command), command.limits());
+ }
+
+ public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+ {
+ return StorageProxy.read(this, consistency, clientState);
+ }
+
+ public int nowInSec()
+ {
+ return nowInSec;
+ }
+
+ public DataLimits limits()
+ {
+ return limits;
+ }
+
+ public CFMetaData metadata()
+ {
+ return commands.get(0).metadata();
+ }
+
+ public ReadOrderGroup startOrderGroup()
+ {
+ // Note that the only difference between the commands in a group must be the partition key on which
+ // they apply. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one.
+ return commands.get(0).startOrderGroup();
+ }
+
+ public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
+ {
+ List<PartitionIterator> partitions = new ArrayList<>(commands.size());
+ for (SinglePartitionReadCommand cmd : commands)
+ partitions.add(cmd.executeInternal(orderGroup));
+
+ // Because we only enforce the limit per command, we need to enforce it globally.
+ return limits.filter(PartitionIterators.concat(partitions), nowInSec);
+ }
+
+ public QueryPager getPager(PagingState pagingState, int protocolVersion)
+ {
+ if (commands.size() == 1)
+ return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion);
+
+ return new MultiPartitionPager(this, pagingState, protocolVersion);
+ }
+
+ public boolean selectsKey(DecoratedKey key)
+ {
+ return Iterables.any(commands, c -> c.selectsKey(key));
+ }
+
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
+ return Iterables.any(commands, c -> c.selectsClustering(key, clustering));
+ }
+
+ @Override
+ public String toString()
+ {
+ return commands.toString();
+ }
+ }
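
A hedged sketch of how a Group might be assembled from the pieces above (cfm, key1, key2 and clientState are assumed to exist, not values from the patch; note the constructor asserts that all commands share one nowInSec):

    int nowInSec = FBUtilities.nowInSeconds();
    SinglePartitionReadCommand c1 = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, key1);
    SinglePartitionReadCommand c2 = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, key2);
    SinglePartitionReadCommand.Group group =
        new SinglePartitionReadCommand.Group(Arrays.asList(c1, c2), c1.limits());
    // Reads both partitions; the limit is re-applied globally across the
    // concatenated per-command results, as the comment in executeInternal notes.
    PartitionIterator partitions = group.execute(ConsistencyLevel.QUORUM, clientState);
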
+
+ private static class Deserializer extends SelectionDeserializer
+ {
+ public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
+ throws IOException
+ {
+ DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in));
+ ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
+ return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter);
+ }
+ }
+}
[09/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5182376b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5182376b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5182376b
Branch: refs/heads/cassandra-3.5
Commit: 5182376b9a6eb3f51c5c1ee2ab63076723660798
Parents: a48b836 0c53c3a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 16 10:47:31 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:47:31 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/CollationController.java | 6 ++++--
2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5182376b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a69164e,04c9204..dca4f8a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,35 -1,5 +1,36 @@@
-2.1.14
+2.2.6
+ * Fix bloom filter sizing with LCS (CASSANDRA-11344)
+ * (cqlsh) Fix error when result is 0 rows with EXPAND ON (CASSANDRA-11092)
+ * Fix intra-node serialization issue for multicolumn-restrictions (CASSANDRA-11196)
+ * Non-obsoleting compaction operations over compressed files can impose rate limit on normal reads (CASSANDRA-11301)
+ * Add missing newline at end of bin/cqlsh (CASSANDRA-11325)
+ * Fix AE in nodetool cfstats (backport CASSANDRA-10859) (CASSANDRA-11297)
+ * Unresolved hostname leads to replace being ignored (CASSANDRA-11210)
+ * Fix filtering on non-primary key columns for thrift static column families
+ (CASSANDRA-6377)
+ * Only log yaml config once, at startup (CASSANDRA-11217)
+ * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
+ * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
+ * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
+ * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
+ * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)
+ * (cqlsh) cqlsh cannot be called through symlink (CASSANDRA-11037)
+ * fix ohc and java-driver pom dependencies in build.xml (CASSANDRA-10793)
+ * Protect from keyspace dropped during repair (CASSANDRA-11065)
+ * Handle adding fields to a UDT in SELECT JSON and toJson() (CASSANDRA-11146)
+ * Better error message for cleanup (CASSANDRA-10991)
+ * cqlsh pg-style-strings broken if line ends with ';' (CASSANDRA-11123)
+ * Use cloned TokenMetadata in size estimates to avoid race against membership check
+ (CASSANDRA-10736)
+ * Always persist upsampled index summaries (CASSANDRA-10512)
+ * (cqlsh) Fix inconsistent auto-complete (CASSANDRA-10733)
+ * Make SELECT JSON and toJson() threadsafe (CASSANDRA-11048)
+ * Fix SELECT on tuple relations for mixed ASC/DESC clustering order (CASSANDRA-7281)
+ * (cqlsh) Support utf-8/cp65001 encoding on Windows (CASSANDRA-11030)
+ * Fix paging on DISTINCT queries repeats result when first row in partition changes
+ (CASSANDRA-10010)
+Merged from 2.1:
+ * Don't do defragmentation if reading from repaired sstables (CASSANDRA-10342)
* Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286)
* Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302)
* COPY FROM on large datasets: fix progress report and debug performance (CASSANDRA-11053)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5182376b/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
[12/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dcc57d0b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dcc57d0b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dcc57d0b
Branch: refs/heads/cassandra-3.0
Commit: dcc57d0bb2761f0b71f6064f4830af9fa140d0cf
Parents: ec0092b 5182376
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 16 10:48:30 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:48:30 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/SinglePartitionReadCommand.java | 5 ++++-
2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc57d0b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 126039a,dca4f8a..87691f9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -58,12 -29,15 +58,13 @@@ Merged from 2.2
* (cqlsh) Support utf-8/cp65001 encoding on Windows (CASSANDRA-11030)
* Fix paging on DISTINCT queries repeats result when first row in partition changes
(CASSANDRA-10010)
+ * cqlsh: change default encoding to UTF-8 (CASSANDRA-11124)
Merged from 2.1:
+ * Don't do defragmentation if reading from repaired sstables (CASSANDRA-10342)
* Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286)
* Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302)
- * COPY FROM on large datasets: fix progress report and debug performance (CASSANDRA-11053)
- * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176)
* Don't remove FailureDetector history on removeEndpoint (CASSANDRA-10371)
* Only notify if repair status changed (CASSANDRA-11172)
- * Add partition key to TombstoneOverwhelmingException error message (CASSANDRA-10888)
* Use logback setting for 'cassandra -v' command (CASSANDRA-10767)
* Fix sstableloader to unthrottle streaming by default (CASSANDRA-9714)
* Fix incorrect warning in 'nodetool status' (CASSANDRA-10176)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc57d0b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index de4c9c7,0000000..14923b9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -1,988 -1,0 +1,991 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.cache.IRowCacheEntry;
+import org.apache.cassandra.cache.RowCacheKey;
+import org.apache.cassandra.cache.RowCacheSentinel;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.pager.*;
+import org.apache.cassandra.thrift.ThriftResultsMerger;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+
+
+/**
+ * A read command that selects a (part of a) single partition.
+ */
+public class SinglePartitionReadCommand extends ReadCommand
+{
+ protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
+
+ private final DecoratedKey partitionKey;
+ private final ClusteringIndexFilter clusteringIndexFilter;
+
+ private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
+
+ public SinglePartitionReadCommand(boolean isDigest,
+ int digestVersion,
+ boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter)
+ {
+ super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+ assert partitionKey.getPartitioner() == metadata.partitioner;
+ this.partitionKey = partitionKey;
+ this.clusteringIndexFilter = clusteringIndexFilter;
+ }
+
+ /**
+ * Creates a new read command on a single partition.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param columnFilter the column filter to use for the query.
+ * @param rowFilter the row filter to use for the query.
+ * @param limits the limits to use for the query.
+ * @param partitionKey the partition key for the partition to query.
+ * @param clusteringIndexFilter the clustering index filter to use for the query.
+ *
+ * @return a newly created read command.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter)
+ {
+ return create(false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ }
+
+ /**
+ * Creates a new read command on a single partition for thrift.
+ *
+ * @param isForThrift whether the query is for thrift or not.
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param columnFilter the column filter to use for the query.
+ * @param rowFilter the row filter to use for the query.
+ * @param limits the limits to use for the query.
+ * @param partitionKey the partition key for the partition to query.
+ * @param clusteringIndexFilter the clustering index filter to use for the query.
+ *
+ * @return a newly created read command.
+ */
+ public static SinglePartitionReadCommand create(boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter)
+ {
+ return new SinglePartitionReadCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ }
+
+ /**
+ * Creates a new read command on a single partition.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param columnFilter the column filter to use for the query.
+ * @param filter the clustering index filter to use for the query.
+ *
+ * @return a newly created read command. The returned command will use no row filter and have no limits.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter)
+ {
+ return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
+ }
+
+ /**
+ * Creates a new read command that queries a single partition in its entirety.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ *
+ * @return a newly created read command that queries all the rows of {@code key}.
+ */
+ public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key)
+ {
+ return SinglePartitionReadCommand.create(metadata, nowInSec, key, Slices.ALL);
+ }
+
+ /**
+ * Creates a new read command that queries a single partition in its entirety.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ *
+ * @return a newly created read command that queries all the rows of {@code key}.
+ */
+ public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key)
+ {
+ return SinglePartitionReadCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
+ }
+
+ /**
+ * Creates a new single partition slice command for the provided single slice.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slice the slice of rows to query.
+ *
+ * @return a newly created read command that queries {@code slice} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slice slice)
+ {
+ return create(metadata, nowInSec, key, Slices.with(metadata.comparator, slice));
+ }
+
+ /**
+ * Creates a new single partition slice command for the provided slices.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slices the slices of rows to query.
+ *
+ * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices)
+ {
+ ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false);
+ return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+ }
+
+ /**
+ * Creates a new single partition slice command for the provided slices.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slices the slices of rows to query.
+ *
+ * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, ByteBuffer key, Slices slices)
+ {
+ return create(metadata, nowInSec, metadata.decorateKey(key), slices);
+ }
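
Putting the factories above together, constructing a command is a one-liner once a table's CFMetaData and a decorated key are in hand (cfm and key are assumptions here, not values from the patch):

    int nowInSec = FBUtilities.nowInSeconds();

    // The whole partition, every column, no limits:
    SinglePartitionReadCommand whole = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, key);

    // The same read spelled out through the Slices overload shown above:
    SinglePartitionReadCommand ranged = SinglePartitionReadCommand.create(cfm, nowInSec, key, Slices.ALL);
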
+
+ public SinglePartitionReadCommand copy()
+ {
+ return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return partitionKey;
+ }
+
+ public ClusteringIndexFilter clusteringIndexFilter()
+ {
+ return clusteringIndexFilter;
+ }
+
+ public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
+ {
+ return clusteringIndexFilter;
+ }
+
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getReadRpcTimeout();
+ }
+
+ public boolean selectsKey(DecoratedKey key)
+ {
+ if (!this.partitionKey().equals(key))
+ return false;
+
+ return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator());
+ }
+
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
+ if (clustering == Clustering.STATIC_CLUSTERING)
+ return !columnFilter().fetchedColumns().statics.isEmpty();
+
+ if (!clusteringIndexFilter().selects(clustering))
+ return false;
+
+ return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
+ }
+
+ /**
+ * Returns a new command suitable to paging from the last returned row.
+ *
+ * @param lastReturned the last row returned by the previous page. The newly created command
+ * will only query rows that come after this (in query order). This can be {@code null} if this
+ * is the first page.
+ * @param pageSize the size to use for the page to query.
+ *
+ * @return the newly created command.
+ */
+ public SinglePartitionReadCommand forPaging(Clustering lastReturned, int pageSize)
+ {
+ // We shouldn't have set digest yet when reaching that point
+ assert !isDigestQuery();
+ return create(isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits().forPaging(pageSize),
+ partitionKey(),
+ lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
+ }
+
+ public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+ {
+ return StorageProxy.read(Group.one(this), consistency, clientState);
+ }
+
+ public SinglePartitionPager getPager(PagingState pagingState, int protocolVersion)
+ {
+ return getPager(this, pagingState, protocolVersion);
+ }
+
+ private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState, int protocolVersion)
+ {
+ return new SinglePartitionPager(command, pagingState, protocolVersion);
+ }
+
+ protected void recordLatency(TableMetrics metric, long latencyNanos)
+ {
+ metric.readLatency.addNano(latencyNanos);
+ }
+
+ @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
+ protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
+ {
+ UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
+ ? getThroughCache(cfs, orderGroup.baseReadOpOrderGroup())
+ : queryMemtableAndDisk(cfs, orderGroup.baseReadOpOrderGroup());
+ return new SingletonUnfilteredPartitionIterator(partition, isForThrift());
+ }
+
+ /**
+ * Fetch the rows requested if in cache; if not, read it from disk and cache it.
+ * <p>
+ * If the partition is cached, and the filter given is within its bounds, we return
+ * from cache, otherwise from disk.
+ * <p>
+ * If the partition is not cached, we figure out what filter is "biggest", read
+ * that from disk, then filter the result and either cache that or return it.
+ */
+ private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, OpOrder.Group readOp)
+ {
+ assert !cfs.isIndex(); // CASSANDRA-5732
+ assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name);
+
+ RowCacheKey key = new RowCacheKey(metadata().ksAndCFName, partitionKey());
+
+ // Attempt a sentinel-read-cache sequence. If a write invalidates our sentinel, we'll return our
+ // (now potentially obsolete) data, but won't cache it. See CASSANDRA-3862
+ // TODO: don't evict entire partitions on writes (#2864)
+ IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
+ if (cached != null)
+ {
+ if (cached instanceof RowCacheSentinel)
+ {
+ // Some other read is trying to cache the value, just do a normal non-caching read
+ Tracing.trace("Row cache miss (race)");
+ cfs.metric.rowCacheMiss.inc();
+ return queryMemtableAndDisk(cfs, readOp);
+ }
+
+ CachedPartition cachedPartition = (CachedPartition)cached;
+ if (cfs.isFilterFullyCoveredBy(clusteringIndexFilter(), limits(), cachedPartition, nowInSec()))
+ {
+ cfs.metric.rowCacheHit.inc();
+ Tracing.trace("Row cache hit");
+ UnfilteredRowIterator unfilteredRowIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), cachedPartition);
+ cfs.metric.updateSSTableIterated(0);
+ return unfilteredRowIterator;
+ }
+
+ cfs.metric.rowCacheHitOutOfRange.inc();
+ Tracing.trace("Ignoring row cache as cached value could not satisfy query");
+ return queryMemtableAndDisk(cfs, readOp);
+ }
+
+ cfs.metric.rowCacheMiss.inc();
+ Tracing.trace("Row cache miss");
+
+ boolean cacheFullPartitions = metadata().params.caching.cacheAllRows();
+
+ // To be able to cache what we read, what we read must at least cover what the cache holds, that
+ // is the first 'rowsToCache' rows of the partition. We could read those 'rowsToCache' first rows
+ // systematically, but we'd have to "extend" that to whatever is needed for the user query that the
+ // 'rowsToCache' first rows don't cover, and that's not trivial with our existing filters. So currently
+ // we settle for caching what we read only if the user query does query the head of the partition, since
+ // that's the common case where we'll be able to use the cache anyway. One exception is if we cache
+ // full partitions, in which case we just always read it all and cache it.
+ if (cacheFullPartitions || clusteringIndexFilter().isHeadFilter())
+ {
+ RowCacheSentinel sentinel = new RowCacheSentinel();
+ boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
+ boolean sentinelReplaced = false;
+
+ try
+ {
+ int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
+ @SuppressWarnings("resource") // we close on exception or upon closing the result of this method
+ UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
+ try
+ {
+ // We want to cache only rowsToCache rows
+ CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec()), nowInSec());
+ if (sentinelSuccess && !toCache.isEmpty())
+ {
+ Tracing.trace("Caching {} rows", toCache.rowCount());
+ CacheService.instance.rowCache.replace(key, sentinel, toCache);
+ // Whether or not the previous replace has worked, our sentinel is not in the cache anymore
+ sentinelReplaced = true;
+ }
+
+ // We then re-filter out what this query wants.
+ // Note that in the case where we don't cache full partitions, it's possible that the current query is interested in more
+ // than what we've cached, so we can't just use toCache.
+ UnfilteredRowIterator cacheIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), toCache);
+ if (cacheFullPartitions)
+ {
+ // Everything is guaranteed to be in 'toCache', we're done with 'iter'
+ assert !iter.hasNext();
+ iter.close();
+ return cacheIterator;
+ }
+ return UnfilteredRowIterators.concat(cacheIterator, clusteringIndexFilter().filterNotIndexed(columnFilter(), iter));
+ }
+ catch (RuntimeException | Error e)
+ {
+ iter.close();
+ throw e;
+ }
+ }
+ finally
+ {
+ if (sentinelSuccess && !sentinelReplaced)
+ cfs.invalidateCachedPartition(key);
+ }
+ }
+
+ Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition");
+ return queryMemtableAndDisk(cfs, readOp);
+ }
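
The sentinel dance in getThroughCache generalizes beyond the row cache; a toy sketch of the same shape follows (a single shared sentinel for brevity, whereas Cassandra's RowCacheSentinel instances are unique per read, and the real method also re-filters what it caches):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.function.Function;

    class SentinelCache<K, V>
    {
        private static final Object SENTINEL = new Object();
        private final ConcurrentMap<K, Object> cache = new ConcurrentHashMap<>();

        @SuppressWarnings("unchecked")
        V get(K key, Function<K, V> readFromDisk)
        {
            Object cached = cache.get(key);
            if (cached != null && cached != SENTINEL)
                return (V) cached;                          // plain cache hit

            boolean placed = cache.putIfAbsent(key, SENTINEL) == null;
            V fresh = readFromDisk.apply(key);
            if (placed && !cache.replace(key, SENTINEL, fresh))
                cache.remove(key, SENTINEL);                // a write invalidated our sentinel: don't cache
            return fresh;                                   // either way, serve the freshly read data
        }
    }
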
+
+ /**
+ * Queries both memtable and sstables to fetch the result of this query.
+ * <p>
+ * Please note that this method:
+ * 1) does not check the row cache.
+ * 2) does not apply the query limit, nor the row filter (and so ignores secondary indexes).
+ * Those are applied in {@link ReadCommand#executeLocally}.
+ * 3) does not record some of the read metrics (latency, scanned cells histograms) nor
+ * throw TombstoneOverwhelmingException.
+ * It is publicly exposed because there are a few places where that is exactly what we want,
+ * but it should be used only where you know you don't need those things.
+ * <p>
+ * Also note that one must have "started" an {@code OpOrder.Group} on the queried table; it is
+ * required as a parameter to enforce that, even though it's not explicitly used by the method.
+ */
+ public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, OpOrder.Group readOp)
+ {
+ Tracing.trace("Executing single-partition query on {}", cfs.name);
+
+ boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap();
+ return queryMemtableAndDiskInternal(cfs, copyOnHeap);
+ }
+
+ @Override
+ protected int oldestUnrepairedTombstone()
+ {
+ return oldestUnrepairedTombstone;
+ }
+
+ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
+ {
+ /*
+ * We have 2 main strategies:
+ * 1) We query memtables and sstables simultaneously. This is our most generic strategy and the one we use
+ * unless we have a names filter that we know we can optimize further.
+ * 2) If we have a names filter (so we query specific rows), we can make a bet: that all columns for all queried rows
+ * will have data in the most recent sstable(s), thus saving us from reading older ones. This does imply we
+ * have a way to guarantee we have all the data for what is queried, which is only possible for name queries
+ * and if we have neither collections nor counters (indeed, for a collection, we can't guarantee an older sstable
+ * won't have some elements that weren't in the most recent sstables, and counters are intrinsically a collection
+ * of shards and so have the same problem).
+ */
+ if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && queryNeitherCountersNorCollections())
+ return queryMemtableAndSSTablesInTimestampOrder(cfs, copyOnHeap, (ClusteringIndexNamesFilter)clusteringIndexFilter());
+
+ Tracing.trace("Acquiring sstable references");
+ ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
+
+ List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
+ ClusteringIndexFilter filter = clusteringIndexFilter();
+
+ try
+ {
+ for (Memtable memtable : view.memtables)
+ {
+ Partition partition = memtable.getPartition(partitionKey());
+ if (partition == null)
+ continue;
+
+ @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+ UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
+ @SuppressWarnings("resource") // same as above
+ UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
+ iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
+ }
+ /*
+ * We can't eliminate full sstables based on the timestamp of what we've already read like
+ * in collectTimeOrderedData, but we still want to eliminate sstables whose maxTimestamp < the most recent
+ * tombstone we've read. We still rely on the sstable ordering by maxTimestamp since if
+ * maxTimestamp_s1 > maxTimestamp_s0,
+ * we're guaranteed that s1 cannot have a row tombstone such that
+ * timestamp(tombstone) > maxTimestamp_s0
+ * since we necessarily have
+ * timestamp(tombstone) <= maxTimestamp_s1
+ * In other words, iterating in maxTimestamp order allows us to do our mostRecentPartitionTombstone elimination
+ * in one pass, and minimizes the number of sstables for which we read a partition tombstone.
+ */
+ int sstablesIterated = 0;
+ Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+ List<SSTableReader> skippedSSTables = null;
+ long mostRecentPartitionTombstone = Long.MIN_VALUE;
+ long minTimestamp = Long.MAX_VALUE;
+ int nonIntersectingSSTables = 0;
+
+ for (SSTableReader sstable : view.sstables)
+ {
+ minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
+ // if we've already seen a partition tombstone with a timestamp greater
+ // than the most recent update to this sstable, we can skip it
+ if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
+ break;
+
+ if (!shouldInclude(sstable))
+ {
+ nonIntersectingSSTables++;
+ // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
+ if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
+ {
+ if (skippedSSTables == null)
+ skippedSSTables = new ArrayList<>();
+ skippedSSTables.add(sstable);
+ }
+ continue;
+ }
+
+ sstable.incrementReadCount();
+ @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+ UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
+ if (!sstable.isRepaired())
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+
+ iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
+ mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
+ sstablesIterated++;
+ }
+
+ int includedDueToTombstones = 0;
+ // Check for partition tombstones in the skipped sstables
+ if (skippedSSTables != null)
+ {
+ for (SSTableReader sstable : skippedSSTables)
+ {
+ if (sstable.getMaxTimestamp() <= minTimestamp)
+ continue;
+
+ sstable.incrementReadCount();
+ @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is close on exception, or through the closing of the final merged iterator
+ UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
+ if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
+ {
+ iterators.add(iter);
+ if (!sstable.isRepaired())
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+ includedDueToTombstones++;
+ sstablesIterated++;
+ }
+ else
+ {
+ iter.close();
+ }
+ }
+ }
+ if (Tracing.isTracing())
+ Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
+ nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
+
+ cfs.metric.updateSSTableIterated(sstablesIterated);
+
+ if (iterators.isEmpty())
+ return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed());
+
+ Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
+
+ @SuppressWarnings("resource") // Closed through the closing of the result of that method.
+ UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
+ if (!merged.isEmpty())
+ {
+ DecoratedKey key = merged.partitionKey();
+ cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+ }
+
+ return merged;
+ }
+ catch (RuntimeException | Error e)
+ {
+ try
+ {
+ FBUtilities.closeAll(iterators);
+ }
+ catch (Exception suppressed)
+ {
+ e.addSuppressed(suppressed);
+ }
+ throw e;
+ }
+ }
+
+ private boolean shouldInclude(SSTableReader sstable)
+ {
+ // If some static columns are queried, we should always include the sstable: the clustering values stats of the sstable
+ // don't tell us if the sstable contains static values in particular.
+ // TODO: we could record if a sstable contains any static value at all.
+ if (!columnFilter().fetchedColumns().statics.isEmpty())
+ return true;
+
+ return clusteringIndexFilter().shouldInclude(sstable);
+ }
+
+ private boolean queryNeitherCountersNorCollections()
+ {
+ for (ColumnDefinition column : columnFilter().fetchedColumns())
+ {
+ if (column.type.isCollection() || column.type.isCounter())
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Do a read by querying the memtable(s) first, and then each relevant sstable sequentially, in order of the sstable
+ * max timestamp.
+ *
+ * This is used for names queries in the hope of only having to query the 1 or 2 most recent sstables and then knowing
+ * nothing more recent could be in the older sstables (which we can only guarantee if we know exactly which rows we
+ * query, and if no collections or counters are included).
+ * This method assumes the filter is a {@code ClusteringIndexNamesFilter}.
+ */
+ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, boolean copyOnHeap, ClusteringIndexNamesFilter filter)
+ {
+ Tracing.trace("Acquiring sstable references");
+ ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
+
+ ImmutableBTreePartition result = null;
+
+ Tracing.trace("Merging memtable contents");
+ for (Memtable memtable : view.memtables)
+ {
+ Partition partition = memtable.getPartition(partitionKey());
+ if (partition == null)
+ continue;
+
+ try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition))
+ {
+ if (iter.isEmpty())
+ continue;
+
+ UnfilteredRowIterator clonedFilter = copyOnHeap
+ ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance)
+ : iter;
+ result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, filter, false);
+ }
+ }
+
+ /* add the SSTables on disk */
+ Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+ int sstablesIterated = 0;
-
++ boolean onlyUnrepaired = true;
+ // read sorted sstables
+ for (SSTableReader sstable : view.sstables)
+ {
+ // if we've already seen a partition tombstone with a timestamp greater
+ // than the most recent update to this sstable, we're done, since the rest of the sstables
+ // will also be older
+ if (result != null && sstable.getMaxTimestamp() < result.partitionLevelDeletion().markedForDeleteAt())
+ break;
+
+ long currentMaxTs = sstable.getMaxTimestamp();
+ filter = reduceFilter(filter, result, currentMaxTs);
+ if (filter == null)
+ break;
+
+ if (!shouldInclude(sstable))
+ {
+ // This means that nothing queried by the filter can be in the sstable. One exception is the top-level partition deletion
+ // however: if it is set, it impacts everything and must be included. Getting that top-level partition deletion costs us
+ // some seeks in general however (unless the partition is indexed and is in the key cache), so we first check if the sstable
+ // has any tombstone at all as a shortcut.
+ if (sstable.getSSTableMetadata().maxLocalDeletionTime == Integer.MAX_VALUE)
+ continue; // Means no tombstone at all, we can skip that sstable
+
+ // We need to get the partition deletion and include it if it's live. In any case though, we're done with that sstable.
+ sstable.incrementReadCount();
+ try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()))
+ {
+ if (iter.partitionLevelDeletion().isLive())
+ {
+ sstablesIterated++;
+ result = add(UnfilteredRowIterators.noRowsIterator(iter.metadata(), iter.partitionKey(), Rows.EMPTY_STATIC_ROW, iter.partitionLevelDeletion(), filter.isReversed()), result, filter, sstable.isRepaired());
+ }
+ }
+ continue;
+ }
+
+ Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
+ sstable.incrementReadCount();
+ try (UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift())))
+ {
+ if (iter.isEmpty())
+ continue;
+
++ if (sstable.isRepaired())
++ onlyUnrepaired = false;
+ sstablesIterated++;
+ result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, sstable.isRepaired());
+ }
+ }
+
+ cfs.metric.updateSSTableIterated(sstablesIterated);
+
+ if (result == null || result.isEmpty())
+ return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false);
+
+ DecoratedKey key = result.partitionKey();
+ cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+
+ // "hoist up" the requested data into a more recent sstable
+ if (sstablesIterated > cfs.getMinimumCompactionThreshold()
++ && onlyUnrepaired
+ && !cfs.isAutoCompactionDisabled()
+ && cfs.getCompactionStrategyManager().shouldDefragment())
+ {
+ // !!WARNING!! if we stop copying our data to a heap-managed object,
+ // we will need to track the lifetime of this mutation as well
+ Tracing.trace("Defragmenting requested data");
+
+ try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
+ {
+ final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter));
+ StageManager.getStage(Stage.MUTATION).execute(new Runnable()
+ {
+ public void run()
+ {
+ // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
+ Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
+ }
+ });
+ }
+ }
+
+ return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
+ }
+
+ private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, ClusteringIndexNamesFilter filter, boolean isRepaired)
+ {
+ if (!isRepaired)
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.stats().minLocalDeletionTime);
+
+ int maxRows = Math.max(filter.requestedRows().size(), 1);
+ if (result == null)
+ return ImmutableBTreePartition.create(iter, maxRows);
+
+ try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(Arrays.asList(iter, result.unfilteredIterator(columnFilter(), Slices.ALL, filter.isReversed())), nowInSec()))
+ {
+ return ImmutableBTreePartition.create(merged, maxRows);
+ }
+ }
+
+ private ClusteringIndexNamesFilter reduceFilter(ClusteringIndexNamesFilter filter, Partition result, long sstableTimestamp)
+ {
+ if (result == null)
+ return filter;
+
+ SearchIterator<Clustering, Row> searchIter = result.searchIterator(columnFilter(), false);
+
+ PartitionColumns columns = columnFilter().fetchedColumns();
+ NavigableSet<Clustering> clusterings = filter.requestedRows();
+
+ // We want to remove rows for which we have values for all requested columns. We have to deal with both static and regular rows.
+ // TODO: we could also remove a selected column if we've found values for every requested row but we'll leave
+ // that for later.
+
+ boolean removeStatic = false;
+ if (!columns.statics.isEmpty())
+ {
+ Row staticRow = searchIter.next(Clustering.STATIC_CLUSTERING);
+ removeStatic = staticRow != null && canRemoveRow(staticRow, columns.statics, sstableTimestamp);
+ }
+
+ NavigableSet<Clustering> toRemove = null;
+ for (Clustering clustering : clusterings)
+ {
+ if (!searchIter.hasNext())
+ break;
+
+ Row row = searchIter.next(clustering);
+ if (row == null || !canRemoveRow(row, columns.regulars, sstableTimestamp))
+ continue;
+
+ if (toRemove == null)
+ toRemove = new TreeSet<>(result.metadata().comparator);
+ toRemove.add(clustering);
+ }
+
+ if (!removeStatic && toRemove == null)
+ return filter;
+
+ // Check if we have everything we need
+ boolean hasNoMoreStatic = columns.statics.isEmpty() || removeStatic;
+ boolean hasNoMoreClusterings = clusterings.isEmpty() || (toRemove != null && toRemove.size() == clusterings.size());
+ if (hasNoMoreStatic && hasNoMoreClusterings)
+ return null;
+
+ if (toRemove != null)
+ {
+ BTreeSet.Builder<Clustering> newClusterings = BTreeSet.builder(result.metadata().comparator);
+ newClusterings.addAll(Sets.difference(clusterings, toRemove));
+ clusterings = newClusterings.build();
+ }
+ return new ClusteringIndexNamesFilter(clusterings, filter.isReversed());
+ }
+
+ private boolean canRemoveRow(Row row, Columns requestedColumns, long sstableTimestamp)
+ {
+ // We can remove a row if it has data more recent than the next sstable to consider, for the data the
+ // query cares about. And the data we care about is 1) the row timestamp (since every query cares whether
+ // the row exists or not) and 2) the requested columns.
+ if (row.primaryKeyLivenessInfo().isEmpty() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
+ return false;
+
+ for (ColumnDefinition column : requestedColumns)
+ {
+ Cell cell = row.getCell(column);
+ if (cell == null || cell.timestamp() <= sstableTimestamp)
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("Read(%s.%s columns=%s rowFilter=%s limits=%s key=%s filter=%s, nowInSec=%d)",
+ metadata().ksName,
+ metadata().cfName,
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ metadata().getKeyValidator().getString(partitionKey().getKey()),
+ clusteringIndexFilter.toString(metadata()),
+ nowInSec());
+ }
+
+ public MessageOut<ReadCommand> createMessage(int version)
+ {
+ return new MessageOut<>(MessagingService.Verb.READ, this, version < MessagingService.VERSION_30 ? legacyReadCommandSerializer : serializer);
+ }
+
+ protected void appendCQLWhereClause(StringBuilder sb)
+ {
+ sb.append(" WHERE ");
+
+ sb.append(ColumnDefinition.toCQLString(metadata().partitionKeyColumns())).append(" = ");
+ DataRange.appendKeyString(sb, metadata().getKeyValidator(), partitionKey().getKey());
+
+ // We put the row filter first because the clustering index filter can end with "ORDER BY"
+ if (!rowFilter().isEmpty())
+ sb.append(" AND ").append(rowFilter());
+
+ String filterString = clusteringIndexFilter().toCQLString(metadata());
+ if (!filterString.isEmpty())
+ sb.append(" AND ").append(filterString);
+ }
+
+ protected void serializeSelection(DataOutputPlus out, int version) throws IOException
+ {
+ metadata().getKeyValidator().writeValue(partitionKey().getKey(), out);
+ ClusteringIndexFilter.serializer.serialize(clusteringIndexFilter(), out, version);
+ }
+
+ protected long selectionSerializedSize(int version)
+ {
+ return metadata().getKeyValidator().writtenLength(partitionKey().getKey())
+ + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version);
+ }
+
+ /**
+ * Groups multiple single partition read commands.
+ */
+ public static class Group implements ReadQuery
+ {
+ public final List<SinglePartitionReadCommand> commands;
+ private final DataLimits limits;
+ private final int nowInSec;
+
+ public Group(List<SinglePartitionReadCommand> commands, DataLimits limits)
+ {
+ assert !commands.isEmpty();
+ this.commands = commands;
+ this.limits = limits;
+ this.nowInSec = commands.get(0).nowInSec();
+ for (int i = 1; i < commands.size(); i++)
+ assert commands.get(i).nowInSec() == nowInSec;
+ }
+
+ public static Group one(SinglePartitionReadCommand command)
+ {
+ return new Group(Collections.<SinglePartitionReadCommand>singletonList(command), command.limits());
+ }
+
+ public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+ {
+ return StorageProxy.read(this, consistency, clientState);
+ }
+
+ public int nowInSec()
+ {
+ return nowInSec;
+ }
+
+ public DataLimits limits()
+ {
+ return limits;
+ }
+
+ public CFMetaData metadata()
+ {
+ return commands.get(0).metadata();
+ }
+
+ public ReadOrderGroup startOrderGroup()
+ {
+ // Note that the only difference between the commands in a group must be the partition key to which
+ // they apply. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one.
+ return commands.get(0).startOrderGroup();
+ }
+
+ public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
+ {
+ List<PartitionIterator> partitions = new ArrayList<>(commands.size());
+ for (SinglePartitionReadCommand cmd : commands)
+ partitions.add(cmd.executeInternal(orderGroup));
+
+ // Because we only enforce the limit per command, we need to enforce it globally.
+ return limits.filter(PartitionIterators.concat(partitions), nowInSec);
+ }
+
+ public QueryPager getPager(PagingState pagingState, int protocolVersion)
+ {
+ if (commands.size() == 1)
+ return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion);
+
+ return new MultiPartitionPager(this, pagingState, protocolVersion);
+ }
+
+ public boolean selectsKey(DecoratedKey key)
+ {
+ return Iterables.any(commands, c -> c.selectsKey(key));
+ }
+
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
+ return Iterables.any(commands, c -> c.selectsClustering(key, clustering));
+ }
+
+ @Override
+ public String toString()
+ {
+ return commands.toString();
+ }
+ }
+
+ private static class Deserializer extends SelectionDeserializer
+ {
+ public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
+ throws IOException
+ {
+ DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in));
+ ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
+ return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter);
+ }
+ }
+}
[14/15] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.5
Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.5
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc6b2b3d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc6b2b3d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc6b2b3d
Branch: refs/heads/trunk
Commit: fc6b2b3d4bc18406e9d93bd0bf0a0f4080cf93f2
Parents: caad327 dcc57d0
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 16 10:49:20 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:49:20 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/SinglePartitionReadCommand.java | 5 ++++-
2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc6b2b3d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a71d4c6,87691f9..7acb24d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -91,9 -58,9 +91,10 @@@ Merged from 2.2
* (cqlsh) Support utf-8/cp65001 encoding on Windows (CASSANDRA-11030)
* Fix paging on DISTINCT queries repeats result when first row in partition changes
(CASSANDRA-10010)
+ * (cqlsh) Support timezone conversion using pytz (CASSANDRA-10397)
* cqlsh: change default encoding to UTF-8 (CASSANDRA-11124)
Merged from 2.1:
+ * Don't do defragmentation if reading from repaired sstables (CASSANDRA-10342)
* Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286)
* Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302)
* Don't remove FailureDetector history on removeEndpoint (CASSANDRA-10371)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc6b2b3d/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
[10/15] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dcc57d0b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dcc57d0b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dcc57d0b
Branch: refs/heads/trunk
Commit: dcc57d0bb2761f0b71f6064f4830af9fa140d0cf
Parents: ec0092b 5182376
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 16 10:48:30 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 16 10:48:30 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/SinglePartitionReadCommand.java | 5 ++++-
2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc57d0b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 126039a,dca4f8a..87691f9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -58,12 -29,15 +58,13 @@@ Merged from 2.2
* (cqlsh) Support utf-8/cp65001 encoding on Windows (CASSANDRA-11030)
* Fix paging on DISTINCT queries repeats result when first row in partition changes
(CASSANDRA-10010)
+ * cqlsh: change default encoding to UTF-8 (CASSANDRA-11124)
Merged from 2.1:
+ * Don't do defragmentation if reading from repaired sstables (CASSANDRA-10342)
* Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286)
* Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302)
- * COPY FROM on large datasets: fix progress report and debug performance (CASSANDRA-11053)
- * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176)
* Don't remove FailureDetector history on removeEndpoint (CASSANDRA-10371)
* Only notify if repair status changed (CASSANDRA-11172)
- * Add partition key to TombstoneOverwhelmingException error message (CASSANDRA-10888)
* Use logback setting for 'cassandra -v' command (CASSANDRA-10767)
* Fix sstableloader to unthrottle streaming by default (CASSANDRA-9714)
* Fix incorrect warning in 'nodetool status' (CASSANDRA-10176)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc57d0b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index de4c9c7,0000000..14923b9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -1,988 -1,0 +1,991 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.cache.IRowCacheEntry;
+import org.apache.cassandra.cache.RowCacheKey;
+import org.apache.cassandra.cache.RowCacheSentinel;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.pager.*;
+import org.apache.cassandra.thrift.ThriftResultsMerger;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+
+
+/**
+ * A read command that selects a (part of a) single partition.
+ */
+public class SinglePartitionReadCommand extends ReadCommand
+{
+ protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
+
+ private final DecoratedKey partitionKey;
+ private final ClusteringIndexFilter clusteringIndexFilter;
+
+ private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
+
+ public SinglePartitionReadCommand(boolean isDigest,
+ int digestVersion,
+ boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter)
+ {
+ super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+ assert partitionKey.getPartitioner() == metadata.partitioner;
+ this.partitionKey = partitionKey;
+ this.clusteringIndexFilter = clusteringIndexFilter;
+ }
+
+ /**
+ * Creates a new read command on a single partition.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param columnFilter the column filter to use for the query.
+ * @param rowFilter the row filter to use for the query.
+ * @param limits the limits to use for the query.
+ * @param partitionKey the partition key for the partition to query.
+ * @param clusteringIndexFilter the clustering index filter to use for the query.
+ *
+ * @return a newly created read command.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter)
+ {
+ return create(false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ }
+
+ /**
+ * Creates a new read command on a single partition for thrift.
+ *
+ * @param isForThrift whether the query is for thrift or not.
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param columnFilter the column filter to use for the query.
+ * @param rowFilter the row filter to use for the query.
+ * @param limits the limits to use for the query.
+ * @param partitionKey the partition key for the partition to query.
+ * @param clusteringIndexFilter the clustering index filter to use for the query.
+ *
+ * @return a newly created read command.
+ */
+ public static SinglePartitionReadCommand create(boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter)
+ {
+ return new SinglePartitionReadCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ }
+
+ /**
+ * Creates a new read command on a single partition.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param columnFilter the column filter to use for the query.
+ * @param filter the clustering index filter to use for the query.
+ *
+ * @return a newly created read command. The returned command will use no row filter and have no limits.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter)
+ {
+ return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
+ }
+
+ /**
+ * Creates a new read command that queries a single partition in its entirety.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ *
+ * @return a newly created read command that queries all the rows of {@code key}.
+ */
+ public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key)
+ {
+ return SinglePartitionReadCommand.create(metadata, nowInSec, key, Slices.ALL);
+ }
+
+ /**
+ * Creates a new read command that queries a single partition in its entirety.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ *
+ * @return a newly created read command that queries all the rows of {@code key}.
+ */
+ public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key)
+ {
+ return SinglePartitionReadCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
+ }
+
+ /**
+ * Creates a new single partition slice command for the provided single slice.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slice the slice of rows to query.
+ *
+ * @return a newly created read command that queries {@code slice} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slice slice)
+ {
+ return create(metadata, nowInSec, key, Slices.with(metadata.comparator, slice));
+ }
+
+ /**
+ * Creates a new single partition slice command for the provided slices.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slices the slices of rows to query.
+ *
+ * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices)
+ {
+ ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false);
+ return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+ }
+
+ /**
+ * Creates a new single partition slice command for the provided slices.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slices the slices of rows to query.
+ *
+ * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, ByteBuffer key, Slices slices)
+ {
+ return create(metadata, nowInSec, metadata.decorateKey(key), slices);
+ }
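+
+ // Usage sketch (hypothetical caller code, for illustration only): reading one partition in full at
+ // QUORUM with the factory methods above might look like:
+ // SinglePartitionReadCommand cmd =
+ // SinglePartitionReadCommand.fullPartitionRead(metadata, FBUtilities.nowInSeconds(), key);
+ // try (PartitionIterator partitions = cmd.execute(ConsistencyLevel.QUORUM, ClientState.forInternalCalls()))
+ // {
+ // // consume partitions...
+ // }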
+
+ public SinglePartitionReadCommand copy()
+ {
+ return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return partitionKey;
+ }
+
+ public ClusteringIndexFilter clusteringIndexFilter()
+ {
+ return clusteringIndexFilter;
+ }
+
+ public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
+ {
+ return clusteringIndexFilter;
+ }
+
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getReadRpcTimeout();
+ }
+
+ public boolean selectsKey(DecoratedKey key)
+ {
+ if (!this.partitionKey().equals(key))
+ return false;
+
+ return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator());
+ }
+
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
+ if (clustering == Clustering.STATIC_CLUSTERING)
+ return !columnFilter().fetchedColumns().statics.isEmpty();
+
+ if (!clusteringIndexFilter().selects(clustering))
+ return false;
+
+ return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
+ }
+
+ /**
+ * Returns a new command suitable to paging from the last returned row.
+ *
+ * @param lastReturned the last row returned by the previous page. The newly created command
+ * will only query rows that come after this (in query order). This can be {@code null} if this
+ * is the first page.
+ * @param pageSize the size to use for the page to query.
+ *
+ * @return the newly created command.
+ */
+ public SinglePartitionReadCommand forPaging(Clustering lastReturned, int pageSize)
+ {
+ // We shouldn't have set the digest yet when reaching this point
+ assert !isDigestQuery();
+ return create(isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits().forPaging(pageSize),
+ partitionKey(),
+ lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
+ }
+
+ public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+ {
+ return StorageProxy.read(Group.one(this), consistency, clientState);
+ }
+
+ public SinglePartitionPager getPager(PagingState pagingState, int protocolVersion)
+ {
+ return getPager(this, pagingState, protocolVersion);
+ }
+
+ private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState, int protocolVersion)
+ {
+ return new SinglePartitionPager(command, pagingState, protocolVersion);
+ }
+
+ protected void recordLatency(TableMetrics metric, long latencyNanos)
+ {
+ metric.readLatency.addNano(latencyNanos);
+ }
+
+ @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
+ protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
+ {
+ UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
+ ? getThroughCache(cfs, orderGroup.baseReadOpOrderGroup())
+ : queryMemtableAndDisk(cfs, orderGroup.baseReadOpOrderGroup());
+ return new SingletonUnfilteredPartitionIterator(partition, isForThrift());
+ }
+
+ /**
+ * Fetch the rows requested if in cache; if not, read it from disk and cache it.
+ * <p>
+ * If the partition is cached, and the filter given is within its bounds, we return
+ * from cache, otherwise from disk.
+ * <p>
+ * If the partition is not cached, we figure out which filter is "biggest", read
+ * that from disk, then filter the result and either cache that or return it.
+ */
+ private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, OpOrder.Group readOp)
+ {
+ assert !cfs.isIndex(); // CASSANDRA-5732
+ assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name);
+
+ RowCacheKey key = new RowCacheKey(metadata().ksAndCFName, partitionKey());
+
+ // Attempt a sentinel-read-cache sequence. If a write invalidates our sentinel, we'll return our
+ // (now potentially obsolete) data, but won't cache it. See CASSANDRA-3862.
+ // TODO: don't evict entire partitions on writes (#2864)
+ IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
+ if (cached != null)
+ {
+ if (cached instanceof RowCacheSentinel)
+ {
+ // Some other read is trying to cache the value, just do a normal non-caching read
+ Tracing.trace("Row cache miss (race)");
+ cfs.metric.rowCacheMiss.inc();
+ return queryMemtableAndDisk(cfs, readOp);
+ }
+
+ CachedPartition cachedPartition = (CachedPartition)cached;
+ if (cfs.isFilterFullyCoveredBy(clusteringIndexFilter(), limits(), cachedPartition, nowInSec()))
+ {
+ cfs.metric.rowCacheHit.inc();
+ Tracing.trace("Row cache hit");
+ UnfilteredRowIterator unfilteredRowIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), cachedPartition);
+ cfs.metric.updateSSTableIterated(0);
+ return unfilteredRowIterator;
+ }
+
+ cfs.metric.rowCacheHitOutOfRange.inc();
+ Tracing.trace("Ignoring row cache as cached value could not satisfy query");
+ return queryMemtableAndDisk(cfs, readOp);
+ }
+
+ cfs.metric.rowCacheMiss.inc();
+ Tracing.trace("Row cache miss");
+
+ boolean cacheFullPartitions = metadata().params.caching.cacheAllRows();
+
+ // To be able to cache what we read, what we read must at least cover what the cache holds, that
+ // is, the 'rowsToCache' first rows of the partition. We could read those 'rowsToCache' first rows
+ // systematically, but we'd have to "extend" that to whatever the user query needs beyond what the
+ // 'rowsToCache' first rows cover, and that's not trivial with our existing filters. So currently
+ // we settle for caching what we read only if the user query does query the head of the partition,
+ // since that's the common case in which we'll be able to use the cache anyway. One exception is if
+ // we cache full partitions, in which case we just always read it all and cache it.
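+ // For example (hypothetical numbers): with rowsPerPartitionToCache = 100, a slice starting at the head of
+ // the partition triggers a full-partition read whose first 100 rows are cached, and the query is answered
+ // from what was read; a slice starting mid-partition skips cache population and falls through to a plain read.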
+ if (cacheFullPartitions || clusteringIndexFilter().isHeadFilter())
+ {
+ RowCacheSentinel sentinel = new RowCacheSentinel();
+ boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
+ boolean sentinelReplaced = false;
+
+ try
+ {
+ int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
+ @SuppressWarnings("resource") // we close on exception or upon closing the result of this method
+ UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
+ try
+ {
+ // We want to cache only rowsToCache rows
+ CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec()), nowInSec());
+ if (sentinelSuccess && !toCache.isEmpty())
+ {
+ Tracing.trace("Caching {} rows", toCache.rowCount());
+ CacheService.instance.rowCache.replace(key, sentinel, toCache);
+ // Whether or not the previous replace has worked, our sentinel is not in the cache anymore
+ sentinelReplaced = true;
+ }
+
+ // We then re-filter out what this query wants.
+ // Note that in the case where we don't cache full partitions, it's possible that the current query is interested in more
+ // than what we've cached, so we can't just use toCache.
+ UnfilteredRowIterator cacheIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), toCache);
+ if (cacheFullPartitions)
+ {
+ // Everything is guaranteed to be in 'toCache', we're done with 'iter'
+ assert !iter.hasNext();
+ iter.close();
+ return cacheIterator;
+ }
+ return UnfilteredRowIterators.concat(cacheIterator, clusteringIndexFilter().filterNotIndexed(columnFilter(), iter));
+ }
+ catch (RuntimeException | Error e)
+ {
+ iter.close();
+ throw e;
+ }
+ }
+ finally
+ {
+ if (sentinelSuccess && !sentinelReplaced)
+ cfs.invalidateCachedPartition(key);
+ }
+ }
+
+ Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition");
+ return queryMemtableAndDisk(cfs, readOp);
+ }
+
+ /**
+ * Queries both memtable and sstables to fetch the result of this query.
+ * <p>
+ * Please note that this method:
+ * 1) does not check the row cache.
+ * 2) does not apply the query limit, nor the row filter (and so ignores secondary indexes).
+ * Those are applied in {@link ReadCommand#executeLocally}.
+ * 3) does not record some of the read metrics (latency, scanned cells histograms), nor does it
+ * throw TombstoneOverwhelmingException.
+ * It is publicly exposed because there are a few places where that is exactly what we want,
+ * but it should be used only where you know you don't need those things.
+ * <p>
+ * Also note that one must have "started" an {@code OpOrder.Group} on the queried table; requiring it
+ * as a parameter enforces that, even though it's not explicitly used by the method.
+ */
+ public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, OpOrder.Group readOp)
+ {
+ Tracing.trace("Executing single-partition query on {}", cfs.name);
+
+ boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap();
+ return queryMemtableAndDiskInternal(cfs, copyOnHeap);
+ }
+
+ @Override
+ protected int oldestUnrepairedTombstone()
+ {
+ return oldestUnrepairedTombstone;
+ }
+
+ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
+ {
+ /*
+ * We have 2 main strategies:
+ * 1) We query memtables and sstables simultaneously. This is our most generic strategy and the one we use
+ * unless we have a names filter that we know we can optimize further.
+ * 2) If we have a names filter (so we query specific rows), we can make a bet: that all columns of all queried rows
+ * will have data in the most recent sstable(s), thus saving us from reading older ones. This does imply we
+ * have a way to guarantee we have all the data for what is queried, which is only possible for names queries
+ * and if we have neither collections nor counters (indeed, for a collection, we can't guarantee an older sstable
+ * won't have some elements that weren't in the most recent sstables, and counters are intrinsically a collection
+ * of shards and so have the same problem).
+ */
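+ // Illustrative example (hypothetical schema): for CREATE TABLE t (pk int, ck int, v int, PRIMARY KEY (pk, ck)),
+ // SELECT v FROM t WHERE pk = 0 AND ck IN (1, 2) yields a ClusteringIndexNamesFilter and, since v is neither
+ // a collection nor a counter, takes strategy 2; SELECT v FROM t WHERE pk = 0 AND ck > 1 yields a slice
+ // filter and takes strategy 1.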
+ if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && queryNeitherCountersNorCollections())
+ return queryMemtableAndSSTablesInTimestampOrder(cfs, copyOnHeap, (ClusteringIndexNamesFilter)clusteringIndexFilter());
+
+ Tracing.trace("Acquiring sstable references");
+ ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
+
+ List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
+ ClusteringIndexFilter filter = clusteringIndexFilter();
+
+ try
+ {
+ for (Memtable memtable : view.memtables)
+ {
+ Partition partition = memtable.getPartition(partitionKey());
+ if (partition == null)
+ continue;
+
+ @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+ UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
+ @SuppressWarnings("resource") // same as above
+ UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
+ iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
+ }
+ /*
+ * We can't eliminate full sstables based on the timestamp of what we've already read like
+ * in collectTimeOrderedData, but we still want to eliminate sstables whose maxTimestamp < mostRecentTombstone
+ * we've read. We still rely on the sstable ordering by decreasing maxTimestamp: if s0 is iterated before s1, then
+ * maxTimestamp_s0 >= maxTimestamp_s1,
+ * so s1 cannot have a row tombstone such that
+ * timestamp(tombstone) > maxTimestamp_s0
+ * since we necessarily have
+ * timestamp(tombstone) <= maxTimestamp_s1.
+ * In other words, iterating in maxTimestamp order allows us to do our mostRecentPartitionTombstone elimination
+ * in one pass, and minimizes the number of sstables for which we read a partition tombstone.
+ */
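+ // Worked example (hypothetical timestamps): with sstables in descending maxTimestamp order [100, 80, 50],
+ // reading a partition tombstone with timestamp 90 from the first sstable sets
+ // mostRecentPartitionTombstone = 90; the second sstable has maxTimestamp 80 < 90, so the loop breaks there
+ // and the third sstable is never opened, since anything they hold is older than the deletion.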
+ int sstablesIterated = 0;
+ Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+ List<SSTableReader> skippedSSTables = null;
+ long mostRecentPartitionTombstone = Long.MIN_VALUE;
+ long minTimestamp = Long.MAX_VALUE;
+ int nonIntersectingSSTables = 0;
+
+ for (SSTableReader sstable : view.sstables)
+ {
+ minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
+ // if we've already seen a partition tombstone with a timestamp greater
+ // than the most recent update to this sstable, we can skip it
+ if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
+ break;
+
+ if (!shouldInclude(sstable))
+ {
+ nonIntersectingSSTables++;
+ // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
+ if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
+ {
+ if (skippedSSTables == null)
+ skippedSSTables = new ArrayList<>();
+ skippedSSTables.add(sstable);
+ }
+ continue;
+ }
+
+ sstable.incrementReadCount();
+ @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+ UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
+ if (!sstable.isRepaired())
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+
+ iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
+ mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
+ sstablesIterated++;
+ }
+
+ int includedDueToTombstones = 0;
+ // Check for partition tombstones in the skipped sstables
+ if (skippedSSTables != null)
+ {
+ for (SSTableReader sstable : skippedSSTables)
+ {
+ if (sstable.getMaxTimestamp() <= minTimestamp)
+ continue;
+
+ sstable.incrementReadCount();
+ @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is close on exception, or through the closing of the final merged iterator
+ UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
+ if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
+ {
+ iterators.add(iter);
+ if (!sstable.isRepaired())
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+ includedDueToTombstones++;
+ sstablesIterated++;
+ }
+ else
+ {
+ iter.close();
+ }
+ }
+ }
+ if (Tracing.isTracing())
+ Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
+ nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
+
+ cfs.metric.updateSSTableIterated(sstablesIterated);
+
+ if (iterators.isEmpty())
+ return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed());
+
+ Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
+
+ @SuppressWarnings("resource") // Closed through the closing of the result of that method.
+ UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
+ if (!merged.isEmpty())
+ {
+ DecoratedKey key = merged.partitionKey();
+ cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+ }
+
+ return merged;
+ }
+ catch (RuntimeException | Error e)
+ {
+ try
+ {
+ FBUtilities.closeAll(iterators);
+ }
+ catch (Exception suppressed)
+ {
+ e.addSuppressed(suppressed);
+ }
+ throw e;
+ }
+ }
+
+ private boolean shouldInclude(SSTableReader sstable)
+ {
+ // If some static columns are queried, we should always include the sstable: the sstable's clustering-value
+ // stats don't tell us whether it contains static values in particular.
+ // TODO: we could record if a sstable contains any static value at all.
+ if (!columnFilter().fetchedColumns().statics.isEmpty())
+ return true;
+
+ return clusteringIndexFilter().shouldInclude(sstable);
+ }
+
+ private boolean queryNeitherCountersNorCollections()
+ {
+ for (ColumnDefinition column : columnFilter().fetchedColumns())
+ {
+ if (column.type.isCollection() || column.type.isCounter())
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Do a read by querying the memtable(s) first, and then each relevant sstable sequentially, in order of
+ * sstable max timestamp.
+ *
+ * This is used for names queries in the hope of only having to query the 1 or 2 most recent sstables and then
+ * knowing nothing more recent could be in the older ones (which we can only guarantee if we know exactly which
+ * rows we query, and if no collections or counters are included).
+ * This method assumes the filter is a {@code ClusteringIndexNamesFilter}.
+ */
+ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, boolean copyOnHeap, ClusteringIndexNamesFilter filter)
+ {
+ Tracing.trace("Acquiring sstable references");
+ ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
+
+ ImmutableBTreePartition result = null;
+
+ Tracing.trace("Merging memtable contents");
+ for (Memtable memtable : view.memtables)
+ {
+ Partition partition = memtable.getPartition(partitionKey());
+ if (partition == null)
+ continue;
+
+ try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition))
+ {
+ if (iter.isEmpty())
+ continue;
+
+ UnfilteredRowIterator clonedFilter = copyOnHeap
+ ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance)
+ : iter;
+ result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, filter, false);
+ }
+ }
+
+ /* add the SSTables on disk */
+ Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+ int sstablesIterated = 0;
-
++ boolean onlyUnrepaired = true;
+ // read sorted sstables
+ for (SSTableReader sstable : view.sstables)
+ {
+ // if we've already seen a partition tombstone with a timestamp greater
+ // than the most recent update to this sstable, we're done, since the rest of the sstables
+ // will also be older
+ if (result != null && sstable.getMaxTimestamp() < result.partitionLevelDeletion().markedForDeleteAt())
+ break;
+
+ long currentMaxTs = sstable.getMaxTimestamp();
+ filter = reduceFilter(filter, result, currentMaxTs);
+ if (filter == null)
+ break;
+
+ if (!shouldInclude(sstable))
+ {
+ // This means that nothing queried by the filter can be in the sstable. One exception, however, is the
+ // top-level partition deletion: if it is set, it impacts everything and must be included. Getting that
+ // top-level partition deletion generally costs us a seek (unless the partition is indexed and in the key
+ // cache), so as a shortcut we first check whether the sstable has any tombstone at all.
+ if (sstable.getSSTableMetadata().maxLocalDeletionTime == Integer.MAX_VALUE)
+ continue; // Means no tombstone at all, we can skip that sstable
+
+ // We need to get the partition deletion and include it if it's live. In any case though, we're done with that sstable.
+ sstable.incrementReadCount();
+ try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()))
+ {
+ if (iter.partitionLevelDeletion().isLive())
+ {
+ sstablesIterated++;
+ result = add(UnfilteredRowIterators.noRowsIterator(iter.metadata(), iter.partitionKey(), Rows.EMPTY_STATIC_ROW, iter.partitionLevelDeletion(), filter.isReversed()), result, filter, sstable.isRepaired());
+ }
+ }
+ continue;
+ }
+
+ Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
+ sstable.incrementReadCount();
+ try (UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift())))
+ {
+ if (iter.isEmpty())
+ continue;
+
++ if (sstable.isRepaired())
++ onlyUnrepaired = false;
+ sstablesIterated++;
+ result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, sstable.isRepaired());
+ }
+ }
+
+ cfs.metric.updateSSTableIterated(sstablesIterated);
+
+ if (result == null || result.isEmpty())
+ return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false);
+
+ DecoratedKey key = result.partitionKey();
+ cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+
+ // "hoist up" the requested data into a more recent sstable
+ if (sstablesIterated > cfs.getMinimumCompactionThreshold()
++ && onlyUnrepaired
+ && !cfs.isAutoCompactionDisabled()
+ && cfs.getCompactionStrategyManager().shouldDefragment())
+ {
+ // !!WARNING!! if we stop copying our data to a heap-managed object,
+ // we will need to track the lifetime of this mutation as well
+ Tracing.trace("Defragmenting requested data");
+
+ try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
+ {
+ final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter));
+ StageManager.getStage(Stage.MUTATION).execute(new Runnable()
+ {
+ public void run()
+ {
+ // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
+ Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
+ }
+ });
+ }
+ }
+
+ return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
+ }
+
+ private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, ClusteringIndexNamesFilter filter, boolean isRepaired)
+ {
+ if (!isRepaired)
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.stats().minLocalDeletionTime);
+
+ int maxRows = Math.max(filter.requestedRows().size(), 1);
+ if (result == null)
+ return ImmutableBTreePartition.create(iter, maxRows);
+
+ try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(Arrays.asList(iter, result.unfilteredIterator(columnFilter(), Slices.ALL, filter.isReversed())), nowInSec()))
+ {
+ return ImmutableBTreePartition.create(merged, maxRows);
+ }
+ }
+
+ private ClusteringIndexNamesFilter reduceFilter(ClusteringIndexNamesFilter filter, Partition result, long sstableTimestamp)
+ {
+ if (result == null)
+ return filter;
+
+ SearchIterator<Clustering, Row> searchIter = result.searchIterator(columnFilter(), false);
+
+ PartitionColumns columns = columnFilter().fetchedColumns();
+ NavigableSet<Clustering> clusterings = filter.requestedRows();
+
+ // We want to remove rows for which we have values for all requested columns. We have to deal with both static and regular rows.
+ // TODO: we could also remove a selected column if we've found values for every requested row but we'll leave
+ // that for later.
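+ // E.g. (hypothetical): if the filter requests rows {c1, c2, c3} and the merged result already holds values
+ // newer than the next sstable's maxTimestamp for every requested column (and the row timestamps) of c1 and
+ // c3, the reduced filter only asks the remaining sstables for {c2}.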
+
+ boolean removeStatic = false;
+ if (!columns.statics.isEmpty())
+ {
+ Row staticRow = searchIter.next(Clustering.STATIC_CLUSTERING);
+ removeStatic = staticRow != null && canRemoveRow(staticRow, columns.statics, sstableTimestamp);
+ }
+
+ NavigableSet<Clustering> toRemove = null;
+ for (Clustering clustering : clusterings)
+ {
+ if (!searchIter.hasNext())
+ break;
+
+ Row row = searchIter.next(clustering);
+ if (row == null || !canRemoveRow(row, columns.regulars, sstableTimestamp))
+ continue;
+
+ if (toRemove == null)
+ toRemove = new TreeSet<>(result.metadata().comparator);
+ toRemove.add(clustering);
+ }
+
+ if (!removeStatic && toRemove == null)
+ return filter;
+
+ // Check if we have everything we need
+ boolean hasNoMoreStatic = columns.statics.isEmpty() || removeStatic;
+ boolean hasNoMoreClusterings = clusterings.isEmpty() || (toRemove != null && toRemove.size() == clusterings.size());
+ if (hasNoMoreStatic && hasNoMoreClusterings)
+ return null;
+
+ if (toRemove != null)
+ {
+ BTreeSet.Builder<Clustering> newClusterings = BTreeSet.builder(result.metadata().comparator);
+ newClusterings.addAll(Sets.difference(clusterings, toRemove));
+ clusterings = newClusterings.build();
+ }
+ return new ClusteringIndexNamesFilter(clusterings, filter.isReversed());
+ }
+
+ private boolean canRemoveRow(Row row, Columns requestedColumns, long sstableTimestamp)
+ {
+ // We can remove a row if it has data more recent than the next sstable to consider, for the data the
+ // query cares about. And the data we care about is 1) the row timestamp (since every query cares whether
+ // the row exists or not) and 2) the requested columns.
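+ // For example (hypothetical timestamps): if the row's liveness timestamp is 10 and the requested cells have
+ // timestamps 12 and 15, the row can be removed when the next sstable's maxTimestamp is 8, but not when it
+ // is 12, since the liveness timestamp 10 <= 12 could still be superseded by that sstable.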
+ if (row.primaryKeyLivenessInfo().isEmpty() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
+ return false;
+
+ for (ColumnDefinition column : requestedColumns)
+ {
+ Cell cell = row.getCell(column);
+ if (cell == null || cell.timestamp() <= sstableTimestamp)
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("Read(%s.%s columns=%s rowFilter=%s limits=%s key=%s filter=%s, nowInSec=%d)",
+ metadata().ksName,
+ metadata().cfName,
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ metadata().getKeyValidator().getString(partitionKey().getKey()),
+ clusteringIndexFilter.toString(metadata()),
+ nowInSec());
+ }
+
+ public MessageOut<ReadCommand> createMessage(int version)
+ {
+ return new MessageOut<>(MessagingService.Verb.READ, this, version < MessagingService.VERSION_30 ? legacyReadCommandSerializer : serializer);
+ }
+
+ protected void appendCQLWhereClause(StringBuilder sb)
+ {
+ sb.append(" WHERE ");
+
+ sb.append(ColumnDefinition.toCQLString(metadata().partitionKeyColumns())).append(" = ");
+ DataRange.appendKeyString(sb, metadata().getKeyValidator(), partitionKey().getKey());
+
+ // We put the row filter first because the clustering index filter can end with "ORDER BY"
+ if (!rowFilter().isEmpty())
+ sb.append(" AND ").append(rowFilter());
+
+ String filterString = clusteringIndexFilter().toCQLString(metadata());
+ if (!filterString.isEmpty())
+ sb.append(" AND ").append(filterString);
+ }
+
+ protected void serializeSelection(DataOutputPlus out, int version) throws IOException
+ {
+ metadata().getKeyValidator().writeValue(partitionKey().getKey(), out);
+ ClusteringIndexFilter.serializer.serialize(clusteringIndexFilter(), out, version);
+ }
+
+ protected long selectionSerializedSize(int version)
+ {
+ return metadata().getKeyValidator().writtenLength(partitionKey().getKey())
+ + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version);
+ }
+
+ /**
+ * Groups multiple single partition read commands.
+ */
+ public static class Group implements ReadQuery
+ {
+ public final List<SinglePartitionReadCommand> commands;
+ private final DataLimits limits;
+ private final int nowInSec;
+
+ public Group(List<SinglePartitionReadCommand> commands, DataLimits limits)
+ {
+ assert !commands.isEmpty();
+ this.commands = commands;
+ this.limits = limits;
+ this.nowInSec = commands.get(0).nowInSec();
+ for (int i = 1; i < commands.size(); i++)
+ assert commands.get(i).nowInSec() == nowInSec;
+ }
+
+ public static Group one(SinglePartitionReadCommand command)
+ {
+ return new Group(Collections.<SinglePartitionReadCommand>singletonList(command), command.limits());
+ }
+
+ public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+ {
+ return StorageProxy.read(this, consistency, clientState);
+ }
+
+ public int nowInSec()
+ {
+ return nowInSec;
+ }
+
+ public DataLimits limits()
+ {
+ return limits;
+ }
+
+ public CFMetaData metadata()
+ {
+ return commands.get(0).metadata();
+ }
+
+ public ReadOrderGroup startOrderGroup()
+ {
+ // Note that the only difference between the commands in a group must be the partition key to which
+ // they apply. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one.
+ return commands.get(0).startOrderGroup();
+ }
+
+ public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
+ {
+ List<PartitionIterator> partitions = new ArrayList<>(commands.size());
+ for (SinglePartitionReadCommand cmd : commands)
+ partitions.add(cmd.executeInternal(orderGroup));
+
+ // Because we only enforce the limit per command, we need to enforce it globally.
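+ // E.g. (hypothetical): with an overall LIMIT 10 across 3 partitions, each command may itself return up to
+ // 10 rows, so the concatenated result is filtered again to at most 10 rows in total.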
+ return limits.filter(PartitionIterators.concat(partitions), nowInSec);
+ }
+
+ public QueryPager getPager(PagingState pagingState, int protocolVersion)
+ {
+ if (commands.size() == 1)
+ return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion);
+
+ return new MultiPartitionPager(this, pagingState, protocolVersion);
+ }
+
+ public boolean selectsKey(DecoratedKey key)
+ {
+ return Iterables.any(commands, c -> c.selectsKey(key));
+ }
+
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
+ return Iterables.any(commands, c -> c.selectsClustering(key, clustering));
+ }
+
+ @Override
+ public String toString()
+ {
+ return commands.toString();
+ }
+ }
+
+ private static class Deserializer extends SelectionDeserializer
+ {
+ public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
+ throws IOException
+ {
+ DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in));
+ ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
+ return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter);
+ }
+ }
+}