You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2018/05/18 15:46:19 UTC
[1/4] cassandra git commit: Implement virtual keyspace interface
Repository: cassandra
Updated Branches:
refs/heads/trunk 3b6c93828 -> 0d464cd25
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
new file mode 100644
index 0000000..3581a73
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.validation.entities;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.virtual.AbstractVirtualTable;
+import org.apache.cassandra.db.virtual.SimpleDataSet;
+import org.apache.cassandra.db.virtual.VirtualKeyspace;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.cassandra.triggers.ITrigger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class VirtualTableTest extends CQLTester
+{
+ private static final String KS_NAME = "test_virtual_ks";
+ private static final String VT1_NAME = "vt1";
+ private static final String VT2_NAME = "vt2";
+
+    /**
+     * Writable virtual table fixture: a text partition key ("key") mapped to a single int regular
+     * column ("value"), with the contents kept in an in-memory map.
+     */
+    private static class WritableVirtualTable extends AbstractVirtualTable
+    {
+        private final ColumnMetadata valueColumnMetadata;
+        private final Map<String, Integer> valuesByKey = new HashMap<>();
+
+        WritableVirtualTable(String keyspaceName, String tableName)
+        {
+            super(TableMetadata.builder(keyspaceName, tableName)
+                               .kind(TableMetadata.Kind.VIRTUAL)
+                               .addPartitionKeyColumn("key", UTF8Type.instance)
+                               .addRegularColumn("value", Int32Type.instance)
+                               .build());
+            // "value" is the only regular column declared above, hence index 0
+            valueColumnMetadata = metadata().regularColumns().getSimple(0);
+        }
+
+        /**
+         * Builds a new {@code SimpleDataSet} holding one row per entry currently stored in the map.
+         */
+        @Override
+        public DataSet data()
+        {
+            SimpleDataSet snapshot = new SimpleDataSet(metadata());
+            for (Map.Entry<String, Integer> entry : valuesByKey.entrySet())
+                snapshot.row(entry.getKey()).column("value", entry.getValue());
+            return snapshot;
+        }
+
+        /**
+         * Stores the "value" cell of every row of the update under the update's partition key.
+         */
+        @Override
+        public void apply(PartitionUpdate update)
+        {
+            String partitionKey = (String) metadata().partitionKeyType.compose(update.partitionKey().getKey());
+            update.forEach(row -> valuesByKey.put(partitionKey,
+                                                  Int32Type.instance.compose(row.getCell(valueColumnMetadata).value())));
+        }
+    }
+
+    /**
+     * Registers the virtual keyspace used by the tests, then runs the {@link CQLTester} class setup.
+     * {@code vt1} is a read-only table serving six fixed rows; {@code vt2} is a {@link WritableVirtualTable}.
+     */
+    @BeforeClass
+    public static void setUpClass()
+    {
+        TableMetadata readOnlyMetadata = TableMetadata.builder(KS_NAME, VT1_NAME)
+                                                      .kind(TableMetadata.Kind.VIRTUAL)
+                                                      .addPartitionKeyColumn("pk", UTF8Type.instance)
+                                                      .addClusteringColumn("c", UTF8Type.instance)
+                                                      .addRegularColumn("v1", Int32Type.instance)
+                                                      .addRegularColumn("v2", LongType.instance)
+                                                      .build();
+
+        // two partitions (pk1, pk2), each with clusterings c1, c2 and c3
+        SimpleDataSet fixtureRows = new SimpleDataSet(readOnlyMetadata);
+        fixtureRows.row("pk1", "c1").column("v1", 11).column("v2", 11L)
+                   .row("pk2", "c1").column("v1", 21).column("v2", 21L)
+                   .row("pk1", "c2").column("v1", 12).column("v2", 12L)
+                   .row("pk2", "c2").column("v1", 22).column("v2", 22L)
+                   .row("pk1", "c3").column("v1", 13).column("v2", 13L)
+                   .row("pk2", "c3").column("v1", 23).column("v2", 23L);
+
+        VirtualTable readOnlyTable = new AbstractVirtualTable(readOnlyMetadata)
+        {
+            @Override
+            public DataSet data()
+            {
+                return fixtureRows;
+            }
+        };
+        VirtualTable writableTable = new WritableVirtualTable(KS_NAME, VT2_NAME);
+
+        VirtualKeyspace keyspace = new VirtualKeyspace(KS_NAME, ImmutableList.of(readOnlyTable, writableTable));
+        VirtualKeyspaceRegistry.instance.register(keyspace);
+
+        CQLTester.setUpClass();
+    }
+
+    /**
+     * Runs SELECT statements against the read-only {@code vt1} fixture: empty results, DISTINCT,
+     * single-partition, multi-partition (with ordering, limits, aggregation and paging) and
+     * token-range queries.
+     */
+    @Test
+    public void testQueries() throws Throwable
+    {
+        // restrictions that match no row must produce an empty result
+        assertRowsNet(executeNet("SELECT * FROM test_virtual_ks.vt1 WHERE pk = 'UNKNOWN'"));
+
+        assertRowsNet(executeNet("SELECT * FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c = 'UNKNOWN'"));
+
+        // DISTINCT queries
+        assertRowsNet(executeNet("SELECT DISTINCT pk FROM test_virtual_ks.vt1"),
+                      row("pk1"),
+                      row("pk2"));
+
+        assertRowsNet(executeNet("SELECT DISTINCT pk FROM test_virtual_ks.vt1 WHERE token(pk) > token('pk1')"),
+                      row("pk2"));
+
+        // single partition queries
+        assertRowsNet(executeNet("SELECT v1, v2 FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c = 'c1'"),
+                      row(11, 11L));
+
+        assertRowsNet(executeNet("SELECT c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c IN ('c1', 'c2')"),
+                      row("c1", 11, 11L),
+                      row("c2", 12, 12L));
+
+        assertRowsNet(executeNet("SELECT c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c IN ('c2', 'c1') ORDER BY c DESC"),
+                      row("c2", 12, 12L),
+                      row("c1", 11, 11L));
+
+        // multi-partition queries
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')"),
+                   row("pk1", "c1", 11, 11L),
+                   row("pk1", "c2", 12, 12L),
+                   row("pk2", "c1", 21, 21L),
+                   row("pk2", "c2", 22, 22L));
+
+        assertRows(execute("SELECT pk, c, v1 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') ORDER BY c DESC"),
+                   row("pk1", "c2", 12),
+                   row("pk2", "c2", 22),
+                   row("pk1", "c1", 11),
+                   row("pk2", "c1", 21));
+
+        assertRows(execute("SELECT pk, c, v1 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') ORDER BY c DESC LIMIT 1"),
+                   row("pk1", "c2", 12));
+
+        assertRows(execute("SELECT c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1' , 'c3') ORDER BY c DESC PER PARTITION LIMIT 1"),
+                   row("c3", 13, 13L),
+                   row("c3", 23, 23L));
+
+        assertRows(execute("SELECT count(*) FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')"),
+                   row(4L));
+
+        // multi-partition queries again, this time paged with page sizes 1 to 4
+        String multiPartitionSelect = "SELECT pk, c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')";
+        String multiPartitionLimited = "SELECT * FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') LIMIT 2";
+        String multiPartitionCount = "SELECT count(*) FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')";
+        for (int fetchSize = 1; fetchSize <= 4; fetchSize++)
+        {
+            assertRowsNet(executeNetWithPaging(multiPartitionSelect, fetchSize),
+                          row("pk1", "c1", 11, 11L),
+                          row("pk1", "c2", 12, 12L),
+                          row("pk2", "c1", 21, 21L),
+                          row("pk2", "c2", 22, 22L));
+
+            assertRowsNet(executeNetWithPaging(multiPartitionLimited, fetchSize),
+                          row("pk1", "c1", 11, 11L),
+                          row("pk1", "c2", 12, 12L));
+
+            assertRowsNet(executeNetWithPaging(multiPartitionCount, fetchSize),
+                          row(4L));
+        }
+
+        // range queries, paged with page sizes 1 to 3
+        String rangeSelect = "SELECT * FROM test_virtual_ks.vt1 WHERE token(pk) < token('pk2') AND c IN ('c2', 'c1') ALLOW FILTERING";
+        String rangeLimited = "SELECT * FROM test_virtual_ks.vt1 WHERE token(pk) < token('pk2') AND c IN ('c2', 'c1') LIMIT 1 ALLOW FILTERING";
+        String rangePerPartition = "SELECT * FROM test_virtual_ks.vt1 WHERE token(pk) <= token('pk2') AND c > 'c1' PER PARTITION LIMIT 1 ALLOW FILTERING";
+        String rangeCount = "SELECT count(*) FROM test_virtual_ks.vt1 WHERE token(pk) = token('pk2') AND c < 'c3' ALLOW FILTERING";
+        for (int fetchSize = 1; fetchSize <= 3; fetchSize++)
+        {
+            assertRowsNet(executeNetWithPaging(rangeSelect, fetchSize),
+                          row("pk1", "c1", 11, 11L),
+                          row("pk1", "c2", 12, 12L));
+
+            assertRowsNet(executeNetWithPaging(rangeLimited, fetchSize),
+                          row("pk1", "c1", 11, 11L));
+
+            assertRowsNet(executeNetWithPaging(rangePerPartition, fetchSize),
+                          row("pk1", "c2", 12, 12L),
+                          row("pk2", "c2", 22, 22L));
+
+            assertRowsNet(executeNetWithPaging(rangeCount, fetchSize),
+                          row(2L));
+        }
+    }
+
+    /**
+     * Exercises writes against the writable {@code vt2} table: UNLOGGED batches, UPDATE and INSERT are
+     * accepted, while logged batches, batches mixing regular and virtual tables, DELETE, TTL and
+     * conditional (LWT) statements are rejected with the expected error message.
+     */
+    @Test
+    public void testModifications() throws Throwable
+    {
+        // check for clean state
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"));
+
+        // fill the table, test UNLOGGED batch
+        execute("BEGIN UNLOGGED BATCH " +
+                "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1';" +
+                "UPDATE test_virtual_ks.vt2 SET value = 2 WHERE key ='pk2';" +
+                "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3';" +
+                "APPLY BATCH");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                   row("pk1", 1),
+                   row("pk2", 2),
+                   row("pk3", 3));
+
+        // test that LOGGED batches don't allow virtual table updates
+        assertInvalidMessage("Cannot include a virtual table statement in a logged batch",
+                             "BEGIN BATCH " +
+                             "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1';" +
+                             "UPDATE test_virtual_ks.vt2 SET value = 2 WHERE key ='pk2';" +
+                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3';" +
+                             "APPLY BATCH");
+
+        // test that UNLOGGED batch doesn't allow mixing updates for regular and virtual tables
+        // (statements are ';'-terminated like in the other batches, so that the concatenated
+        // text does not run one statement straight into the next)
+        createTable("CREATE TABLE %s (key text PRIMARY KEY, value int)");
+        assertInvalidMessage("Mutations for virtual and regular tables cannot exist in the same batch",
+                             "BEGIN UNLOGGED BATCH " +
+                             "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1';" +
+                             "UPDATE %s SET value = 2 WHERE key ='pk2';" +
+                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3';" +
+                             "APPLY BATCH");
+
+        // update a single value with UPDATE
+        execute("UPDATE test_virtual_ks.vt2 SET value = 11 WHERE key ='pk1'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE key = 'pk1'"),
+                   row("pk1", 11));
+
+        // update a single value with INSERT
+        executeNet("INSERT INTO test_virtual_ks.vt2 (key, value) VALUES ('pk2', 22)");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE key = 'pk2'"),
+                   row("pk2", 22));
+
+        // test that deletions are (currently) rejected
+        assertInvalidMessage("Virtual tables don't support DELETE statements",
+                             "DELETE FROM test_virtual_ks.vt2 WHERE key ='pk1'");
+
+        // test that TTL is (currently) rejected with INSERT and UPDATE
+        assertInvalidMessage("Expiring columns are not supported by virtual tables",
+                             "INSERT INTO test_virtual_ks.vt2 (key, value) VALUES ('pk1', 11) USING TTL 86400");
+        assertInvalidMessage("Expiring columns are not supported by virtual tables",
+                             "UPDATE test_virtual_ks.vt2 USING TTL 86400 SET value = 11 WHERE key ='pk1'");
+
+        // test that LWT is (currently) rejected with virtual tables in batches
+        assertInvalidMessage("Conditional BATCH statements cannot include mutations for virtual tables",
+                             "BEGIN UNLOGGED BATCH " +
+                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3' IF value = 2;" +
+                             "APPLY BATCH");
+
+        // test that LWT is (currently) rejected with virtual tables in UPDATEs
+        assertInvalidMessage("Conditional updates are not supported by virtual tables",
+                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3' IF value = 2");
+
+        // test that LWT is (currently) rejected with virtual tables in INSERTs
+        assertInvalidMessage("Conditional updates are not supported by virtual tables",
+                             "INSERT INTO test_virtual_ks.vt2 (key, value) VALUES ('pk2', 22) IF NOT EXISTS");
+    }
+
+ @Test
+ public void testInvalidDDLOperations() throws Throwable
+ {
+ assertInvalidMessage("Cannot drop virtual keyspaces",
+ "DROP KEYSPACE test_virtual_ks");
+
+ assertInvalidMessage("Cannot alter virtual keyspaces",
+ "ALTER KEYSPACE test_virtual_ks WITH durable_writes = false");
+
+ assertInvalidMessage("Cannot create tables in virtual keyspaces",
+ "CREATE TABLE test_virtual_ks.test (id int PRIMARY KEY)");
+
+ assertInvalidMessage("Cannot create types in virtual keyspaces",
+ "CREATE TYPE test_virtual_ks.type (id int)");
+
+ assertInvalidMessage("Cannot drop virtual tables",
+ "DROP TABLE test_virtual_ks.vt1");
+
+ assertInvalidMessage("Cannot alter virtual tables",
+ "ALTER TABLE test_virtual_ks.vt1 DROP v1");
+
+ assertInvalidMessage("Cannot truncate virtual tables",
+ "TRUNCATE TABLE test_virtual_ks.vt1");
+
+ assertInvalidMessage("Secondary indexes are not supported on virtual tables",
+ "CREATE INDEX ON test_virtual_ks.vt1 (v1)");
+
+ assertInvalidMessage("Materialized views are not supported on virtual tables",
+ "CREATE MATERIALIZED VIEW test_virtual_ks.mvt1 AS SELECT c, v1 FROM test_virtual_ks.vt1 WHERE c IS NOT NULL PRIMARY KEY(c)");
+
+ assertInvalidMessage("Cannot CREATE TRIGGER against a virtual table",
+ "CREATE TRIGGER test_trigger ON test_virtual_ks.vt1 USING '" + TestTrigger.class.getName() + '\'');
+ }
+
+ /**
+ * No-op trigger whose class name is used by the CREATE TRIGGER statement in
+ * {@link #testInvalidDDLOperations()}; {@code augment} always returns {@code null}.
+ */
+ public static class TestTrigger implements ITrigger
+ {
+ public Collection<Mutation> augment(Partition update)
+ {
+ return null;
+ }
+ }
+
+    /**
+     * Invokes {@link StorageServiceMBean} operations with the virtual keyspace (and, where the operation
+     * takes one, the {@code vt1} table) and expects every call to be rejected by {@code assertJMXFails}.
+     */
+    @Test
+    public void testMBeansMethods() throws Throwable
+    {
+        StorageServiceMBean jmx = StorageService.instance;
+
+        // compaction
+        assertJMXFails(() -> jmx.forceKeyspaceCompaction(false, KS_NAME));
+        assertJMXFails(() -> jmx.forceKeyspaceCompaction(false, KS_NAME, VT1_NAME));
+
+        // scrub
+        assertJMXFails(() -> jmx.scrub(true, true, true, true, 1, KS_NAME));
+        assertJMXFails(() -> jmx.scrub(true, true, true, true, 1, KS_NAME, VT1_NAME));
+
+        // verify
+        assertJMXFails(() -> jmx.verify(true, KS_NAME));
+        assertJMXFails(() -> jmx.verify(true, KS_NAME, VT1_NAME));
+
+        // sstable upgrade
+        assertJMXFails(() -> jmx.upgradeSSTables(KS_NAME, false, 1));
+        assertJMXFails(() -> jmx.upgradeSSTables(KS_NAME, false, 1, VT1_NAME));
+
+        // garbage collection
+        assertJMXFails(() -> jmx.garbageCollect("ROW", 1, KS_NAME, VT1_NAME));
+
+        // flush
+        assertJMXFails(() -> jmx.forceKeyspaceFlush(KS_NAME));
+        assertJMXFails(() -> jmx.forceKeyspaceFlush(KS_NAME, VT1_NAME));
+
+        // truncate
+        assertJMXFails(() -> jmx.truncate(KS_NAME, VT1_NAME));
+
+        // loading new sstables
+        assertJMXFails(() -> jmx.loadNewSSTables(KS_NAME, VT1_NAME));
+
+        // auto-compaction status
+        assertJMXFails(() -> jmx.getAutoCompactionStatus(KS_NAME));
+        assertJMXFails(() -> jmx.getAutoCompactionStatus(KS_NAME, VT1_NAME));
+    }
+
+    /**
+     * A {@code Runnable}-like callback whose {@code run} method may throw any {@code Throwable}.
+     */
+    @FunctionalInterface
+    private interface ThrowingRunnable
+    {
+        void run() throws Throwable;
+    }
+
+    /**
+     * Runs the given action and expects it to throw an {@link IllegalArgumentException} whose message
+     * names the virtual keyspace; fails the test if the action completes normally. Any other
+     * throwable raised by the action is propagated unchanged.
+     *
+     * @param r the action expected to be rejected
+     */
+    private void assertJMXFails(ThrowingRunnable r) throws Throwable
+    {
+        String expectedMessage = "Cannot perform any operations against virtual keyspace " + KS_NAME;
+        try
+        {
+            r.run();
+        }
+        catch (IllegalArgumentException e)
+        {
+            assertEquals(expectedMessage, e.getMessage());
+            return;
+        }
+        // reaching this point means the operation was not rejected
+        fail();
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[3/4] cassandra git commit: Implement virtual keyspace interface
Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java b/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
new file mode 100644
index 0000000..560fbe9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.pager.PagingState;
+import org.apache.cassandra.service.pager.PartitionRangeQueryPager;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+/**
+ * A {@code ReadQuery} for a range of partitions.
+ */
+public interface PartitionRangeReadQuery extends ReadQuery
+{
+ static ReadQuery create(TableMetadata table,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DataRange dataRange)
+ {
+ if (table.isVirtual())
+ return VirtualTablePartitionRangeReadQuery.create(table, nowInSec, columnFilter, rowFilter, limits, dataRange);
+
+ return PartitionRangeReadCommand.create(table, nowInSec, columnFilter, rowFilter, limits, dataRange);
+ }
+
+ DataRange dataRange();
+
+ /**
+ * Creates a new {@code PartitionRangeReadQuery} with the updated limits.
+ *
+ * @param newLimits the new limits
+ * @return the new {@code PartitionRangeReadQuery}
+ */
+ PartitionRangeReadQuery withUpdatedLimit(DataLimits newLimits);
+
+ /**
+ * Creates a new {@code PartitionRangeReadQuery} with the updated limits and data range.
+ *
+ * @param newLimits the new limits
+ * @return the new {@code PartitionRangeReadQuery}
+ */
+ PartitionRangeReadQuery withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange);
+
+ default QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
+ {
+ return new PartitionRangeQueryPager(this, pagingState, protocolVersion);
+ }
+
+ default boolean selectsKey(DecoratedKey key)
+ {
+ if (!dataRange().contains(key))
+ return false;
+
+ return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType);
+ }
+
+ default boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
+ if (clustering == Clustering.STATIC_CLUSTERING)
+ return !columnFilter().fetchedColumns().statics.isEmpty();
+
+ if (!dataRange().clusteringIndexFilter(key).selects(clustering))
+ return false;
+ return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
+ }
+
+ default boolean selectsFullPartition()
+ {
+ return metadata().isStaticCompactTable() ||
+ (dataRange().selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 128f8f3..064dd77 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.util.function.LongPredicate;
-import java.util.function.Predicate;
import javax.annotation.Nullable;
@@ -29,7 +28,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.monitoring.ApproximateTime;
-import org.apache.cassandra.db.monitoring.MonitorableImpl;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.StoppingTransformation;
@@ -37,6 +35,7 @@ import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.exceptions.UnknownIndexException;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.IndexNotAvailableException;
+import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -57,19 +56,13 @@ import org.apache.cassandra.utils.FBUtilities;
* <p>
* This contains all the informations needed to do a local read.
*/
-public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
+public abstract class ReadCommand extends AbstractReadQuery
{
private static final int TEST_ITERATION_DELAY_MILLIS = Integer.parseInt(System.getProperty("cassandra.test.read_iteration_delay_ms", "0"));
protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
private final Kind kind;
- private final TableMetadata metadata;
- private final int nowInSec;
-
- private final ColumnFilter columnFilter;
- private final RowFilter rowFilter;
- private final DataLimits limits;
private final boolean isDigestQuery;
// if a digest query, the version for which the digest is expected. Ignored if not a digest.
@@ -115,14 +108,10 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
DataLimits limits,
IndexMetadata index)
{
+ super(metadata, nowInSec, columnFilter, rowFilter, limits);
this.kind = kind;
this.isDigestQuery = isDigestQuery;
this.digestVersion = digestVersion;
- this.metadata = metadata;
- this.nowInSec = nowInSec;
- this.columnFilter = columnFilter;
- this.rowFilter = rowFilter;
- this.limits = limits;
this.index = index;
}
@@ -140,30 +129,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
public abstract ReadCommand withUpdatedLimit(DataLimits newLimits);
/**
- * The metadata for the table queried.
- *
- * @return the metadata for the table queried.
- */
- public TableMetadata metadata()
- {
- return metadata;
- }
-
- /**
- * The time in seconds to use as "now" for this query.
- * <p>
- * We use the same time as "now" for the whole query to avoid considering different
- * values as expired during the query, which would be buggy (would throw of counting amongst other
- * things).
- *
- * @return the time (in seconds) to use as "now".
- */
- public int nowInSec()
- {
- return nowInSec;
- }
-
- /**
* The configured timeout for this command.
*
* @return the configured timeout for this command.
@@ -171,43 +136,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
public abstract long getTimeout();
/**
- * A filter on which (non-PK) columns must be returned by the query.
- *
- * @return which columns must be fetched by this query.
- */
- public ColumnFilter columnFilter()
- {
- return columnFilter;
- }
-
- /**
- * Filters/Resrictions on CQL rows.
- * <p>
- * This contains the restrictions that are not directly handled by the
- * {@code ClusteringIndexFilter}. More specifically, this includes any non-PK column
- * restrictions and can include some PK columns restrictions when those can't be
- * satisfied entirely by the clustering index filter (because not all clustering columns
- * have been restricted for instance). If there is 2ndary indexes on the table,
- * one of this restriction might be handled by a 2ndary index.
- *
- * @return the filter holding the expression that rows must satisfy.
- */
- public RowFilter rowFilter()
- {
- return rowFilter;
- }
-
- /**
- * The limits set on this query.
- *
- * @return the limits set on this query.
- */
- public DataLimits limits()
- {
- return limits;
- }
-
- /**
* Whether this query is a digest one or not.
*
* @return Whether this query is a digest query.
@@ -327,9 +255,8 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
*/
public void maybeValidateIndex()
{
- Index index = getIndex(Keyspace.openAndGetStore(metadata));
if (null != index)
- index.validate(this);
+ IndexRegistry.obtain(metadata()).getIndex(index).validate(this);
}
/**
@@ -388,11 +315,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
- public PartitionIterator executeInternal(ReadExecutionController controller)
- {
- return UnfilteredPartitionIterators.filter(executeLocally(controller), nowInSec());
- }
-
public ReadExecutionController executionController()
{
return ReadExecutionController.forCommand(this);
@@ -410,7 +332,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
private final boolean respectTombstoneThresholds = !SchemaConstants.isLocalSystemKeyspace(ReadCommand.this.metadata().keyspace);
- private final boolean enforceStrictLiveness = metadata.enforceStrictLiveness();
+ private final boolean enforceStrictLiveness = metadata().enforceStrictLiveness();
private int liveRows = 0;
private int tombstones = 0;
@@ -553,7 +475,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
private void maybeDelayForTesting()
{
- if (!metadata.keyspace.startsWith("system"))
+ if (!metadata().keyspace.startsWith("system"))
FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS);
}
}
@@ -605,7 +527,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
{
StringBuilder sb = new StringBuilder();
sb.append("SELECT ").append(columnFilter());
- sb.append(" FROM ").append(metadata().keyspace).append('.').append(metadata.name);
+ sb.append(" FROM ").append(metadata().keyspace).append('.').append(metadata().name);
appendCQLWhereClause(sb);
if (limits() != DataLimits.NONE)
@@ -657,11 +579,11 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
out.writeByte(digestFlag(command.isDigestQuery()) | indexFlag(null != command.indexMetadata()));
if (command.isDigestQuery())
out.writeUnsignedVInt(command.digestVersion());
- command.metadata.id.serialize(out);
+ command.metadata().id.serialize(out);
out.writeInt(command.nowInSec());
ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
RowFilter.serializer.serialize(command.rowFilter(), out, version);
- DataLimits.serializer.serialize(command.limits(), out, version, command.metadata.comparator);
+ DataLimits.serializer.serialize(command.limits(), out, version, command.metadata().comparator);
if (null != command.index)
IndexMetadata.serializer.serialize(command.index, out, version);
@@ -714,11 +636,11 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
{
return 2 // kind + flags
+ (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
- + command.metadata.id.serializedSize()
+ + command.metadata().id.serializedSize()
+ TypeSizes.sizeof(command.nowInSec())
+ ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
+ RowFilter.serializer.serializedSize(command.rowFilter(), version)
- + DataLimits.serializer.serializedSize(command.limits(), version, command.metadata.comparator)
+ + DataLimits.serializer.serializedSize(command.limits(), version, command.metadata().comparator)
+ command.selectionSerializedSize(version)
+ command.indexSerializedSize(version);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java
index d527d28..fd94aa1 100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -17,74 +17,113 @@
*/
package org.apache.cassandra.db;
+import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.service.pager.PagingState;
import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.FBUtilities;
/**
* Generic abstraction for read queries.
- * <p>
- * The main implementation of this is {@link ReadCommand} but we have this interface because
- * {@link SinglePartitionReadCommand.Group} is also consider as a "read query" but is not a
- * {@code ReadCommand}.
*/
public interface ReadQuery
{
- ReadQuery EMPTY = new ReadQuery()
+ public static ReadQuery empty(final TableMetadata metadata)
{
- public ReadExecutionController executionController()
+ return new ReadQuery()
{
- return ReadExecutionController.empty();
- }
+ public TableMetadata metadata()
+ {
+ return metadata;
+ }
- public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
- {
- return EmptyIterators.partition();
- }
+ public ReadExecutionController executionController()
+ {
+ return ReadExecutionController.empty();
+ }
- public PartitionIterator executeInternal(ReadExecutionController controller)
- {
- return EmptyIterators.partition();
- }
+ public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
+ {
+ return EmptyIterators.partition();
+ }
- public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
- {
- return EmptyIterators.unfilteredPartition(executionController.metadata());
- }
+ public PartitionIterator executeInternal(ReadExecutionController controller)
+ {
+ return EmptyIterators.partition();
+ }
- public DataLimits limits()
- {
- // What we return here doesn't matter much in practice. However, returning DataLimits.NONE means
- // "no particular limit", which makes SelectStatement.execute() take the slightly more complex "paging"
- // path. Not a big deal but it's easy enough to return a limit of 0 rows which avoids this.
- return DataLimits.cqlLimits(0);
- }
+ public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
+ {
+ return EmptyIterators.unfilteredPartition(executionController.metadata());
+ }
- public QueryPager getPager(PagingState state, ProtocolVersion protocolVersion)
- {
- return QueryPager.EMPTY;
- }
+ public DataLimits limits()
+ {
+ // What we return here doesn't matter much in practice. However, returning DataLimits.NONE means
+ // "no particular limit", which makes SelectStatement.execute() take the slightly more complex "paging"
+ // path. Not a big deal but it's easy enough to return a limit of 0 rows which avoids this.
+ return DataLimits.cqlLimits(0);
+ }
- public boolean selectsKey(DecoratedKey key)
- {
- return false;
- }
+ public QueryPager getPager(PagingState state, ProtocolVersion protocolVersion)
+ {
+ return QueryPager.EMPTY;
+ }
- public boolean selectsClustering(DecoratedKey key, Clustering clustering)
- {
- return false;
- }
+ public boolean selectsKey(DecoratedKey key)
+ {
+ return false;
+ }
- @Override
- public boolean selectsFullPartition()
- {
- return false;
- }
- };
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
+ return false;
+ }
+
+ @Override
+ public int nowInSec()
+ {
+ return FBUtilities.nowInSeconds();
+ }
+
+ @Override
+ public boolean selectsFullPartition()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return true;
+ }
+
+ @Override
+ public RowFilter rowFilter()
+ {
+ return RowFilter.NONE;
+ }
+
+ @Override
+ public ColumnFilter columnFilter()
+ {
+ return ColumnFilter.NONE;
+ }
+ };
+ }
+
+ /**
+ * The metadata for the table this is a query on.
+ *
+ * @return the metadata for the table this is a query on.
+ */
+ public TableMetadata metadata();
/**
* Starts a new read operation.
@@ -156,8 +195,63 @@ public interface ReadQuery
public boolean selectsClustering(DecoratedKey key, Clustering clustering);
/**
+ * The time in seconds to use as "now" for this query.
+ * <p>
+ * We use the same time as "now" for the whole query to avoid considering different
+ * values as expired during the query, which would be buggy (would throw off counting amongst other
+ * things).
+ *
+ * @return the time (in seconds) to use as "now".
+ */
+ public int nowInSec();
+
+ /**
* Checks if this {@code ReadQuery} selects full partitions, that is it has no filtering on clustering or regular columns.
* @return {@code true} if this {@code ReadQuery} selects full partitions, {@code false} otherwise.
*/
public boolean selectsFullPartition();
+
+ /**
+ * Filters/Restrictions on CQL rows.
+ * <p>
+ * This contains the restrictions that are not directly handled by the
+ * {@code ClusteringIndexFilter}. More specifically, this includes any non-PK column
+ * restrictions and can include some PK columns restrictions when those can't be
+ * satisfied entirely by the clustering index filter (because not all clustering columns
+ * have been restricted for instance). If there are 2ndary indexes on the table,
+ * one of these restrictions might be handled by a 2ndary index.
+ *
+ * @return the filter holding the expression that rows must satisfy.
+ */
+ public RowFilter rowFilter();
+
+ /**
+ * A filter on which (non-PK) columns must be returned by the query.
+ *
+ * @return which columns must be fetched by this query.
+ */
+ public ColumnFilter columnFilter();
+
+ /**
+ * Whether this query is known to return nothing upfront.
+ * <p>
+ * This is overridden by the {@code ReadQuery} created through {@link #empty(TableMetadata)}, and that's probably the
+ * only place that should override it.
+ *
+ * @return {@code true} if this query is guaranteed to return no results whatsoever.
+ */
+ public default boolean isEmpty()
+ {
+ return false;
+ }
+
+ /**
+ * If the index manager for the table determines that there's an applicable
+ * 2i that can be used to execute this query, call its (optional)
+ * validation method to check that nothing in this query's parameters
+ * violates the implementation specific validation rules.
+ */
+ default void maybeValidateIndex()
+ {
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 64a25f8..7214106 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -20,14 +20,11 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
-import org.apache.commons.lang3.tuple.Pair;
-
import org.apache.cassandra.cache.IRowCacheEntry;
import org.apache.cassandra.cache.RowCacheKey;
import org.apache.cassandra.cache.RowCacheSentinel;
@@ -53,9 +50,7 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.pager.*;
import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTreeSet;
@@ -63,7 +58,7 @@ import org.apache.cassandra.utils.btree.BTreeSet;
/**
* A read command that selects a (part of a) single partition.
*/
-public class SinglePartitionReadCommand extends ReadCommand
+public class SinglePartitionReadCommand extends ReadCommand implements SinglePartitionReadQuery
{
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
@@ -314,6 +309,7 @@ public class SinglePartitionReadCommand extends ReadCommand
indexMetadata());
}
+ @Override
public SinglePartitionReadCommand withUpdatedLimit(DataLimits newLimits)
{
return new SinglePartitionReadCommand(isDigestQuery(),
@@ -328,11 +324,13 @@ public class SinglePartitionReadCommand extends ReadCommand
indexMetadata());
}
+ @Override
public DecoratedKey partitionKey()
{
return partitionKey;
}
+ @Override
public ClusteringIndexFilter clusteringIndexFilter()
{
return clusteringIndexFilter;
@@ -348,35 +346,7 @@ public class SinglePartitionReadCommand extends ReadCommand
return DatabaseDescriptor.getReadRpcTimeout();
}
- public boolean selectsKey(DecoratedKey key)
- {
- if (!this.partitionKey().equals(key))
- return false;
-
- return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType);
- }
-
- public boolean selectsClustering(DecoratedKey key, Clustering clustering)
- {
- if (clustering == Clustering.STATIC_CLUSTERING)
- return !columnFilter().fetchedColumns().statics.isEmpty();
-
- if (!clusteringIndexFilter().selects(clustering))
- return false;
-
- return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
- }
-
- /**
- * Returns a new command suitable to paging from the last returned row.
- *
- * @param lastReturned the last row returned by the previous page. The newly created command
- * will only query row that comes after this (in query order). This can be {@code null} if this
- * is the first page.
- * @param limits the limits to use for the page to query.
- *
- * @return the newly create command.
- */
+ @Override
public SinglePartitionReadCommand forPaging(Clustering lastReturned, DataLimits limits)
{
// We shouldn't have set digest yet when reaching that point
@@ -395,16 +365,6 @@ public class SinglePartitionReadCommand extends ReadCommand
return StorageProxy.read(Group.one(this), consistency, clientState, queryStartNanoTime);
}
- public SinglePartitionPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
- {
- return getPager(this, pagingState, protocolVersion);
- }
-
- private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState, ProtocolVersion protocolVersion)
- {
- return new SinglePartitionPager(command, pagingState, protocolVersion);
- }
-
protected void recordLatency(TableMetrics metric, long latencyNanos)
{
metric.readLatency.addNano(latencyNanos);
@@ -1054,23 +1014,34 @@ public class SinglePartitionReadCommand extends ReadCommand
/**
* Groups multiple single partition read commands.
*/
- public static class Group implements ReadQuery
+ public static class Group extends SinglePartitionReadQuery.Group<SinglePartitionReadCommand>
{
- public final List<SinglePartitionReadCommand> commands;
- private final DataLimits limits;
- private final int nowInSec;
- private final boolean selectsFullPartitions;
+ public static Group create(TableMetadata metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ List<DecoratedKey> partitionKeys,
+ ClusteringIndexFilter clusteringIndexFilter)
+ {
+ List<SinglePartitionReadCommand> commands = new ArrayList<>(partitionKeys.size());
+ for (DecoratedKey partitionKey : partitionKeys)
+ {
+ commands.add(SinglePartitionReadCommand.create(metadata,
+ nowInSec,
+ columnFilter,
+ rowFilter,
+ limits,
+ partitionKey,
+ clusteringIndexFilter));
+ }
+
+ return new Group(commands, limits);
+ }
public Group(List<SinglePartitionReadCommand> commands, DataLimits limits)
{
- assert !commands.isEmpty();
- this.commands = commands;
- this.limits = limits;
- SinglePartitionReadCommand firstCommand = commands.get(0);
- this.nowInSec = firstCommand.nowInSec();
- this.selectsFullPartitions = firstCommand.selectsFullPartition();
- for (int i = 1; i < commands.size(); i++)
- assert commands.get(i).nowInSec() == nowInSec;
+ super(commands, limits);
}
public static Group one(SinglePartitionReadCommand command)
@@ -1082,97 +1053,6 @@ public class SinglePartitionReadCommand extends ReadCommand
{
return StorageProxy.read(this, consistency, clientState, queryStartNanoTime);
}
-
- public int nowInSec()
- {
- return nowInSec;
- }
-
- public DataLimits limits()
- {
- return limits;
- }
-
- public TableMetadata metadata()
- {
- return commands.get(0).metadata();
- }
-
- @Override
- public boolean selectsFullPartition()
- {
- return selectsFullPartitions;
- }
-
- public ReadExecutionController executionController()
- {
- // Note that the only difference between the command in a group must be the partition key on which
- // they applied. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one.
- return commands.get(0).executionController();
- }
-
- public PartitionIterator executeInternal(ReadExecutionController controller)
- {
- // Note that the only difference between the command in a group must be the partition key on which
- // they applied.
- boolean enforceStrictLiveness = commands.get(0).metadata().enforceStrictLiveness();
- return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec),
- nowInSec,
- selectsFullPartitions,
- enforceStrictLiveness);
- }
-
- public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
- {
- return executeLocally(executionController, true);
- }
-
- /**
- * Implementation of {@link ReadQuery#executeLocally(ReadExecutionController)}.
- *
- * @param executionController - the {@code ReadExecutionController} protecting the read.
- * @param sort - whether to sort the inner commands by partition key, required for merging the iterator
- * later on. This will be false when called by {@link ReadQuery#executeInternal(ReadExecutionController)}
- * because in this case it is safe to do so as there is no merging involved and we don't want to
- * change the old behavior which was to not sort by partition.
- *
- * @return - the iterator that can be used to retrieve the query result.
- */
- private UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, boolean sort)
- {
- List<Pair<DecoratedKey, UnfilteredPartitionIterator>> partitions = new ArrayList<>(commands.size());
- for (SinglePartitionReadCommand cmd : commands)
- partitions.add(Pair.of(cmd.partitionKey, cmd.executeLocally(executionController)));
-
- if (sort)
- Collections.sort(partitions, (p1, p2) -> p1.getLeft().compareTo(p2.getLeft()));
-
- return UnfilteredPartitionIterators.concat(partitions.stream().map(p -> p.getRight()).collect(Collectors.toList()));
- }
-
- public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
- {
- if (commands.size() == 1)
- return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion);
-
- return new MultiPartitionPager(this, pagingState, protocolVersion);
- }
-
- public boolean selectsKey(DecoratedKey key)
- {
- return Iterables.any(commands, c -> c.selectsKey(key));
- }
-
- public boolean selectsClustering(DecoratedKey key, Clustering clustering)
- {
- return Iterables.any(commands, c -> c.selectsClustering(key, clustering));
- }
-
- @Override
- public String toString()
- {
- return commands.toString();
- }
}
private static class Deserializer extends SelectionDeserializer
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
new file mode 100644
index 0000000..f9f0014
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.pager.MultiPartitionPager;
+import org.apache.cassandra.service.pager.PagingState;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.service.pager.SinglePartitionPager;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+/**
+ * A {@code ReadQuery} for a single partition.
+ */
+public interface SinglePartitionReadQuery extends ReadQuery
+{
+ public static Group<? extends SinglePartitionReadQuery> createGroup(TableMetadata metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ List<DecoratedKey> partitionKeys,
+ ClusteringIndexFilter clusteringIndexFilter)
+ {
+ return metadata.isVirtual()
+ ? VirtualTableSinglePartitionReadQuery.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter)
+ : SinglePartitionReadCommand.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter);
+ }
+
+
+ /**
+ * Creates a new read query on a single partition.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param columnFilter the column filter to use for the query.
+ * @param filter the clustering index filter to use for the query.
+ *
+ * @return a newly created read query. The returned query will use no row filter and have no limits.
+ */
+ public static SinglePartitionReadQuery create(TableMetadata metadata,
+ int nowInSec,
+ DecoratedKey key,
+ ColumnFilter columnFilter,
+ ClusteringIndexFilter filter)
+ {
+ return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
+ }
+
+ /**
+ * Creates a new read query on a single partition.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param columnFilter the column filter to use for the query.
+ * @param rowFilter the row filter to use for the query.
+ * @param limits the limits to use for the query.
+ * @param partitionKey the partition key for the partition to query.
+ * @param clusteringIndexFilter the clustering index filter to use for the query.
+ *
+ * @return a newly created read query.
+ */
+ public static SinglePartitionReadQuery create(TableMetadata metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter)
+ {
+ return metadata.isVirtual()
+ ? VirtualTableSinglePartitionReadQuery.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter)
+ : SinglePartitionReadCommand.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ }
+
+ /**
+ * Returns the key of the partition queried by this {@code ReadQuery}
+ * @return the key of the partition queried
+ */
+ DecoratedKey partitionKey();
+
+ /**
+ * Creates a new {@code SinglePartitionReadQuery} with the specified limits.
+ *
+ * @param newLimits the new limits
+ * @return the new {@code SinglePartitionReadQuery}
+ */
+ SinglePartitionReadQuery withUpdatedLimit(DataLimits newLimits);
+
+ /**
+ * Returns a new {@code SinglePartitionReadQuery} suitable to paging from the last returned row.
+ *
+ * @param lastReturned the last row returned by the previous page. The newly created query
+ * will only query rows that come after this (in query order). This can be {@code null} if this
+ * is the first page.
+ * @param limits the limits to use for the page to query.
+ *
+ * @return the newly created query.
+ */
+ SinglePartitionReadQuery forPaging(Clustering lastReturned, DataLimits limits);
+
+ @Override
+ default SinglePartitionPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
+ {
+ return new SinglePartitionPager(this, pagingState, protocolVersion);
+ }
+
+ ClusteringIndexFilter clusteringIndexFilter();
+
+ default boolean selectsKey(DecoratedKey key)
+ {
+ if (!this.partitionKey().equals(key))
+ return false;
+
+ return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType);
+ }
+
+ default boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
+ if (clustering == Clustering.STATIC_CLUSTERING)
+ return !columnFilter().fetchedColumns().statics.isEmpty();
+
+ if (!clusteringIndexFilter().selects(clustering))
+ return false;
+
+ return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
+ }
+
+ /**
+ * Groups multiple single partition read queries.
+ */
+ abstract class Group<T extends SinglePartitionReadQuery> implements ReadQuery
+ {
+ public final List<T> queries;
+ private final DataLimits limits;
+ private final int nowInSec;
+ private final boolean selectsFullPartitions;
+
+ public Group(List<T> queries, DataLimits limits)
+ {
+ assert !queries.isEmpty();
+ this.queries = queries;
+ this.limits = limits;
+ T firstQuery = queries.get(0);
+ this.nowInSec = firstQuery.nowInSec();
+ this.selectsFullPartitions = firstQuery.selectsFullPartition();
+ for (int i = 1; i < queries.size(); i++)
+ assert queries.get(i).nowInSec() == nowInSec;
+ }
+
+ public int nowInSec()
+ {
+ return nowInSec;
+ }
+
+ public DataLimits limits()
+ {
+ return limits;
+ }
+
+ public TableMetadata metadata()
+ {
+ return queries.get(0).metadata();
+ }
+
+ @Override
+ public boolean selectsFullPartition()
+ {
+ return selectsFullPartitions;
+ }
+
+ public ReadExecutionController executionController()
+ {
+ // Note that the only difference between the queries in a group must be the partition key on which
+ // they applied. So as far as ReadOrderGroup is concerned, we can use any of the queries to start one.
+ return queries.get(0).executionController();
+ }
+
+ public PartitionIterator executeInternal(ReadExecutionController controller)
+ {
+ // Note that the only difference between the queries in a group must be the partition key on which
+ // they applied.
+ boolean enforceStrictLiveness = queries.get(0).metadata().enforceStrictLiveness();
+ return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec),
+ nowInSec,
+ selectsFullPartitions,
+ enforceStrictLiveness);
+ }
+
+ public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
+ {
+ return executeLocally(executionController, true);
+ }
+
+ /**
+ * Implementation of {@link ReadQuery#executeLocally(ReadExecutionController)}.
+ *
+ * @param executionController - the {@code ReadExecutionController} protecting the read.
+ * @param sort - whether to sort the inner queries by partition key, required for merging the iterator
+ * later on. This will be false when called by {@link ReadQuery#executeInternal(ReadExecutionController)}
+ * because in this case it is safe to do so as there is no merging involved and we don't want to
+ * change the old behavior which was to not sort by partition.
+ *
+ * @return - the iterator that can be used to retrieve the query result.
+ */
+ private UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, boolean sort)
+ {
+ List<Pair<DecoratedKey, UnfilteredPartitionIterator>> partitions = new ArrayList<>(queries.size());
+ for (T query : queries)
+ partitions.add(Pair.of(query.partitionKey(), query.executeLocally(executionController)));
+
+ if (sort)
+ Collections.sort(partitions, (p1, p2) -> p1.getLeft().compareTo(p2.getLeft()));
+
+ return UnfilteredPartitionIterators.concat(partitions.stream().map(p -> p.getRight()).collect(Collectors.toList()));
+ }
+
+ public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
+ {
+ if (queries.size() == 1)
+ return new SinglePartitionPager(queries.get(0), pagingState, protocolVersion);
+
+ return new MultiPartitionPager<T>(this, pagingState, protocolVersion);
+ }
+
+ public boolean selectsKey(DecoratedKey key)
+ {
+ return Iterables.any(queries, c -> c.selectsKey(key));
+ }
+
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
+ return Iterables.any(queries, c -> c.selectsClustering(key, clustering));
+ }
+
+ @Override
+ public RowFilter rowFilter()
+ {
+ // Note that the only difference between the queries in a group must be the partition key on which
+ // they applied.
+ return queries.get(0).rowFilter();
+ }
+
+ @Override
+ public ColumnFilter columnFilter()
+ {
+ // Note that the only difference between the queries in a group must be the partition key on which
+ // they applied.
+ return queries.get(0).columnFilter();
+ }
+
+ @Override
+ public String toString()
+ {
+ return queries.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java
new file mode 100644
index 0000000..48cafa1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * A read query that selects a (part of a) range of partitions of a virtual table.
+ */
+public class VirtualTablePartitionRangeReadQuery extends VirtualTableReadQuery implements PartitionRangeReadQuery
+{
+ private final DataRange dataRange;
+
+ public static VirtualTablePartitionRangeReadQuery create(TableMetadata metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DataRange dataRange)
+ {
+ return new VirtualTablePartitionRangeReadQuery(metadata,
+ nowInSec,
+ columnFilter,
+ rowFilter,
+ limits,
+ dataRange);
+ }
+
+ private VirtualTablePartitionRangeReadQuery(TableMetadata metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DataRange dataRange)
+ {
+ super(metadata, nowInSec, columnFilter, rowFilter, limits);
+ this.dataRange = dataRange;
+ }
+
+ @Override
+ public DataRange dataRange()
+ {
+ return dataRange;
+ }
+
+ @Override
+ public PartitionRangeReadQuery withUpdatedLimit(DataLimits newLimits)
+ {
+ return new VirtualTablePartitionRangeReadQuery(metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ newLimits,
+ dataRange());
+ }
+
+ @Override
+ public PartitionRangeReadQuery withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange)
+ {
+ return new VirtualTablePartitionRangeReadQuery(metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ newLimits,
+ newDataRange);
+ }
+
+ @Override
+ protected UnfilteredPartitionIterator queryVirtualTable()
+ {
+ VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
+ return view.select(dataRange, columnFilter());
+ }
+
+ @Override
+ protected void appendCQLWhereClause(StringBuilder sb)
+ {
+ if (dataRange.isUnrestricted() && rowFilter().isEmpty())
+ return;
+
+ sb.append(" WHERE ");
+ // We put the row filter first because the data range can end by "ORDER BY"
+ if (!rowFilter().isEmpty())
+ {
+ sb.append(rowFilter());
+ if (!dataRange.isUnrestricted())
+ sb.append(" AND ");
+ }
+ if (!dataRange.isUnrestricted())
+ sb.append(dataRange.toCQLString(metadata()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java
new file mode 100644
index 0000000..f2c9a49
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
+
+/**
+ * Base class for the {@code ReadQuery} implementations used to query virtual tables.
+ */
+public abstract class VirtualTableReadQuery extends AbstractReadQuery
+{
+ protected VirtualTableReadQuery(TableMetadata metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits)
+ {
+ super(metadata, nowInSec, columnFilter, rowFilter, limits);
+ }
+
+ @Override
+ public ReadExecutionController executionController()
+ {
+ return ReadExecutionController.empty();
+ }
+
+ @Override
+ public PartitionIterator execute(ConsistencyLevel consistency,
+ ClientState clientState,
+ long queryStartNanoTime) throws RequestExecutionException
+ {
+ return executeInternal(executionController());
+ }
+
+ @Override
+ public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
+ {
+ UnfilteredPartitionIterator resultIterator = queryVirtualTable();
+ return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
+ }
+
+ protected abstract UnfilteredPartitionIterator queryVirtualTable();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java
new file mode 100644
index 0000000..11f1f77
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterators;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
+
+/**
+ * A read query that selects a (part of a) single partition of a virtual table.
+ */
+public class VirtualTableSinglePartitionReadQuery extends VirtualTableReadQuery implements SinglePartitionReadQuery
+{
+    private final DecoratedKey partitionKey;
+    private final ClusteringIndexFilter clusteringIndexFilter;
+
+    /**
+     * Static factory; forwards every argument unchanged to the private constructor.
+     */
+    public static VirtualTableSinglePartitionReadQuery create(TableMetadata metadata,
+                                                              int nowInSec,
+                                                              ColumnFilter columnFilter,
+                                                              RowFilter rowFilter,
+                                                              DataLimits limits,
+                                                              DecoratedKey partitionKey,
+                                                              ClusteringIndexFilter clusteringIndexFilter)
+    {
+        return new VirtualTableSinglePartitionReadQuery(metadata,
+                                                        nowInSec,
+                                                        columnFilter,
+                                                        rowFilter,
+                                                        limits,
+                                                        partitionKey,
+                                                        clusteringIndexFilter);
+    }
+
+    private VirtualTableSinglePartitionReadQuery(TableMetadata metadata,
+                                                 int nowInSec,
+                                                 ColumnFilter columnFilter,
+                                                 RowFilter rowFilter,
+                                                 DataLimits limits,
+                                                 DecoratedKey partitionKey,
+                                                 ClusteringIndexFilter clusteringIndexFilter)
+    {
+        super(metadata, nowInSec, columnFilter, rowFilter, limits);
+        this.partitionKey = partitionKey;
+        this.clusteringIndexFilter = clusteringIndexFilter;
+    }
+
+    /**
+     * Appends the {@code WHERE} clause: the partition key equality, then the row filter (if not empty),
+     * then the clustering index filter (if its CQL form is not empty).
+     */
+    @Override
+    protected void appendCQLWhereClause(StringBuilder sb)
+    {
+        TableMetadata table = metadata();
+
+        sb.append(" WHERE ")
+          .append(ColumnMetadata.toCQLString(table.partitionKeyColumns()))
+          .append(" = ");
+        DataRange.appendKeyString(sb, table.partitionKeyType, partitionKey().getKey());
+
+        // The row filter has to come first because the clustering index filter can end with "ORDER BY"
+        if (!rowFilter().isEmpty())
+            sb.append(" AND ").append(rowFilter());
+
+        String clusteringRestrictions = clusteringIndexFilter().toCQLString(table);
+        if (!clusteringRestrictions.isEmpty())
+            sb.append(" AND ").append(clusteringRestrictions);
+    }
+
+    @Override
+    public ClusteringIndexFilter clusteringIndexFilter()
+    {
+        return clusteringIndexFilter;
+    }
+
+    /**
+     * The whole partition is selected only if the clustering index filter selects all of it and the row filter
+     * has no expression on clustering or regular columns.
+     */
+    @Override
+    public boolean selectsFullPartition()
+    {
+        if (!clusteringIndexFilter.selectsAllPartition())
+            return false;
+
+        return !rowFilter().hasExpressionOnClusteringOrRegularColumns();
+    }
+
+    @Override
+    public DecoratedKey partitionKey()
+    {
+        return partitionKey;
+    }
+
+    /**
+     * Returns a copy of this query that differs only by its limits.
+     */
+    @Override
+    public SinglePartitionReadQuery withUpdatedLimit(DataLimits newLimits)
+    {
+        return new VirtualTableSinglePartitionReadQuery(metadata(),
+                                                        nowInSec(),
+                                                        columnFilter(),
+                                                        rowFilter(),
+                                                        newLimits,
+                                                        partitionKey(),
+                                                        clusteringIndexFilter);
+    }
+
+    /**
+     * Returns a copy of this query using the provided limits. When {@code lastReturned} is not {@code null},
+     * the clustering index filter is replaced by its paging variant (built with {@code inclusive = false});
+     * otherwise the current filter is reused as is.
+     */
+    @Override
+    public SinglePartitionReadQuery forPaging(Clustering lastReturned, DataLimits limits)
+    {
+        ClusteringIndexFilter pagingFilter;
+        if (lastReturned == null)
+            pagingFilter = clusteringIndexFilter;
+        else
+            pagingFilter = clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false);
+
+        return new VirtualTableSinglePartitionReadQuery(metadata(),
+                                                        nowInSec(),
+                                                        columnFilter(),
+                                                        rowFilter(),
+                                                        limits,
+                                                        partitionKey(),
+                                                        pagingFilter);
+    }
+
+    /**
+     * Looks up the virtual table by the id of this query's table metadata and selects the requested partition.
+     */
+    @Override
+    protected UnfilteredPartitionIterator queryVirtualTable()
+    {
+        VirtualTable table = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
+        return table.select(partitionKey, clusteringIndexFilter, columnFilter());
+    }
+
+    /**
+     * Groups multiple single partition read queries.
+     */
+    public static class Group extends SinglePartitionReadQuery.Group<VirtualTableSinglePartitionReadQuery>
+    {
+        /**
+         * Builds a group holding one query per provided partition key; all the queries share the other arguments.
+         */
+        public static Group create(TableMetadata metadata,
+                                   int nowInSec,
+                                   ColumnFilter columnFilter,
+                                   RowFilter rowFilter,
+                                   DataLimits limits,
+                                   List<DecoratedKey> partitionKeys,
+                                   ClusteringIndexFilter clusteringIndexFilter)
+        {
+            List<VirtualTableSinglePartitionReadQuery> perKeyQueries = new ArrayList<>(partitionKeys.size());
+            for (DecoratedKey key : partitionKeys)
+            {
+                VirtualTableSinglePartitionReadQuery query =
+                    VirtualTableSinglePartitionReadQuery.create(metadata,
+                                                                nowInSec,
+                                                                columnFilter,
+                                                                rowFilter,
+                                                                limits,
+                                                                key,
+                                                                clusteringIndexFilter);
+                perKeyQueries.add(query);
+            }
+
+            return new Group(perKeyQueries, limits);
+        }
+
+        public Group(List<VirtualTableSinglePartitionReadQuery> queries, DataLimits limits)
+        {
+            super(queries, limits);
+        }
+
+        /**
+         * Wraps a single query into a group that uses the limits of that query.
+         */
+        public static Group one(VirtualTableSinglePartitionReadQuery query)
+        {
+            return new Group(Collections.singletonList(query), query.limits());
+        }
+
+        /**
+         * Executes every query of the group, in order, and concatenates the results. A group of exactly one
+         * query returns the result of that query directly.
+         */
+        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
+        {
+            if (queries.size() == 1)
+                return queries.get(0).execute(consistency, clientState, queryStartNanoTime);
+
+            List<PartitionIterator> results = queries.stream()
+                                                     .map(query -> query.execute(consistency, clientState, queryStartNanoTime))
+                                                     .collect(Collectors.toList());
+            return PartitionIterators.concat(results);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index 1d7d1c8..20a1656 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -65,6 +65,8 @@ import org.apache.cassandra.schema.TableMetadata;
*/
public class ColumnFilter
{
+ public static final ColumnFilter NONE = selection(RegularAndStaticColumns.NONE);
+
public static final Serializer serializer = new Serializer();
// True if _fetched_ includes all regular columns (and any static in _queried_), in which case metadata must not be
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
index 70759cf..9064b0f 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.db.transform.MorePartitions;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.utils.AbstractIterator;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadQuery;
import org.apache.cassandra.db.rows.*;
public abstract class PartitionIterators
@@ -32,15 +32,15 @@ public abstract class PartitionIterators
private PartitionIterators() {}
@SuppressWarnings("resource") // The created resources are returned right away
- public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadCommand command)
+ public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadQuery query)
{
// If the query has no results, we'll get an empty iterator, but we still
// want a RowIterator out of this method, so we return an empty one.
RowIterator toReturn = iter.hasNext()
? iter.next()
- : EmptyIterators.row(command.metadata(),
- command.partitionKey(),
- command.clusteringIndexFilter().isReversed());
+ : EmptyIterators.row(query.metadata(),
+ query.partitionKey(),
+ query.clusteringIndexFilter().isReversed());
// Note that in general, we should wrap the result so that it's close method actually
// close the whole PartitionIterator.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index a549458..2dc566a 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
@@ -427,7 +428,7 @@ public class PartitionUpdate extends AbstractBTreePartition
public void validateIndexedColumns()
{
- Keyspace.openAndGetStore(metadata()).indexManager.validate(this);
+ IndexRegistry.obtain(metadata()).validate(this);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
new file mode 100644
index 0000000..a776d01
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Iterator;
+import java.util.NavigableMap;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * An abstract virtual table implementation that builds the resultset on demand.
+ */
+public abstract class AbstractVirtualTable implements VirtualTable
+{
+ private final TableMetadata metadata;
+
+ protected AbstractVirtualTable(TableMetadata metadata)
+ {
+ if (!metadata.isVirtual())
+ throw new IllegalArgumentException();
+
+ this.metadata = metadata;
+ }
+
+ public TableMetadata metadata()
+ {
+ return metadata;
+ }
+
+ /**
+ * Provide a {@link DataSet} that is contains all of the virtual table's data.
+ */
+ public abstract DataSet data();
+
+ /**
+ * Provide a {@link DataSet} that is potentially restricted to the provided partition - but is allowed to contain
+ * other partitions.
+ */
+ public DataSet data(DecoratedKey partitionKey)
+ {
+ return data();
+ }
+
+ @Override
+ public final UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter)
+ {
+ Partition partition = data(partitionKey).getPartition(partitionKey);
+
+ if (null == partition)
+ return EmptyIterators.unfilteredPartition(metadata);
+
+ long now = System.currentTimeMillis();
+ UnfilteredRowIterator rowIterator = partition.toRowIterator(metadata(), clusteringIndexFilter, columnFilter, now);
+ return new SingletonUnfilteredPartitionIterator(rowIterator);
+ }
+
+ @Override
+ public final UnfilteredPartitionIterator select(DataRange dataRange, ColumnFilter columnFilter)
+ {
+ DataSet data = data();
+
+ if (data.isEmpty())
+ return EmptyIterators.unfilteredPartition(metadata);
+
+ Iterator<Partition> iterator = data.getPartitions(dataRange);
+
+ long now = System.currentTimeMillis();
+
+ return new AbstractUnfilteredPartitionIterator()
+ {
+ @Override
+ public UnfilteredRowIterator next()
+ {
+ Partition partition = iterator.next();
+ return partition.toRowIterator(metadata, dataRange.clusteringIndexFilter(partition.key()), columnFilter, now);
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public TableMetadata metadata()
+ {
+ return metadata;
+ }
+ };
+ }
+
+ @Override
+ public void apply(PartitionUpdate update)
+ {
+ throw new InvalidRequestException("Modification is not supported by table " + metadata);
+ }
+
+ public interface DataSet
+ {
+ boolean isEmpty();
+ Partition getPartition(DecoratedKey partitionKey);
+ Iterator<Partition> getPartitions(DataRange range);
+ }
+
+ public interface Partition
+ {
+ DecoratedKey key();
+ UnfilteredRowIterator toRowIterator(TableMetadata metadata, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter, long now);
+ }
+
+ /**
+ * An abstract, map-backed DataSet implementation. Can be backed by any {@link NavigableMap}, then either maintained
+ * persistently, or built on demand and thrown away after use, depending on the implementing class.
+ */
+ public static abstract class AbstractDataSet implements DataSet
+ {
+ protected final NavigableMap<DecoratedKey, Partition> partitions;
+
+ protected AbstractDataSet(NavigableMap<DecoratedKey, Partition> partitions)
+ {
+ this.partitions = partitions;
+ }
+
+ public boolean isEmpty()
+ {
+ return partitions.isEmpty();
+ }
+
+ public Partition getPartition(DecoratedKey key)
+ {
+ return partitions.get(key);
+ }
+
+ public Iterator<Partition> getPartitions(DataRange dataRange)
+ {
+ AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange();
+ PartitionPosition startKey = keyRange.left;
+ PartitionPosition endKey = keyRange.right;
+
+ NavigableMap<DecoratedKey, Partition> selection = partitions;
+
+ if (startKey.isMinimum() && endKey.isMinimum())
+ return selection.values().iterator();
+
+ if (startKey.isMinimum() && endKey instanceof DecoratedKey)
+ return selection.headMap((DecoratedKey) endKey, keyRange.isEndInclusive()).values().iterator();
+
+ if (startKey instanceof DecoratedKey && endKey instanceof DecoratedKey)
+ {
+ return selection.subMap((DecoratedKey) startKey, keyRange.isStartInclusive(), (DecoratedKey) endKey, keyRange.isEndInclusive())
+ .values()
+ .iterator();
+ }
+
+ if (startKey instanceof DecoratedKey)
+ selection = selection.tailMap((DecoratedKey) startKey, keyRange.isStartInclusive());
+
+ if (endKey instanceof DecoratedKey)
+ selection = selection.headMap((DecoratedKey) endKey, keyRange.isEndInclusive());
+
+ // If we have reach this point it means that one of the PartitionPosition is a KeyBound and we have
+ // to use filtering for eliminating the unwanted partitions.
+ Iterator<Partition> iterator = selection.values().iterator();
+
+ return new AbstractIterator<Partition>()
+ {
+ private boolean encounteredPartitionsWithinRange;
+
+ @Override
+ protected Partition computeNext()
+ {
+ while (iterator.hasNext())
+ {
+ Partition partition = iterator.next();
+ if (dataRange.contains(partition.key()))
+ {
+ encounteredPartitionsWithinRange = true;
+ return partition;
+ }
+
+ // we encountered some partitions within the range, but the last one is outside of the range: we are done
+ if (encounteredPartitionsWithinRange)
+ return endOfData();
+ }
+
+ return endOfData();
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
new file mode 100644
index 0000000..2af3b6a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * A DataSet implementation that is filled on demand and has an easy to use API for adding rows.
+ * <p>
+ * Rows are added with {@link #row(Object...)}; the regular columns of the row added last are then set with
+ * {@link #column(String, Object)}.
+ */
+public class SimpleDataSet extends AbstractVirtualTable.AbstractDataSet
+{
+    private final TableMetadata metadata;
+
+    // the row created by the last call to row(); null until row() has been called
+    private Row currentRow;
+
+    public SimpleDataSet(TableMetadata metadata)
+    {
+        super(new TreeMap<>(DecoratedKey.comparator));
+        this.metadata = metadata;
+    }
+
+    /**
+     * Adds a row and makes it the target of the following {@link #column(String, Object)} calls.
+     * If the partition already holds a row with the same clustering, that row is replaced.
+     *
+     * @param primaryKeyValues the partition key values followed by the clustering values
+     * @return this data set
+     * @throws IllegalArgumentException if the number of values differs from the number of primary key columns
+     */
+    public SimpleDataSet row(Object... primaryKeyValues)
+    {
+        int primaryKeySize = Iterables.size(metadata.primaryKeyColumns());
+        if (primaryKeySize != primaryKeyValues.length)
+            throw new IllegalArgumentException("Expected " + primaryKeySize + " primary key values but got " + primaryKeyValues.length);
+
+        Object[] partitionKeyValues = new Object[metadata.partitionKeyColumns().size()];
+        Object[] clusteringValues = new Object[metadata.clusteringColumns().size()];
+
+        System.arraycopy(primaryKeyValues, 0, partitionKeyValues, 0, partitionKeyValues.length);
+        System.arraycopy(primaryKeyValues, partitionKeyValues.length, clusteringValues, 0, clusteringValues.length);
+
+        DecoratedKey partitionKey = makeDecoratedKey(partitionKeyValues);
+        Clustering clustering = makeClustering(clusteringValues);
+
+        currentRow = new Row(metadata, clustering);
+        SimplePartition partition = (SimplePartition) partitions.computeIfAbsent(partitionKey, pk -> new SimplePartition(metadata, pk));
+        partition.add(currentRow);
+
+        return this;
+    }
+
+    /**
+     * Sets the value of a regular column on the row added by the last call to {@link #row(Object...)}.
+     *
+     * @return this data set
+     * @throws IllegalStateException if no row has been added yet
+     * @throws IllegalArgumentException if {@code columnName} is not a regular column of the table
+     */
+    public SimpleDataSet column(String columnName, Object value)
+    {
+        if (null == currentRow)
+            throw new IllegalStateException("row() must be called before column()");
+        currentRow.add(columnName, value);
+        return this;
+    }
+
+    /**
+     * Serializes the partition key values (through the {@link CompositeType} when there is more than one)
+     * and decorates the result with the partitioner of the table.
+     */
+    private DecoratedKey makeDecoratedKey(Object... partitionKeyValues)
+    {
+        ByteBuffer partitionKey = partitionKeyValues.length == 1
+                                ? decompose(metadata.partitionKeyType, partitionKeyValues[0])
+                                : ((CompositeType) metadata.partitionKeyType).decompose(partitionKeyValues);
+        return metadata.partitioner.decorateKey(partitionKey);
+    }
+
+    /**
+     * Serializes each clustering value with the type of the matching clustering column.
+     * Returns {@code Clustering.EMPTY} when there are no clustering values.
+     */
+    private Clustering makeClustering(Object... clusteringValues)
+    {
+        if (clusteringValues.length == 0)
+            return Clustering.EMPTY;
+
+        ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+        for (int i = 0; i < clusteringValues.length; i++)
+            clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+        return Clustering.make(clusteringByteBuffers);
+    }
+
+    /**
+     * A partition holding its rows in a map sorted with the comparator of the table.
+     */
+    private static final class SimplePartition implements AbstractVirtualTable.Partition
+    {
+        private final DecoratedKey key;
+        private final NavigableMap<Clustering, Row> rows;
+
+        private SimplePartition(TableMetadata metadata, DecoratedKey key)
+        {
+            this.key = key;
+            this.rows = new TreeMap<>(metadata.comparator);
+        }
+
+        private void add(Row row)
+        {
+            rows.put(row.clustering, row);
+        }
+
+        @Override
+        public DecoratedKey key()
+        {
+            return key;
+        }
+
+        /**
+         * Iterates over the rows selected by {@code clusteringIndexFilter}, in reverse order if the filter is
+         * reversed. The iterator is built without partition deletion and with an empty static row.
+         */
+        @Override
+        public UnfilteredRowIterator toRowIterator(TableMetadata metadata,
+                                                   ClusteringIndexFilter clusteringIndexFilter,
+                                                   ColumnFilter columnFilter,
+                                                   long now)
+        {
+            Iterator<Row> iterator = (clusteringIndexFilter.isReversed() ? rows.descendingMap() : rows).values().iterator();
+
+            return new AbstractUnfilteredRowIterator(metadata,
+                                                     key,
+                                                     DeletionTime.LIVE,
+                                                     columnFilter.queriedColumns(),
+                                                     Rows.EMPTY_STATIC_ROW,
+                                                     false,
+                                                     EncodingStats.NO_STATS)
+            {
+                @Override
+                protected Unfiltered computeNext()
+                {
+                    // skip the rows that the clustering index filter does not select
+                    while (iterator.hasNext())
+                    {
+                        Row row = iterator.next();
+                        if (clusteringIndexFilter.selects(row.clustering))
+                            return row.toTableRow(columns, now);
+                    }
+                    return endOfData();
+                }
+            };
+        }
+    }
+
+    /**
+     * A row under construction: a clustering plus the not yet serialized values of its regular columns.
+     */
+    private static class Row
+    {
+        private final TableMetadata metadata;
+        private final Clustering clustering;
+
+        private final Map<ColumnMetadata, Object> values = new HashMap<>();
+
+        private Row(TableMetadata metadata, Clustering clustering)
+        {
+            this.metadata = metadata;
+            this.clustering = clustering;
+        }
+
+        /**
+         * @throws IllegalArgumentException if the table has no regular column named {@code columnName}
+         */
+        private void add(String columnName, Object value)
+        {
+            ColumnMetadata column = metadata.getColumn(ByteBufferUtil.bytes(columnName));
+            if (null == column || !column.isRegular())
+                throw new IllegalArgumentException("Unknown or non-regular column: " + columnName);
+            values.put(column, value);
+        }
+
+        /**
+         * Builds the actual row, with one live cell per requested column that has a non-null value.
+         *
+         * @param columns the columns to include
+         * @param now a time in milliseconds; passed as is to the cells and converted to seconds for the builder
+         */
+        private org.apache.cassandra.db.rows.Row toTableRow(RegularAndStaticColumns columns, long now)
+        {
+            org.apache.cassandra.db.rows.Row.Builder builder = BTreeRow.unsortedBuilder((int) TimeUnit.MILLISECONDS.toSeconds(now));
+            builder.newRow(clustering);
+
+            columns.forEach(c ->
+            {
+                Object value = values.get(c);
+                if (null != value)
+                    builder.addCell(BufferCell.live(c, now, decompose(c.type, value)));
+            });
+
+            return builder.build();
+        }
+    }
+
+    /**
+     * Serializes {@code value} with {@code type}. The cast is unchecked: a value that does not match the type
+     * is only detected when the type tries to serialize it.
+     */
+    @SuppressWarnings("unchecked")
+    private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+    {
+        return ((AbstractType<T>) type).decompose(value);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
new file mode 100644
index 0000000..8d6f59b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * The {@code system_views} virtual keyspace. It is created with an empty list of tables.
+ */
+public final class SystemViewsKeyspace extends VirtualKeyspace
+{
+    private static final String NAME = "system_views";
+
+    // final so that the singleton cannot be replaced after class initialization
+    public static final SystemViewsKeyspace instance = new SystemViewsKeyspace();
+
+    private SystemViewsKeyspace()
+    {
+        super(NAME, ImmutableList.of());
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/4] cassandra git commit: Implement virtual keyspace interface
Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
new file mode 100644
index 0000000..6750215
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Collection;
+
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Tables;
+
+/**
+ * A named collection of {@link VirtualTable}s, together with the {@link KeyspaceMetadata} built from the
+ * metadata of those tables.
+ */
+public class VirtualKeyspace
+{
+    private final String name;
+    private final KeyspaceMetadata metadata;
+
+    private final ImmutableCollection<VirtualTable> tables;
+
+    /**
+     * @param name the name of the keyspace
+     * @param tables the tables of the keyspace; an immutable copy of the collection is kept
+     */
+    public VirtualKeyspace(String name, Collection<VirtualTable> tables)
+    {
+        this.name = name;
+        this.tables = ImmutableList.copyOf(tables);
+
+        Tables tableMetadatas = Tables.of(Iterables.transform(tables, VirtualTable::metadata));
+        this.metadata = KeyspaceMetadata.virtual(name, tableMetadatas);
+    }
+
+    public String name()
+    {
+        return name;
+    }
+
+    public KeyspaceMetadata metadata()
+    {
+        return metadata;
+    }
+
+    public ImmutableCollection<VirtualTable> tables()
+    {
+        return tables;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
new file mode 100644
index 0000000..5e0f90c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * Singleton registry of the virtual keyspaces, indexed by keyspace name, and of their tables, indexed by table id.
+ */
+public final class VirtualKeyspaceRegistry
+{
+    public static final VirtualKeyspaceRegistry instance = new VirtualKeyspaceRegistry();
+
+    private final Map<String, VirtualKeyspace> virtualKeyspaces = new ConcurrentHashMap<>();
+    private final Map<TableId, VirtualTable> virtualTables = new ConcurrentHashMap<>();
+
+    private VirtualKeyspaceRegistry()
+    {
+    }
+
+    /**
+     * Registers the keyspace under its name and each of its tables under the id of the table.
+     * Existing entries with the same name or the same id are overwritten.
+     */
+    public void register(VirtualKeyspace keyspace)
+    {
+        virtualKeyspaces.put(keyspace.name(), keyspace);
+
+        for (VirtualTable table : keyspace.tables())
+            virtualTables.put(table.metadata().id, table);
+    }
+
+    /**
+     * @return the keyspace registered under {@code name}, or {@code null} if there is none
+     */
+    @Nullable
+    public VirtualKeyspace getKeyspaceNullable(String name)
+    {
+        return virtualKeyspaces.get(name);
+    }
+
+    /**
+     * @return the table registered under {@code id}, or {@code null} if there is none
+     */
+    @Nullable
+    public VirtualTable getTableNullable(TableId id)
+    {
+        return virtualTables.get(id);
+    }
+
+    /**
+     * @return the metadata of the keyspace registered under {@code name}, or {@code null} if there is none
+     */
+    @Nullable
+    public KeyspaceMetadata getKeyspaceMetadataNullable(String name)
+    {
+        VirtualKeyspace keyspace = virtualKeyspaces.get(name);
+        if (null == keyspace)
+            return null;
+
+        return keyspace.metadata();
+    }
+
+    /**
+     * @return the metadata of the table registered under {@code id}, or {@code null} if there is none
+     */
+    @Nullable
+    public TableMetadata getTableMetadataNullable(TableId id)
+    {
+        VirtualTable table = virtualTables.get(id);
+        if (null == table)
+            return null;
+
+        return table.metadata();
+    }
+
+    /**
+     * @return a view of the metadata of all the registered keyspaces
+     */
+    public Iterable<KeyspaceMetadata> virtualKeyspacesMetadata()
+    {
+        return Iterables.transform(virtualKeyspaces.values(), VirtualKeyspace::metadata);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
new file mode 100644
index 0000000..dc32c8c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Collection;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * A specialised IMutation implementation for virtual keyspaces.
+ *
+ * Mainly overrides {@link #apply()} to go straight to {@link VirtualTable#apply(PartitionUpdate)} for every table involved.
+ */
+public final class VirtualMutation implements IMutation
+{
+    private final String keyspaceName;
+    private final DecoratedKey partitionKey;
+    private final ImmutableMap<TableId, PartitionUpdate> modifications;
+
+    /**
+     * Creates a mutation holding a single update; the keyspace name, partition key and table id
+     * are all taken from the update itself.
+     *
+     * @param update the only partition update carried by this mutation
+     */
+    public VirtualMutation(PartitionUpdate update)
+    {
+        this(update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update));
+    }
+
+    /**
+     * @param keyspaceName the name reported by {@link #getKeyspaceName()}
+     * @param partitionKey the key reported by {@link #key()}
+     * @param modifications the updates to apply, keyed by the id of the table each one targets
+     */
+    public VirtualMutation(String keyspaceName, DecoratedKey partitionKey, ImmutableMap<TableId, PartitionUpdate> modifications)
+    {
+        this.keyspaceName = keyspaceName;
+        this.partitionKey = partitionKey;
+        this.modifications = modifications;
+    }
+
+    /**
+     * Hands every update to the virtual table registered under its table id.
+     *
+     * @throws IllegalStateException if no virtual table is registered for one of the table ids
+     */
+    @Override
+    public void apply()
+    {
+        modifications.forEach((id, update) ->
+        {
+            // The registry lookup is nullable: report the offending id instead of
+            // failing with an anonymous NullPointerException inside the lambda.
+            VirtualTable table = VirtualKeyspaceRegistry.instance.getTableNullable(id);
+            if (table == null)
+                throw new IllegalStateException("No virtual table is registered for table id " + id);
+
+            table.apply(update);
+        });
+    }
+
+    @Override
+    public String getKeyspaceName()
+    {
+        return keyspaceName;
+    }
+
+    @Override
+    public Collection<TableId> getTableIds()
+    {
+        return modifications.keySet();
+    }
+
+    @Override
+    public DecoratedKey key()
+    {
+        return partitionKey;
+    }
+
+    /**
+     * @return the write RPC timeout configured in {@link DatabaseDescriptor}
+     */
+    @Override
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getWriteRpcTimeout();
+    }
+
+    /**
+     * @param shallow if {@code true} only the table ids are listed, otherwise the full partition updates
+     * @return a description of this mutation including its keyspace and partition key
+     */
+    @Override
+    public String toString(boolean shallow)
+    {
+        MoreObjects.ToStringHelper helper =
+            MoreObjects.toStringHelper(this)
+                       .add("keyspace", keyspaceName)
+                       .add("partition key", partitionKey);
+
+        if (shallow)
+            helper.add("tables", getTableIds());
+        else
+            helper.add("modifications", getPartitionUpdates());
+
+        return helper.toString();
+    }
+
+    @Override
+    public Collection<PartitionUpdate> getPartitionUpdates()
+    {
+        return modifications.values();
+    }
+
+    @Override
+    public void validateIndexedColumns()
+    {
+        // no-op
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java b/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java
new file mode 100644
index 0000000..299cc00
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Locale;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static org.apache.cassandra.schema.TableMetadata.builder;
+
+/**
+ * The {@code system_virtual_schema} virtual keyspace.
+ *
+ * It consists of three virtual tables ({@code keyspaces}, {@code tables} and {@code columns}) whose rows
+ * are generated from the metadata of every keyspace returned by
+ * {@link VirtualKeyspaceRegistry#virtualKeyspacesMetadata()}.
+ */
+public final class VirtualSchemaKeyspace extends VirtualKeyspace
+{
+    private static final String NAME = "system_virtual_schema";
+
+    public static final VirtualSchemaKeyspace instance = new VirtualSchemaKeyspace();
+
+    private VirtualSchemaKeyspace()
+    {
+        super(NAME, ImmutableList.of(new VirtualKeyspaces(NAME), new VirtualTables(NAME), new VirtualColumns(NAME)));
+    }
+
+    /**
+     * The {@code keyspaces} table: one row per registered virtual keyspace, holding only its name.
+     */
+    private static final class VirtualKeyspaces extends AbstractVirtualTable
+    {
+        private static final String KEYSPACE_NAME = "keyspace_name";
+
+        private VirtualKeyspaces(String keyspace)
+        {
+            super(builder(keyspace, "keyspaces")
+                  .comment("virtual keyspace definitions")
+                  .kind(TableMetadata.Kind.VIRTUAL)
+                  .addPartitionKeyColumn(KEYSPACE_NAME, UTF8Type.instance)
+                  .build());
+        }
+
+        public DataSet data()
+        {
+            SimpleDataSet result = new SimpleDataSet(metadata());
+            for (KeyspaceMetadata keyspace : VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata())
+                result.row(keyspace.name);
+            return result;
+        }
+    }
+
+    /**
+     * The {@code tables} table: one row per table of every registered virtual keyspace,
+     * keyed by keyspace name and table name and carrying the table comment.
+     */
+    private static final class VirtualTables extends AbstractVirtualTable
+    {
+        private static final String KEYSPACE_NAME = "keyspace_name";
+        private static final String TABLE_NAME = "table_name";
+        private static final String COMMENT = "comment";
+
+        private VirtualTables(String keyspace)
+        {
+            super(builder(keyspace, "tables")
+                  .comment("virtual table definitions")
+                  .kind(TableMetadata.Kind.VIRTUAL)
+                  .addPartitionKeyColumn(KEYSPACE_NAME, UTF8Type.instance)
+                  .addClusteringColumn(TABLE_NAME, UTF8Type.instance)
+                  .addRegularColumn(COMMENT, UTF8Type.instance)
+                  .build());
+        }
+
+        public DataSet data()
+        {
+            SimpleDataSet result = new SimpleDataSet(metadata());
+
+            for (KeyspaceMetadata keyspace : VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata())
+            {
+                for (TableMetadata table : keyspace.tables)
+                {
+                    result.row(table.keyspace, table.name)
+                          .column(COMMENT, table.params.comment);
+                }
+            }
+
+            return result;
+        }
+    }
+
+    /**
+     * The {@code columns} table: one row per column of every table of every registered virtual keyspace,
+     * keyed by keyspace name, table name and column name.
+     */
+    private static final class VirtualColumns extends AbstractVirtualTable
+    {
+        private static final String KEYSPACE_NAME = "keyspace_name";
+        private static final String TABLE_NAME = "table_name";
+        private static final String COLUMN_NAME = "column_name";
+        private static final String CLUSTERING_ORDER = "clustering_order";
+        private static final String COLUMN_NAME_BYTES = "column_name_bytes";
+        private static final String KIND = "kind";
+        private static final String POSITION = "position";
+        private static final String TYPE = "type";
+
+        private VirtualColumns(String keyspace)
+        {
+            super(builder(keyspace, "columns")
+                  .comment("virtual column definitions")
+                  .kind(TableMetadata.Kind.VIRTUAL)
+                  .addPartitionKeyColumn(KEYSPACE_NAME, UTF8Type.instance)
+                  .addClusteringColumn(TABLE_NAME, UTF8Type.instance)
+                  .addClusteringColumn(COLUMN_NAME, UTF8Type.instance)
+                  .addRegularColumn(CLUSTERING_ORDER, UTF8Type.instance)
+                  .addRegularColumn(COLUMN_NAME_BYTES, BytesType.instance)
+                  .addRegularColumn(KIND, UTF8Type.instance)
+                  .addRegularColumn(POSITION, Int32Type.instance)
+                  .addRegularColumn(TYPE, UTF8Type.instance)
+                  .build());
+        }
+
+        public DataSet data()
+        {
+            SimpleDataSet result = new SimpleDataSet(metadata());
+
+            for (KeyspaceMetadata keyspace : VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata())
+            {
+                for (TableMetadata table : keyspace.tables)
+                {
+                    for (ColumnMetadata column : table.columns())
+                    {
+                        // Lower-case with Locale.ROOT: the no-argument toLowerCase() follows the JVM's
+                        // default locale and would, for instance, turn 'I' into a dotless 'ı' under a
+                        // Turkish locale, changing the stored clustering order and kind values.
+                        result.row(column.ksName, column.cfName, column.name.toString())
+                              .column(CLUSTERING_ORDER, column.clusteringOrder().toString().toLowerCase(Locale.ROOT))
+                              .column(COLUMN_NAME_BYTES, column.name.bytes)
+                              .column(KIND, column.kind.toString().toLowerCase(Locale.ROOT))
+                              .column(POSITION, column.position())
+                              .column(TYPE, column.type.asCQL3Type().toString());
+                    }
+                }
+            }
+
+            return result;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
new file mode 100644
index 0000000..ea196ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * A virtual table used to expose system information.
+ *
+ * An implementation supplies the table's {@link TableMetadata} together with the operations
+ * for applying an update to the table and for selecting rows from it.
+ */
+public interface VirtualTable
+{
+ /**
+ * Returns the table name, as recorded in {@link #metadata()}.
+ *
+ * @return the table name.
+ */
+ default String name()
+ {
+ return metadata().name;
+ }
+
+ /**
+ * Returns the table metadata.
+ *
+ * @return the table metadata.
+ */
+ TableMetadata metadata();
+
+ /**
+ * Applies the specified update.
+ *
+ * @param update the update to apply
+ */
+ void apply(PartitionUpdate update);
+
+ /**
+ * Selects the rows from a single partition.
+ *
+ * @param partitionKey the partition key
+ * @param clusteringIndexFilter the filter on the clustering columns to select
+ * @param columnFilter the selected columns
+ * @return the rows corresponding to the requested data.
+ */
+ UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter);
+
+ /**
+ * Selects the rows from a range of partitions.
+ *
+ * @param dataRange the range of data to retrieve
+ * @param columnFilter the selected columns
+ * @return the rows corresponding to the requested data.
+ */
+ UnfilteredPartitionIterator select(DataRange dataRange, ColumnFilter columnFilter);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/index/IndexRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java b/src/java/org/apache/cassandra/index/IndexRegistry.java
index 9f5ed02..e4c531b 100644
--- a/src/java/org/apache/cassandra/index/IndexRegistry.java
+++ b/src/java/org/apache/cassandra/index/IndexRegistry.java
@@ -21,8 +21,14 @@
package org.apache.cassandra.index;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.TableMetadata;
/**
* The collection of all Index instances for a base table.
@@ -34,9 +40,72 @@ import org.apache.cassandra.schema.IndexMetadata;
*/
public interface IndexRegistry
{
+ /**
+ * An empty {@code IndexRegistry}
+ */
+ public static final IndexRegistry EMPTY = new IndexRegistry()
+ {
+ @Override
+ public void unregisterIndex(Index index)
+ {
+ }
+
+ @Override
+ public void registerIndex(Index index)
+ {
+ }
+
+ @Override
+ public Collection<Index> listIndexes()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Index getIndex(IndexMetadata indexMetadata)
+ {
+ return null;
+ }
+
+ @Override
+ public Optional<Index> getBestIndexFor(RowFilter.Expression expression)
+ {
+ return Optional.empty();
+ }
+
+ @Override
+ public void validate(PartitionUpdate update)
+ {
+ }
+ };
+
void registerIndex(Index index);
void unregisterIndex(Index index);
Index getIndex(IndexMetadata indexMetadata);
Collection<Index> listIndexes();
+
+ Optional<Index> getBestIndexFor(RowFilter.Expression expression);
+
+ /**
+ * Called at write time to ensure that values present in the update
+ * are valid according to the rules of all registered indexes which
+ * will process it. The partition key as well as the clustering and
+ * cell values for each row in the update may be checked by index
+ * implementations
+ *
+ * @param update PartitionUpdate containing the values to be validated by registered Index implementations
+ */
+ void validate(PartitionUpdate update);
+
+ /**
+ * Returns the {@code IndexRegistry} associated to the specified table.
+ *
+ * @param table the table metadata
+ * @return the {@code IndexRegistry} associated to the specified table
+ */
+ public static IndexRegistry obtain(TableMetadata table)
+ {
+ return table.isVirtual() ? EMPTY : Keyspace.openAndGetStore(table).indexManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 9a29c02..fb0d629 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -739,6 +739,7 @@ public abstract class CassandraIndex implements Index
TableMetadata.Builder builder =
TableMetadata.builder(baseCfsMetadata.keyspace, baseCfsMetadata.indexTableName(indexMetadata), baseCfsMetadata.id)
+ .kind(TableMetadata.Kind.INDEX)
// tables for legacy KEYS indexes are non-compound and dense
.isDense(indexMetadata.isKeys())
.isCompound(!indexMetadata.isKeys())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
index 80a3869..5a72d2c 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
@@ -36,16 +36,23 @@ import static java.lang.String.format;
*/
public final class KeyspaceMetadata
{
+ public enum Kind
+ {
+ REGULAR, VIRTUAL
+ }
+
public final String name;
+ public final Kind kind;
public final KeyspaceParams params;
public final Tables tables;
public final Views views;
public final Types types;
public final Functions functions;
- private KeyspaceMetadata(String name, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
+ private KeyspaceMetadata(String name, Kind kind, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
{
this.name = name;
+ this.kind = kind;
this.params = params;
this.tables = tables;
this.views = views;
@@ -55,42 +62,52 @@ public final class KeyspaceMetadata
public static KeyspaceMetadata create(String name, KeyspaceParams params)
{
- return new KeyspaceMetadata(name, params, Tables.none(), Views.none(), Types.none(), Functions.none());
+ return new KeyspaceMetadata(name, Kind.REGULAR, params, Tables.none(), Views.none(), Types.none(), Functions.none());
}
public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables)
{
- return new KeyspaceMetadata(name, params, tables, Views.none(), Types.none(), Functions.none());
+ return new KeyspaceMetadata(name, Kind.REGULAR, params, tables, Views.none(), Types.none(), Functions.none());
}
public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
{
- return new KeyspaceMetadata(name, params, tables, views, types, functions);
+ return new KeyspaceMetadata(name, Kind.REGULAR, params, tables, views, types, functions);
+ }
+
+ public static KeyspaceMetadata virtual(String name, Tables tables)
+ {
+ return new KeyspaceMetadata(name, Kind.VIRTUAL, KeyspaceParams.local(), tables, Views.none(), Types.none(), Functions.none());
}
public KeyspaceMetadata withSwapped(KeyspaceParams params)
{
- return new KeyspaceMetadata(name, params, tables, views, types, functions);
+ return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
}
public KeyspaceMetadata withSwapped(Tables regular)
{
- return new KeyspaceMetadata(name, params, regular, views, types, functions);
+ return new KeyspaceMetadata(name, kind, params, regular, views, types, functions);
}
public KeyspaceMetadata withSwapped(Views views)
{
- return new KeyspaceMetadata(name, params, tables, views, types, functions);
+ return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
}
public KeyspaceMetadata withSwapped(Types types)
{
- return new KeyspaceMetadata(name, params, tables, views, types, functions);
+ return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
}
public KeyspaceMetadata withSwapped(Functions functions)
{
- return new KeyspaceMetadata(name, params, tables, views, types, functions);
+ return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
+ }
+
+ public boolean isVirtual()
+ {
+ return kind == Kind.VIRTUAL;
}
public Iterable<TableMetadata> tablesAndViews()
@@ -129,7 +146,7 @@ public final class KeyspaceMetadata
@Override
public int hashCode()
{
- return Objects.hashCode(name, params, tables, views, functions, types);
+ return Objects.hashCode(name, kind, params, tables, views, functions, types);
}
@Override
@@ -144,6 +161,7 @@ public final class KeyspaceMetadata
KeyspaceMetadata other = (KeyspaceMetadata) o;
return name.equals(other.name)
+ && kind == other.kind
&& params.equals(other.params)
&& tables.equals(other.tables)
&& views.equals(other.views)
@@ -156,6 +174,7 @@ public final class KeyspaceMetadata
{
return MoreObjects.toStringHelper(this)
.add("name", name)
+ .add("kind", kind)
.add("params", params)
.add("tables", tables)
.add("views", views)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index 594b2ab..09ec62a 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.MapDifference;
@@ -28,15 +29,12 @@ import com.google.common.collect.Sets;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.KeyspaceNotDefinedException;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.UnknownTableException;
@@ -312,7 +310,8 @@ public final class Schema
public KeyspaceMetadata getKeyspaceMetadata(String keyspaceName)
{
assert keyspaceName != null;
- return keyspaces.getNullable(keyspaceName);
+ KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName);
+ return null != keyspace ? keyspace : VirtualKeyspaceRegistry.instance.getKeyspaceMetadataNullable(keyspaceName);
}
private Set<String> getNonSystemKeyspacesSet()
@@ -426,15 +425,17 @@ public final class Schema
assert keyspace != null;
assert table != null;
- KeyspaceMetadata ksm = keyspaces.getNullable(keyspace);
+ KeyspaceMetadata ksm = getKeyspaceMetadata(keyspace);
return ksm == null
? null
: ksm.getTableOrViewNullable(table);
}
+ @Nullable
public TableMetadata getTableMetadata(TableId id)
{
- return keyspaces.getTableOrViewNullable(id);
+ TableMetadata table = keyspaces.getTableOrViewNullable(id);
+ return null != table ? table : VirtualKeyspaceRegistry.instance.getTableMetadataNullable(id);
}
public TableMetadata validateTable(String keyspaceName, String tableName)
@@ -442,7 +443,7 @@ public final class Schema
if (tableName.isEmpty())
throw new InvalidRequestException("non-empty table is required");
- KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName);
+ KeyspaceMetadata keyspace = getKeyspaceMetadata(keyspaceName);
if (keyspace == null)
throw new KeyspaceNotDefinedException(format("keyspace %s does not exist", keyspaceName));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 638e912..4945fc2 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -1141,7 +1141,7 @@ public final class SchemaKeyspace
TableMetadata metadata =
TableMetadata.builder(keyspaceName, viewName, TableId.fromUUID(row.getUUID("id")))
- .isView(true)
+ .kind(TableMetadata.Kind.VIEW)
.addColumns(columns)
.droppedColumns(fetchDroppedColumns(keyspaceName, viewName))
.params(createTableParamsFromRow(row))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/TableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java b/src/java/org/apache/cassandra/schema/TableMetadata.java
index 4634438..47e5b47 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.Objects;
+import javax.annotation.Nullable;
+
import com.google.common.base.MoreObjects;
import com.google.common.collect.*;
@@ -82,15 +84,21 @@ public final class TableMetadata
}
}
+ public enum Kind
+ {
+ REGULAR, INDEX, VIEW, VIRTUAL
+ }
+
public final String keyspace;
public final String name;
public final TableId id;
public final IPartitioner partitioner;
+ public final Kind kind;
public final TableParams params;
public final ImmutableSet<Flag> flags;
- private final boolean isView;
+ @Nullable
private final String indexName; // derived from table name
/*
@@ -139,12 +147,10 @@ public final class TableMetadata
id = builder.id;
partitioner = builder.partitioner;
+ kind = builder.kind;
params = builder.params.build();
- isView = builder.isView;
- indexName = name.contains(".")
- ? name.substring(name.indexOf('.') + 1)
- : null;
+ indexName = kind == Kind.INDEX ? name.substring(name.indexOf('.') + 1) : null;
droppedColumns = ImmutableMap.copyOf(builder.droppedColumns);
Collections.sort(builder.partitionKeyColumns);
@@ -184,23 +190,28 @@ public final class TableMetadata
{
return builder(keyspace, name, id)
.partitioner(partitioner)
+ .kind(kind)
.params(params)
.flags(flags)
- .isView(isView)
.addColumns(columns())
.droppedColumns(droppedColumns)
.indexes(indexes)
.triggers(triggers);
}
+ public boolean isIndex()
+ {
+ return kind == Kind.INDEX;
+ }
+
public boolean isView()
{
- return isView;
+ return kind == Kind.VIEW;
}
- public boolean isIndex()
+ public boolean isVirtual()
{
- return indexName != null;
+ return kind == Kind.VIRTUAL;
}
public Optional<String> indexName()
@@ -534,7 +545,7 @@ public final class TableMetadata
private void except(String format, Object... args)
{
- throw new ConfigurationException(keyspace + "." + name + ": " +format(format, args));
+ throw new ConfigurationException(keyspace + "." + name + ": " + format(format, args));
}
@Override
@@ -552,9 +563,9 @@ public final class TableMetadata
&& name.equals(tm.name)
&& id.equals(tm.id)
&& partitioner.equals(tm.partitioner)
+ && kind == tm.kind
&& params.equals(tm.params)
&& flags.equals(tm.flags)
- && isView == tm.isView
&& columns.equals(tm.columns)
&& droppedColumns.equals(tm.droppedColumns)
&& indexes.equals(tm.indexes)
@@ -564,7 +575,7 @@ public final class TableMetadata
@Override
public int hashCode()
{
- return Objects.hash(keyspace, name, id, partitioner, params, flags, isView, columns, droppedColumns, indexes, triggers);
+ return Objects.hash(keyspace, name, id, partitioner, kind, params, flags, columns, droppedColumns, indexes, triggers);
}
@Override
@@ -580,9 +591,9 @@ public final class TableMetadata
.add("table", name)
.add("id", id)
.add("partitioner", partitioner)
+ .add("kind", kind)
.add("params", params)
.add("flags", flags)
- .add("isView", isView)
.add("columns", columns())
.add("droppedColumns", droppedColumns.values())
.add("indexes", indexes)
@@ -598,6 +609,7 @@ public final class TableMetadata
private TableId id;
private IPartitioner partitioner;
+ private Kind kind = Kind.REGULAR;
private TableParams.Builder params = TableParams.builder();
// Setting compound as default as "normal" CQL tables are compound and that's what we want by default
@@ -611,8 +623,6 @@ public final class TableMetadata
private final List<ColumnMetadata> clusteringColumns = new ArrayList<>();
private final List<ColumnMetadata> regularAndStaticColumns = new ArrayList<>();
- private boolean isView;
-
private Builder(String keyspace, String name, TableId id)
{
this.keyspace = keyspace;
@@ -649,6 +659,12 @@ public final class TableMetadata
return this;
}
+ public Builder kind(Kind val)
+ {
+ kind = val;
+ return this;
+ }
+
public Builder params(TableParams val)
{
params = val.unbuild();
@@ -733,12 +749,6 @@ public final class TableMetadata
return this;
}
- public Builder isView(boolean val)
- {
- isView = val;
- return this;
- }
-
public Builder flags(Set<Flag> val)
{
flags = val;
@@ -979,6 +989,6 @@ public final class TableMetadata
*/
public boolean enforceStrictLiveness()
{
- return isView && Keyspace.open(keyspace).viewManager.getByName(name).enforceStrictLiveness();
+ return isView() && Keyspace.open(keyspace).viewManager.getByName(name).enforceStrictLiveness();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/CASRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java b/src/java/org/apache/cassandra/service/CASRequest.java
index 1db100d..88fb9bd 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.service;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadQuery;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -30,7 +30,7 @@ public interface CASRequest
/**
* The command to use to fetch the value to compare for the CAS.
*/
- public SinglePartitionReadCommand readCommand(int nowInSec);
+ public SinglePartitionReadQuery readCommand(int nowInSec);
/**
* Returns whether the provided CF, that represents the values fetched using the
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 6e0b92b..815e673 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -46,6 +46,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.db.virtual.SystemViewsKeyspace;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.StartupClusterConnectivityChecker;
import org.apache.cassandra.schema.TableMetadata;
@@ -249,6 +252,8 @@ public class CassandraDaemon
throw e;
}
+ VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
+ VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance);
// clean up debris in the rest of the keyspaces
for (String keyspaceName : Schema.instance.getKeyspaces())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index c854737..234ac4f 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.*;
+import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -65,8 +66,12 @@ public class ClientState
for (String cf : Arrays.asList(SystemKeyspace.LOCAL, SystemKeyspace.LEGACY_PEERS, SystemKeyspace.PEERS_V2))
READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SYSTEM_KEYSPACE_NAME, cf));
+ // make all schema tables readable by default (required by the drivers)
SchemaKeyspace.ALL.forEach(table -> READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SCHEMA_KEYSPACE_NAME, table)));
+ // make all virtual schema tables readable by default as well
+ VirtualSchemaKeyspace.instance.tables().forEach(t -> READABLE_SYSTEM_RESOURCES.add(t.metadata().resource));
+
// neither clients nor tools need authentication/authorization
if (DatabaseDescriptor.isDaemonInitialized())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 37bfd17..7e9b0f9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -259,7 +259,7 @@ public class StorageProxy implements StorageProxyMBean
// read the current values and check they validate the conditions
Tracing.trace("Reading existing values for CAS precondition");
- SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
+ SinglePartitionReadCommand readCommand = (SinglePartitionReadCommand) request.readCommand(FBUtilities.nowInSeconds());
ConsistencyLevel readConsistency = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
FilteredPartition current;
@@ -1633,7 +1633,7 @@ public class StorageProxy implements StorageProxyMBean
public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime)
throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
{
- if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.commands))
+ if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.queries))
{
readMetrics.unavailables.mark();
readMetricsMap.get(consistencyLevel).unavailables.mark();
@@ -1649,11 +1649,11 @@ public class StorageProxy implements StorageProxyMBean
throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
{
assert state != null;
- if (group.commands.size() > 1)
+ if (group.queries.size() > 1)
throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time");
long start = System.nanoTime();
- SinglePartitionReadCommand command = group.commands.get(0);
+ SinglePartitionReadCommand command = group.queries.get(0);
TableMetadata metadata = command.metadata();
DecoratedKey key = command.partitionKey();
@@ -1685,7 +1685,7 @@ public class StorageProxy implements StorageProxyMBean
throw new ReadFailureException(consistencyLevel, e.received, e.blockFor, false, e.failureReasonByEndpoint);
}
- result = fetchRows(group.commands, consistencyForCommitOrFetch, queryStartNanoTime);
+ result = fetchRows(group.queries, consistencyForCommitOrFetch, queryStartNanoTime);
}
catch (UnavailableException e)
{
@@ -1727,13 +1727,13 @@ public class StorageProxy implements StorageProxyMBean
long start = System.nanoTime();
try
{
- PartitionIterator result = fetchRows(group.commands, consistencyLevel, queryStartNanoTime);
+ PartitionIterator result = fetchRows(group.queries, consistencyLevel, queryStartNanoTime);
// Note that the only difference between the command in a group must be the partition key on which
// they applied.
- boolean enforceStrictLiveness = group.commands.get(0).metadata().enforceStrictLiveness();
+ boolean enforceStrictLiveness = group.queries.get(0).metadata().enforceStrictLiveness();
// If we have more than one command, then despite each read command honoring the limit, the total result
// might not honor it and so we should enforce it
- if (group.commands.size() > 1)
+ if (group.queries.size() > 1)
result = group.limits().filter(result, group.nowInSec(), group.selectsFullPartition(), enforceStrictLiveness);
return result;
}
@@ -1761,7 +1761,7 @@ public class StorageProxy implements StorageProxyMBean
readMetrics.addNano(latency);
readMetricsMap.get(consistencyLevel).addNano(latency);
// TODO avoid giving every command the same latency number. Can fix this in CASSADRA-5329
- for (ReadCommand command : group.commands)
+ for (ReadCommand command : group.queries)
Keyspace.openAndGetStore(command.metadata()).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 8570f10..4214644 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -69,6 +69,7 @@ import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.Verifier;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token.TokenFactory;
@@ -3456,12 +3457,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
- private Keyspace getValidKeyspace(String keyspaceName) throws IOException
+ private void verifyKeyspaceIsValid(String keyspaceName)
{
+ if (null != VirtualKeyspaceRegistry.instance.getKeyspaceNullable(keyspaceName))
+ throw new IllegalArgumentException("Cannot perform any operations against virtual keyspace " + keyspaceName);
+
if (!Schema.instance.getKeyspaces().contains(keyspaceName))
- {
- throw new IOException("Keyspace " + keyspaceName + " does not exist");
- }
+ throw new IllegalArgumentException("Keyspace " + keyspaceName + " does not exist");
+ }
+
+ private Keyspace getValidKeyspace(String keyspaceName)
+ {
+ verifyKeyspaceIsValid(keyspaceName);
return Keyspace.open(keyspaceName);
}
@@ -4787,6 +4794,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void truncate(String keyspace, String table) throws TimeoutException, IOException
{
+ verifyKeyspaceIsValid(keyspace);
+
try
{
StorageProxy.truncateBlocking(keyspace, table);
@@ -5249,6 +5258,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
if (!isInitialized())
throw new RuntimeException("Not yet initialized, can't load new sstables");
+ verifyKeyspaceIsValid(ksName);
ColumnFamilyStore.loadNewSSTables(ksName, cfName);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 8ebbdf7..da64a0c 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -26,9 +26,9 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.transport.ProtocolVersion;
-abstract class AbstractQueryPager implements QueryPager
+abstract class AbstractQueryPager<T extends ReadQuery> implements QueryPager
{
- protected final ReadCommand command;
+ protected final T query;
protected final DataLimits limits;
protected final ProtocolVersion protocolVersion;
private final boolean enforceStrictLiveness;
@@ -43,12 +43,12 @@ abstract class AbstractQueryPager implements QueryPager
private boolean exhausted;
- protected AbstractQueryPager(ReadCommand command, ProtocolVersion protocolVersion)
+ protected AbstractQueryPager(T query, ProtocolVersion protocolVersion)
{
- this.command = command;
+ this.query = query;
this.protocolVersion = protocolVersion;
- this.limits = command.limits();
- this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
+ this.limits = query.limits();
+ this.enforceStrictLiveness = query.metadata().enforceStrictLiveness();
this.remaining = limits.count();
this.remainingInPartition = limits.perPartitionCount();
@@ -56,7 +56,7 @@ abstract class AbstractQueryPager implements QueryPager
public ReadExecutionController executionController()
{
- return command.executionController();
+ return query.executionController();
}
public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime)
@@ -65,8 +65,8 @@ abstract class AbstractQueryPager implements QueryPager
return EmptyIterators.partition();
pageSize = Math.min(pageSize, remaining);
- Pager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec());
- return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState, queryStartNanoTime), pager);
+ Pager pager = new RowPager(limits.forPaging(pageSize), query.nowInSec());
+ return Transformation.apply(nextPageReadQuery(pageSize).execute(consistency, clientState, queryStartNanoTime), pager);
}
public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)
@@ -75,8 +75,8 @@ abstract class AbstractQueryPager implements QueryPager
return EmptyIterators.partition();
pageSize = Math.min(pageSize, remaining);
- RowPager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec());
- return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(executionController), pager);
+ RowPager pager = new RowPager(limits.forPaging(pageSize), query.nowInSec());
+ return Transformation.apply(nextPageReadQuery(pageSize).executeInternal(executionController), pager);
}
public UnfilteredPartitionIterator fetchPageUnfiltered(TableMetadata metadata, int pageSize, ReadExecutionController executionController)
@@ -85,9 +85,9 @@ abstract class AbstractQueryPager implements QueryPager
return EmptyIterators.unfilteredPartition(metadata);
pageSize = Math.min(pageSize, remaining);
- UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), command.nowInSec());
+ UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), query.nowInSec());
- return Transformation.apply(nextPageReadCommand(pageSize).executeLocally(executionController), pager);
+ return Transformation.apply(nextPageReadQuery(pageSize).executeLocally(executionController), pager);
}
private class UnfilteredPager extends Pager<Unfiltered>
@@ -128,7 +128,7 @@ abstract class AbstractQueryPager implements QueryPager
private Pager(DataLimits pageLimits, int nowInSec)
{
- this.counter = pageLimits.newCounter(nowInSec, true, command.selectsFullPartition(), enforceStrictLiveness);
+ this.counter = pageLimits.newCounter(nowInSec, true, query.selectsFullPartition(), enforceStrictLiveness);
this.pageLimits = pageLimits;
}
@@ -228,7 +228,7 @@ abstract class AbstractQueryPager implements QueryPager
return remainingInPartition;
}
- protected abstract ReadCommand nextPageReadCommand(int pageSize);
+ protected abstract T nextPageReadQuery(int pageSize);
protected abstract void recordLast(DecoratedKey key, Row row);
protected abstract boolean isPreviouslyReturnedPartition(DecoratedKey key);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 9dae11c..ca16967 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -31,20 +31,20 @@ import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.service.ClientState;
/**
- * Pager over a list of ReadCommand.
+ * Pager over a list of SinglePartitionReadQuery.
*
- * Note that this is not easy to make efficient. Indeed, we need to page the first command fully before
- * returning results from the next one, but if the result returned by each command is small (compared to pageSize),
- * paging the commands one at a time under-performs compared to parallelizing. On the other, if we parallelize
- * and each command raised pageSize results, we'll end up with commands.size() * pageSize results in memory, which
+ * Note that this is not easy to make efficient. Indeed, we need to page the first query fully before
+ * returning results from the next one, but if the result returned by each query is small (compared to pageSize),
+ * paging the queries one at a time under-performs compared to parallelizing. On the other hand, if we parallelize
+ * and each query raised pageSize results, we'll end up with queries.size() * pageSize results in memory, which
* defeats the purpose of paging.
*
- * For now, we keep it simple (somewhat) and just do one command at a time. Provided that we make sure to not
+ * For now, we keep it simple (somewhat) and just do one query at a time. Provided that we make sure to not
* create a pager unless we need to, this is probably fine. Though if we later want to get fancy, we could use the
- * cfs meanPartitionSize to decide if parallelizing some of the command might be worth it while being confident we don't
+ * cfs meanPartitionSize to decide if parallelizing some of the queries might be worth it while being confident we don't
* blow out memory.
*/
-public class MultiPartitionPager implements QueryPager
+public class MultiPartitionPager<T extends SinglePartitionReadQuery> implements QueryPager
{
private final SinglePartitionPager[] pagers;
private final DataLimits limit;
@@ -54,33 +54,33 @@ public class MultiPartitionPager implements QueryPager
private int remaining;
private int current;
- public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state, ProtocolVersion protocolVersion)
+ public MultiPartitionPager(SinglePartitionReadQuery.Group<T> group, PagingState state, ProtocolVersion protocolVersion)
{
this.limit = group.limits();
this.nowInSec = group.nowInSec();
int i = 0;
- // If it's not the beginning (state != null), we need to find where we were and skip previous commands
+ // If it's not the beginning (state != null), we need to find where we were and skip previous queries
// since they are done.
if (state != null)
- for (; i < group.commands.size(); i++)
- if (group.commands.get(i).partitionKey().getKey().equals(state.partitionKey))
+ for (; i < group.queries.size(); i++)
+ if (group.queries.get(i).partitionKey().getKey().equals(state.partitionKey))
break;
- if (i >= group.commands.size())
+ if (i >= group.queries.size())
{
pagers = null;
return;
}
- pagers = new SinglePartitionPager[group.commands.size() - i];
+ pagers = new SinglePartitionPager[group.queries.size() - i];
// 'i' is on the first non exhausted pager for the previous page (or the first one)
- SinglePartitionReadCommand command = group.commands.get(i);
- pagers[0] = command.getPager(state, protocolVersion);
+ T query = group.queries.get(i);
+ pagers[0] = query.getPager(state, protocolVersion);
// Following ones haven't been started yet
- for (int j = i + 1; j < group.commands.size(); j++)
- pagers[j - i] = group.commands.get(j).getPager(null, protocolVersion);
+ for (int j = i + 1; j < group.queries.size(); j++)
+ pagers[j - i] = group.queries.get(j).getPager(null, protocolVersion);
remaining = state == null ? limit.count() : state.remaining;
}
@@ -103,11 +103,11 @@ public class MultiPartitionPager implements QueryPager
SinglePartitionPager[] newPagers = Arrays.copyOf(pagers, pagers.length);
newPagers[current] = newPagers[current].withUpdatedLimit(newLimits);
- return new MultiPartitionPager(newPagers,
- newLimits,
- nowInSec,
- remaining,
- current);
+ return new MultiPartitionPager<T>(newPagers,
+ newLimits,
+ nowInSec,
+ remaining,
+ current);
}
public PagingState state()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
index ba6862d..cebf3c6 100644
--- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
@@ -21,37 +21,36 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.transport.ProtocolVersion;
/**
- * Pages a PartitionRangeReadCommand.
+ * Pages a PartitionRangeReadQuery.
*/
-public class PartitionRangeQueryPager extends AbstractQueryPager
+public class PartitionRangeQueryPager extends AbstractQueryPager<PartitionRangeReadQuery>
{
private volatile DecoratedKey lastReturnedKey;
private volatile PagingState.RowMark lastReturnedRow;
- public PartitionRangeQueryPager(PartitionRangeReadCommand command, PagingState state, ProtocolVersion protocolVersion)
+ public PartitionRangeQueryPager(PartitionRangeReadQuery query, PagingState state, ProtocolVersion protocolVersion)
{
- super(command, protocolVersion);
+ super(query, protocolVersion);
if (state != null)
{
- lastReturnedKey = command.metadata().partitioner.decorateKey(state.partitionKey);
+ lastReturnedKey = query.metadata().partitioner.decorateKey(state.partitionKey);
lastReturnedRow = state.rowMark;
restoreState(lastReturnedKey, state.remaining, state.remainingInPartition);
}
}
- public PartitionRangeQueryPager(ReadCommand command,
+ public PartitionRangeQueryPager(PartitionRangeReadQuery query,
ProtocolVersion protocolVersion,
DecoratedKey lastReturnedKey,
PagingState.RowMark lastReturnedRow,
int remaining,
int remainingInPartition)
{
- super(command, protocolVersion);
+ super(query, protocolVersion);
this.lastReturnedKey = lastReturnedKey;
this.lastReturnedRow = lastReturnedRow;
restoreState(lastReturnedKey, remaining, remainingInPartition);
@@ -59,7 +58,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
public PartitionRangeQueryPager withUpdatedLimit(DataLimits newLimits)
{
- return new PartitionRangeQueryPager(command.withUpdatedLimit(newLimits),
+ return new PartitionRangeQueryPager(query.withUpdatedLimit(newLimits),
protocolVersion,
lastReturnedKey,
lastReturnedRow,
@@ -74,16 +73,16 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
: new PagingState(lastReturnedKey.getKey(), lastReturnedRow, maxRemaining(), remainingInPartition());
}
- protected ReadCommand nextPageReadCommand(int pageSize)
- throws RequestExecutionException
+ @Override
+ protected PartitionRangeReadQuery nextPageReadQuery(int pageSize)
{
DataLimits limits;
- DataRange fullRange = ((PartitionRangeReadCommand)command).dataRange();
+ DataRange fullRange = query.dataRange();
DataRange pageRange;
if (lastReturnedKey == null)
{
pageRange = fullRange;
- limits = command.limits().forPaging(pageSize);
+ limits = query.limits().forPaging(pageSize);
}
else
{
@@ -92,17 +91,17 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
AbstractBounds<PartitionPosition> bounds = makeKeyBounds(lastReturnedKey, includeLastKey);
if (includeLastKey)
{
- pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedRow.clustering(command.metadata()), false);
- limits = command.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition());
+ pageRange = fullRange.forPaging(bounds, query.metadata().comparator, lastReturnedRow.clustering(query.metadata()), false);
+ limits = query.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition());
}
else
{
pageRange = fullRange.forSubRange(bounds);
- limits = command.limits().forPaging(pageSize);
+ limits = query.limits().forPaging(pageSize);
}
}
- return ((PartitionRangeReadCommand) command).withUpdatedLimitsAndDataRange(limits, pageRange);
+ return query.withUpdatedLimitsAndDataRange(limits, pageRange);
}
protected void recordLast(DecoratedKey key, Row last)
@@ -111,7 +110,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
{
lastReturnedKey = key;
if (last.clustering() != Clustering.STATIC_CLUSTERING)
- lastReturnedRow = PagingState.RowMark.create(command.metadata(), last, protocolVersion);
+ lastReturnedRow = PagingState.RowMark.create(query.metadata(), last, protocolVersion);
}
}
@@ -123,18 +122,16 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
private AbstractBounds<PartitionPosition> makeKeyBounds(PartitionPosition lastReturnedKey, boolean includeLastKey)
{
- AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange();
+ AbstractBounds<PartitionPosition> bounds = query.dataRange().keyRange();
if (bounds instanceof Range || bounds instanceof Bounds)
{
return includeLastKey
- ? new Bounds<PartitionPosition>(lastReturnedKey, bounds.right)
- : new Range<PartitionPosition>(lastReturnedKey, bounds.right);
- }
- else
- {
- return includeLastKey
- ? new IncludingExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right)
- : new ExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right);
+ ? new Bounds<>(lastReturnedKey, bounds.right)
+ : new Range<>(lastReturnedKey, bounds.right);
}
+
+ return includeLastKey
+ ? new IncludingExcludingBounds<>(lastReturnedKey, bounds.right)
+ : new ExcludingBounds<>(lastReturnedKey, bounds.right);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index e95c358..93a0265 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -29,40 +29,36 @@ import org.apache.cassandra.transport.ProtocolVersion;
*
* For use by MultiPartitionPager.
*/
-public class SinglePartitionPager extends AbstractQueryPager
+public class SinglePartitionPager extends AbstractQueryPager<SinglePartitionReadQuery>
{
- private final SinglePartitionReadCommand command;
-
private volatile PagingState.RowMark lastReturned;
- public SinglePartitionPager(SinglePartitionReadCommand command, PagingState state, ProtocolVersion protocolVersion)
+ public SinglePartitionPager(SinglePartitionReadQuery query, PagingState state, ProtocolVersion protocolVersion)
{
- super(command, protocolVersion);
- this.command = command;
+ super(query, protocolVersion);
if (state != null)
{
lastReturned = state.rowMark;
- restoreState(command.partitionKey(), state.remaining, state.remainingInPartition);
+ restoreState(query.partitionKey(), state.remaining, state.remainingInPartition);
}
}
- private SinglePartitionPager(SinglePartitionReadCommand command,
+ private SinglePartitionPager(SinglePartitionReadQuery query,
ProtocolVersion protocolVersion,
PagingState.RowMark rowMark,
int remaining,
int remainingInPartition)
{
- super(command, protocolVersion);
- this.command = command;
+ super(query, protocolVersion);
this.lastReturned = rowMark;
- restoreState(command.partitionKey(), remaining, remainingInPartition);
+ restoreState(query.partitionKey(), remaining, remainingInPartition);
}
@Override
public SinglePartitionPager withUpdatedLimit(DataLimits newLimits)
{
- return new SinglePartitionPager(command.withUpdatedLimit(newLimits),
+ return new SinglePartitionPager(query.withUpdatedLimit(newLimits),
protocolVersion,
lastReturned,
maxRemaining(),
@@ -71,12 +67,12 @@ public class SinglePartitionPager extends AbstractQueryPager
public ByteBuffer key()
{
- return command.partitionKey().getKey();
+ return query.partitionKey().getKey();
}
public DataLimits limits()
{
- return command.limits();
+ return query.limits();
}
public PagingState state()
@@ -86,20 +82,21 @@ public class SinglePartitionPager extends AbstractQueryPager
: new PagingState(null, lastReturned, maxRemaining(), remainingInPartition());
}
- protected ReadCommand nextPageReadCommand(int pageSize)
+ @Override
+ protected SinglePartitionReadQuery nextPageReadQuery(int pageSize)
{
- Clustering clustering = lastReturned == null ? null : lastReturned.clustering(command.metadata());
+ Clustering clustering = lastReturned == null ? null : lastReturned.clustering(query.metadata());
DataLimits limits = lastReturned == null
? limits().forPaging(pageSize)
: limits().forPaging(pageSize, key(), remainingInPartition());
- return command.forPaging(clustering, limits);
+ return query.forPaging(clustering, limits);
}
protected void recordLast(DecoratedKey key, Row last)
{
if (last != null && last.clustering() != Clustering.STATIC_CLUSTERING)
- lastReturned = PagingState.RowMark.create(command.metadata(), last, protocolVersion);
+ lastReturned = PagingState.RowMark.create(query.metadata(), last, protocolVersion);
}
protected boolean isPreviouslyReturnedPartition(DecoratedKey key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 7b8ef94..0852312 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -186,6 +186,7 @@ public class CacheProviderTest
assertEquals(key1.hashCode(), key2.hashCode());
tm = TableMetadata.builder("ks", "tab.indexFoo", id1)
+ .kind(TableMetadata.Kind.INDEX)
.addPartitionKeyColumn("pk", UTF8Type.instance)
.indexes(Indexes.of(IndexMetadata.fromSchemaMetadata("indexFoo", IndexMetadata.Kind.KEYS, Collections.emptyMap())))
.build();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index e53342d..662e804 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -837,6 +837,11 @@ public abstract class CQLTester
return sessionNet(protocolVersion).execute(formatQuery(query), values);
}
+ protected com.datastax.driver.core.ResultSet executeNet(String query, Object... values) throws Throwable
+ {
+ return sessionNet().execute(formatQuery(query), values);
+ }
+
protected com.datastax.driver.core.ResultSet executeNetWithPaging(ProtocolVersion version, String query, int pageSize) throws Throwable
{
return sessionNet(version).execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[4/4] cassandra git commit: Implement virtual keyspace interface
Posted by al...@apache.org.
Implement virtual keyspace interface
patch by Benjamin Lehrer, Chris Lohfink, and Aleksey Yeschenko for
CASSANDRA-7622
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d464cd2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d464cd2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d464cd2
Branch: refs/heads/trunk
Commit: 0d464cd25ffbb5734f96c3082f9cc35011de3667
Parents: 3b6c938
Author: Chris Lohfink <cl...@apple.com>
Authored: Wed May 16 23:07:04 2018 +0100
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Fri May 18 16:45:45 2018 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
bin/cqlsh.py | 84 ++++-
.../ClusteringColumnRestrictions.java | 8 +-
.../restrictions/MultiColumnRestriction.java | 14 +-
.../PartitionKeySingleRestrictionSet.java | 6 +-
.../cql3/restrictions/Restriction.java | 10 +-
.../cql3/restrictions/RestrictionSet.java | 10 +-
.../restrictions/RestrictionSetWrapper.java | 15 +-
.../restrictions/SingleColumnRestriction.java | 24 +-
.../restrictions/StatementRestrictions.java | 29 +-
.../cql3/restrictions/TokenFilter.java | 10 +-
.../cql3/restrictions/TokenRestriction.java | 6 +-
.../cql3/statements/AlterKeyspaceStatement.java | 2 +
.../cql3/statements/AlterTableStatement.java | 3 +
.../cql3/statements/BatchStatement.java | 56 ++-
.../cql3/statements/BatchUpdatesCollector.java | 58 ++-
.../cql3/statements/CQL3CasRequest.java | 21 +-
.../statements/CreateAggregateStatement.java | 7 +-
.../statements/CreateFunctionStatement.java | 6 +-
.../cql3/statements/CreateIndexStatement.java | 3 +
.../cql3/statements/CreateTableStatement.java | 5 +-
.../cql3/statements/CreateTriggerStatement.java | 2 +
.../cql3/statements/CreateTypeStatement.java | 2 +
.../cql3/statements/CreateViewStatement.java | 4 +-
.../cql3/statements/DeleteStatement.java | 2 +
.../cql3/statements/DropKeyspaceStatement.java | 4 +
.../cql3/statements/DropTableStatement.java | 3 +
.../cql3/statements/ModificationStatement.java | 12 +-
.../cql3/statements/SelectStatement.java | 31 +-
.../statements/SingleTableUpdatesCollector.java | 8 +-
.../cql3/statements/TruncateStatement.java | 10 +
.../apache/cassandra/db/AbstractReadQuery.java | 116 ++++++
src/java/org/apache/cassandra/db/Keyspace.java | 2 +
.../cassandra/db/PartitionRangeReadCommand.java | 38 +-
.../cassandra/db/PartitionRangeReadQuery.java | 93 +++++
.../org/apache/cassandra/db/ReadCommand.java | 100 +----
src/java/org/apache/cassandra/db/ReadQuery.java | 184 ++++++---
.../db/SinglePartitionReadCommand.java | 178 ++-------
.../cassandra/db/SinglePartitionReadQuery.java | 290 +++++++++++++++
.../db/VirtualTablePartitionRangeReadQuery.java | 113 ++++++
.../cassandra/db/VirtualTableReadQuery.java | 65 ++++
.../VirtualTableSinglePartitionReadQuery.java | 194 ++++++++++
.../cassandra/db/filter/ColumnFilter.java | 2 +
.../db/partitions/PartitionIterators.java | 10 +-
.../db/partitions/PartitionUpdate.java | 3 +-
.../db/virtual/AbstractVirtualTable.java | 221 +++++++++++
.../cassandra/db/virtual/SimpleDataSet.java | 191 ++++++++++
.../db/virtual/SystemViewsKeyspace.java | 32 ++
.../cassandra/db/virtual/VirtualKeyspace.java | 58 +++
.../db/virtual/VirtualKeyspaceRegistry.java | 77 ++++
.../cassandra/db/virtual/VirtualMutation.java | 111 ++++++
.../db/virtual/VirtualSchemaKeyspace.java | 149 ++++++++
.../cassandra/db/virtual/VirtualTable.java | 74 ++++
.../apache/cassandra/index/IndexRegistry.java | 69 ++++
.../index/internal/CassandraIndex.java | 1 +
.../cassandra/schema/KeyspaceMetadata.java | 39 +-
.../org/apache/cassandra/schema/Schema.java | 19 +-
.../apache/cassandra/schema/SchemaKeyspace.java | 2 +-
.../apache/cassandra/schema/TableMetadata.java | 54 +--
.../apache/cassandra/service/CASRequest.java | 4 +-
.../cassandra/service/CassandraDaemon.java | 5 +
.../apache/cassandra/service/ClientState.java | 5 +
.../apache/cassandra/service/StorageProxy.java | 18 +-
.../cassandra/service/StorageService.java | 18 +-
.../service/pager/AbstractQueryPager.java | 30 +-
.../service/pager/MultiPartitionPager.java | 46 +--
.../service/pager/PartitionRangeQueryPager.java | 51 ++-
.../service/pager/SinglePartitionPager.java | 33 +-
.../cassandra/cache/CacheProviderTest.java | 1 +
.../org/apache/cassandra/cql3/CQLTester.java | 5 +
.../validation/entities/VirtualTableTest.java | 372 +++++++++++++++++++
71 files changed, 2936 insertions(+), 593 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5657567..e7e5ecb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Implement virtual keyspace interface (CASSANDRA-7622)
* nodetool import cleanup and improvements (CASSANDRA-14417)
* Bump jackson version to >= 2.9.5 (CASSANDRA-14427)
* Allow nodetool toppartitions without specifying table (CASSANDRA-14360)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/bin/cqlsh.py
----------------------------------------------------------------------
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index 3055110..e8e380f 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -437,6 +437,9 @@ class Shell(cmd.Cmd):
shunted_query_out = None
use_paging = True
+ # TODO remove after virtual tables are added to connection metadata
+ virtual_keyspaces = None
+
default_page_size = 100
def __init__(self, hostname, port, color=False,
@@ -628,7 +631,10 @@ class Shell(cmd.Cmd):
self.connection_versions = vers
def get_keyspace_names(self):
- return map(str, self.conn.metadata.keyspaces.keys())
+ # TODO remove after virtual tables are added to connection metadata
+ if self.virtual_keyspaces is None:
+ self.init_virtual_keyspaces_meta()
+ return map(str, self.conn.metadata.keyspaces.keys() + self.virtual_keyspaces.keys())
def get_columnfamily_names(self, ksname=None):
if ksname is None:
@@ -692,9 +698,78 @@ class Shell(cmd.Cmd):
return self.conn.metadata.partitioner
def get_keyspace_meta(self, ksname):
- if ksname not in self.conn.metadata.keyspaces:
- raise KeyspaceNotFound('Keyspace %r not found.' % ksname)
- return self.conn.metadata.keyspaces[ksname]
+ if ksname in self.conn.metadata.keyspaces:
+ return self.conn.metadata.keyspaces[ksname]
+
+ # TODO remove after virtual tables are added to connection metadata
+ if self.virtual_keyspaces is None:
+ self.init_virtual_keyspaces_meta()
+ if ksname in self.virtual_keyspaces:
+ return self.virtual_keyspaces[ksname]
+
+ raise KeyspaceNotFound('Keyspace %r not found.' % ksname)
+
+ # TODO remove after virtual tables are added to connection metadata
+ def init_virtual_keyspaces_meta(self):
+ self.virtual_keyspaces = {}
+ for vkeyspace in self.fetch_virtual_keyspaces():
+ self.virtual_keyspaces[vkeyspace.name] = vkeyspace
+
+ # TODO remove after virtual tables are added to connection metadata
+ def fetch_virtual_keyspaces(self):
+ keyspaces = []
+
+ result = self.session.execute('SELECT keyspace_name FROM system_virtual_schema.keyspaces;')
+ for row in result:
+ name = row['keyspace_name']
+ keyspace = KeyspaceMetadata(name, False, None, None)
+ tables = self.fetch_virtual_tables(name)
+ for table in tables:
+ keyspace.tables[table.name] = table
+ keyspaces.append(keyspace)
+
+ return keyspaces
+
+ # TODO remove after virtual tables are added to connection metadata
+ def fetch_virtual_tables(self, keyspace_name):
+ tables = []
+
+ result = self.session.execute("SELECT * FROM system_virtual_schema.tables WHERE keyspace_name = '{}';".format(keyspace_name))
+ for row in result:
+ name = row['table_name']
+ table = TableMetadata(keyspace_name, name)
+ self.fetch_virtual_columns(table)
+ tables.append(table)
+
+ return tables
+
+ # TODO remove after virtual tables are added to connection metadata
+ def fetch_virtual_columns(self, table):
+ result = self.session.execute("SELECT * FROM system_virtual_schema.columns WHERE keyspace_name = '{}' AND table_name = '{}';".format(table.keyspace_name, table.name))
+
+ partition_key_columns = []
+ clustering_columns = []
+
+ for row in result:
+ name = row['column_name']
+ cql_type = row['type']
+ kind = row['kind']
+ position = row['position']
+ is_static = kind == 'static'
+ is_reversed = row['clustering_order'] == 'desc'
+ column = ColumnMetadata(table, name, cql_type, is_static, is_reversed)
+ table.columns[column.name] = column
+
+ if kind == 'partition_key':
+ partition_key_columns.append((position, column))
+ elif kind == 'clustering':
+ clustering_columns.append((position, column))
+
+ partition_key_columns.sort(key=lambda t: t[0])
+ clustering_columns.sort(key=lambda t: t[0])
+
+ table.partition_key = map(lambda t: t[1], partition_key_columns)
+ table.clustering_key = map(lambda t: t[1], clustering_columns)
def get_keyspaces(self):
return self.conn.metadata.keyspaces.values()
@@ -707,7 +782,6 @@ class Shell(cmd.Cmd):
if ksname is None:
ksname = self.current_keyspace
ksmeta = self.get_keyspace_meta(ksname)
-
if tablename not in ksmeta.tables:
if ksname == 'system_auth' and tablename in ['roles', 'role_permissions']:
self.get_fake_auth_table_meta(ksname, tablename)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
index f537255..265d354 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.cql3.statements.Bound;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.utils.btree.BTreeSet;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
@@ -199,7 +199,7 @@ final class ClusteringColumnRestrictions extends RestrictionSetWrapper
@Override
public void addRowFilterTo(RowFilter filter,
- SecondaryIndexManager indexManager,
+ IndexRegistry indexRegistry,
QueryOptions options) throws InvalidRequestException
{
int position = 0;
@@ -207,9 +207,9 @@ final class ClusteringColumnRestrictions extends RestrictionSetWrapper
for (SingleRestriction restriction : restrictions)
{
// We ignore all the clustering columns that can be handled by slices.
- if (handleInFilter(restriction, position) || restriction.hasSupportingIndex(indexManager))
+ if (handleInFilter(restriction, position) || restriction.hasSupportingIndex(indexRegistry))
{
- restriction.addRowFilterTo(filter, indexManager, options);
+ restriction.addRowFilterTo(filter, indexRegistry, options);
continue;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
index e1202a6..4c6ce2f 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.cql3.statements.Bound;
import org.apache.cassandra.db.MultiCBuilder;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.index.Index;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
@@ -111,9 +111,9 @@ public abstract class MultiColumnRestriction implements SingleRestriction
}
@Override
- public final boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+ public final boolean hasSupportingIndex(IndexRegistry indexRegistry)
{
- for (Index index : indexManager.listIndexes())
+ for (Index index : indexRegistry.listIndexes())
if (isSupportedBy(index))
return true;
return false;
@@ -186,7 +186,7 @@ public abstract class MultiColumnRestriction implements SingleRestriction
}
@Override
- public final void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexMananger, QueryOptions options)
+ public final void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options)
{
Tuples.Value t = ((Tuples.Value) value.bind(options));
List<ByteBuffer> values = t.getElements();
@@ -244,7 +244,7 @@ public abstract class MultiColumnRestriction implements SingleRestriction
@Override
public final void addRowFilterTo(RowFilter filter,
- SecondaryIndexManager indexManager,
+ IndexRegistry indexRegistry,
QueryOptions options)
{
throw invalidRequest("IN restrictions are not supported on indexed columns");
@@ -473,7 +473,7 @@ public abstract class MultiColumnRestriction implements SingleRestriction
@Override
public final void addRowFilterTo(RowFilter filter,
- SecondaryIndexManager indexManager,
+ IndexRegistry indexRegistry,
QueryOptions options)
{
throw invalidRequest("Multi-column slice restrictions cannot be used for filtering.");
@@ -561,7 +561,7 @@ public abstract class MultiColumnRestriction implements SingleRestriction
}
@Override
- public final void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexMananger, QueryOptions options)
+ public final void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options)
{
throw new UnsupportedOperationException("Secondary indexes do not support IS NOT NULL restrictions");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
index ac589be..5bb3242 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.ClusteringPrefix;
import org.apache.cassandra.db.MultiCBuilder;
import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
/**
* A set of single restrictions on the partition key.
@@ -121,12 +121,12 @@ final class PartitionKeySingleRestrictionSet extends RestrictionSetWrapper imple
@Override
public void addRowFilterTo(RowFilter filter,
- SecondaryIndexManager indexManager,
+ IndexRegistry indexRegistry,
QueryOptions options)
{
for (SingleRestriction restriction : restrictions)
{
- restriction.addRowFilterTo(filter, indexManager, options);
+ restriction.addRowFilterTo(filter, indexRegistry, options);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java b/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
index daace46..91dedad 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
@@ -23,7 +23,7 @@ import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
/**
* <p>Implementation of this class must be immutable.</p>
@@ -63,19 +63,19 @@ public interface Restriction
/**
* Check if the restriction is on indexed columns.
*
- * @param indexManager the index manager
+ * @param indexRegistry the index registry
* @return <code>true</code> if the restriction is on indexed columns, <code>false</code> otherwise
*/
- public boolean hasSupportingIndex(SecondaryIndexManager indexManager);
+ public boolean hasSupportingIndex(IndexRegistry indexRegistry);
/**
* Adds to the specified row filter the expressions corresponding to this <code>Restriction</code>.
*
* @param filter the row filter to add expressions to
- * @param indexManager the secondary index manager
+ * @param indexRegistry the index registry
* @param options the query options
*/
public void addRowFilterTo(RowFilter filter,
- SecondaryIndexManager indexManager,
+ IndexRegistry indexRegistry,
QueryOptions options);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
index 7c805ce..427c396 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction.ContainsRestriction;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
@@ -74,10 +74,10 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
}
@Override
- public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options) throws InvalidRequestException
+ public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options) throws InvalidRequestException
{
for (Restriction restriction : restrictions.values())
- restriction.addRowFilterTo(filter, indexManager, options);
+ restriction.addRowFilterTo(filter, indexRegistry, options);
}
@Override
@@ -184,11 +184,11 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
}
@Override
- public final boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+ public final boolean hasSupportingIndex(IndexRegistry indexRegistry)
{
for (Restriction restriction : restrictions.values())
{
- if (restriction.hasSupportingIndex(indexManager))
+ if (restriction.hasSupportingIndex(indexRegistry))
return true;
}
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
index c310908..9803adc 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
@@ -20,13 +20,14 @@ package org.apache.cassandra.cql3.restrictions;
import java.util.List;
import java.util.Set;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.index.SecondaryIndexManager;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.cassandra.index.IndexRegistry;
/**
* A <code>RestrictionSet</code> wrapper that can be extended to allow to modify the <code>RestrictionSet</code>
@@ -45,10 +46,10 @@ class RestrictionSetWrapper implements Restrictions
}
public void addRowFilterTo(RowFilter filter,
- SecondaryIndexManager indexManager,
+ IndexRegistry indexRegistry,
QueryOptions options)
{
- restrictions.addRowFilterTo(filter, indexManager, options);
+ restrictions.addRowFilterTo(filter, indexRegistry, options);
}
public List<ColumnMetadata> getColumnDefs()
@@ -71,9 +72,9 @@ class RestrictionSetWrapper implements Restrictions
return restrictions.size();
}
- public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+ public boolean hasSupportingIndex(IndexRegistry indexRegistry)
{
- return restrictions.hasSupportingIndex(indexManager);
+ return restrictions.hasSupportingIndex(indexRegistry);
}
public ColumnMetadata getFirstColumn()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
index 44f95a8..1b3482b 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.cql3.statements.Bound;
import org.apache.cassandra.db.MultiCBuilder;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.index.Index;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
@@ -71,9 +71,9 @@ public abstract class SingleColumnRestriction implements SingleRestriction
}
@Override
- public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+ public boolean hasSupportingIndex(IndexRegistry indexRegistry)
{
- for (Index index : indexManager.listIndexes())
+ for (Index index : indexRegistry.listIndexes())
if (isSupportedBy(index))
return true;
@@ -151,7 +151,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction
@Override
public void addRowFilterTo(RowFilter filter,
- SecondaryIndexManager indexManager,
+ IndexRegistry indexRegistry,
QueryOptions options)
{
filter.add(columnDef, Operator.EQ, value.bindAndGet(options));
@@ -215,7 +215,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction
@Override
public void addRowFilterTo(RowFilter filter,
- SecondaryIndexManager indexManager,
+ IndexRegistry indexRegistry,
QueryOptions options)
{
throw invalidRequest("IN restrictions are not supported on indexed columns");
@@ -385,7 +385,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction
}
@Override
- public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options)
+ public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options)
{
for (Bound b : Bound.values())
if (hasBound(b))
@@ -475,7 +475,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction
}
@Override
- public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options)
+ public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options)
{
for (ByteBuffer value : bindAndGet(values, options))
filter.add(columnDef, Operator.CONTAINS, value);
@@ -615,7 +615,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction
@Override
public void addRowFilterTo(RowFilter filter,
- SecondaryIndexManager indexManager,
+ IndexRegistry indexRegistry,
QueryOptions options)
{
throw new UnsupportedOperationException("Secondary indexes do not support IS NOT NULL restrictions");
@@ -691,16 +691,16 @@ public abstract class SingleColumnRestriction implements SingleRestriction
@Override
public void addRowFilterTo(RowFilter filter,
- SecondaryIndexManager indexManager,
+ IndexRegistry indexRegistry,
QueryOptions options)
{
Pair<Operator, ByteBuffer> operation = makeSpecific(value.bindAndGet(options));
// there must be a suitable INDEX for LIKE_XXX expressions
RowFilter.SimpleExpression expression = filter.add(columnDef, operation.left, operation.right);
- indexManager.getBestIndexFor(expression)
- .orElseThrow(() -> invalidRequest("%s is only supported on properly indexed columns",
- expression));
+ indexRegistry.getBestIndexFor(expression)
+ .orElseThrow(() -> invalidRequest("%s is only supported on properly indexed columns",
+ expression));
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index 0240b6f..af1a964 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.index.Index;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.btree.BTreeSet;
@@ -133,14 +133,9 @@ public final class StatementRestrictions
{
this(type, table, allowFiltering);
- ColumnFamilyStore cfs;
- SecondaryIndexManager secondaryIndexManager = null;
-
+ IndexRegistry indexRegistry = null;
if (type.allowUseOfSecondaryIndices())
- {
- cfs = Keyspace.open(table.keyspace).getColumnFamilyStore(table.name);
- secondaryIndexManager = cfs.indexManager;
- }
+ indexRegistry = IndexRegistry.obtain(table);
/*
* WHERE clause. For a given entity, rules are:
@@ -165,7 +160,7 @@ public final class StatementRestrictions
{
Restriction restriction = relation.toRestriction(table, boundNames);
- if (!type.allowUseOfSecondaryIndices() || !restriction.hasSupportingIndex(secondaryIndexManager))
+ if (!type.allowUseOfSecondaryIndices() || !restriction.hasSupportingIndex(indexRegistry))
throw new InvalidRequestException(String.format("LIKE restriction is only supported on properly " +
"indexed columns. %s is not valid.",
relation.toString()));
@@ -186,13 +181,13 @@ public final class StatementRestrictions
if (type.allowUseOfSecondaryIndices())
{
if (whereClause.containsCustomExpressions())
- processCustomIndexExpressions(whereClause.expressions, boundNames, secondaryIndexManager);
+ processCustomIndexExpressions(whereClause.expressions, boundNames, indexRegistry);
- hasQueriableClusteringColumnIndex = clusteringColumnsRestrictions.hasSupportingIndex(secondaryIndexManager);
+ hasQueriableClusteringColumnIndex = clusteringColumnsRestrictions.hasSupportingIndex(indexRegistry);
hasQueriableIndex = !filterRestrictions.getCustomIndexExpressions().isEmpty()
|| hasQueriableClusteringColumnIndex
- || partitionKeyRestrictions.hasSupportingIndex(secondaryIndexManager)
- || nonPrimaryKeyRestrictions.hasSupportingIndex(secondaryIndexManager);
+ || partitionKeyRestrictions.hasSupportingIndex(indexRegistry)
+ || nonPrimaryKeyRestrictions.hasSupportingIndex(indexRegistry);
}
// At this point, the select statement is fully constructed, but we still have a few things to validate
@@ -564,7 +559,7 @@ public final class StatementRestrictions
private void processCustomIndexExpressions(List<CustomIndexExpression> expressions,
VariableSpecifications boundNames,
- SecondaryIndexManager indexManager)
+ IndexRegistry indexRegistry)
{
if (expressions.size() > 1)
throw new InvalidRequestException(IndexRestrictions.MULTIPLE_EXPRESSIONS);
@@ -582,7 +577,7 @@ public final class StatementRestrictions
if (!table.indexes.has(expression.targetIndex.getIdx()))
throw IndexRestrictions.indexNotFound(expression.targetIndex, table);
- Index index = indexManager.getIndex(table.indexes.get(expression.targetIndex.getIdx()).get());
+ Index index = indexRegistry.getIndex(table.indexes.get(expression.targetIndex.getIdx()).get());
if (!index.getIndexMetadata().isCustom())
throw IndexRestrictions.nonCustomIndexInExpression(expression.targetIndex);
@@ -596,14 +591,14 @@ public final class StatementRestrictions
filterRestrictions.add(expression);
}
- public RowFilter getRowFilter(SecondaryIndexManager indexManager, QueryOptions options)
+ public RowFilter getRowFilter(IndexRegistry indexRegistry, QueryOptions options)
{
if (filterRestrictions.isEmpty())
return RowFilter.NONE;
RowFilter filter = RowFilter.create();
for (Restrictions restrictions : filterRestrictions.getRestrictions())
- restrictions.addRowFilterTo(filter, indexManager, options);
+ restrictions.addRowFilterTo(filter, indexRegistry, options);
for (CustomIndexExpression expression : filterRestrictions.getCustomIndexExpressions())
expression.addToRowFilter(filter, table, options);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
index a80b811..437b17c 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
import static org.apache.cassandra.cql3.statements.Bound.END;
import static org.apache.cassandra.cql3.statements.Bound.START;
@@ -272,15 +272,15 @@ final class TokenFilter implements PartitionKeyRestrictions
}
@Override
- public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+ public boolean hasSupportingIndex(IndexRegistry indexRegistry)
{
- return restrictions.hasSupportingIndex(indexManager);
+ return restrictions.hasSupportingIndex(indexRegistry);
}
@Override
- public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options)
+ public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options)
{
- restrictions.addRowFilterTo(filter, indexManager, options);
+ restrictions.addRowFilterTo(filter, indexRegistry, options);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
index 806974a..e71b177 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.cql3.statements.Bound;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
@@ -116,13 +116,13 @@ public abstract class TokenRestriction implements PartitionKeyRestrictions
}
@Override
- public boolean hasSupportingIndex(SecondaryIndexManager secondaryIndexManager)
+ public boolean hasSupportingIndex(IndexRegistry indexRegistry)
{
return false;
}
@Override
- public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options)
+ public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options)
{
throw new UnsupportedOperationException("Index expression cannot be created for token restriction");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
index e26910d..00d2b94 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
@@ -66,6 +66,8 @@ public class AlterKeyspaceStatement extends SchemaAlteringStatement
throw new InvalidRequestException("Unknown keyspace " + name);
if (SchemaConstants.isLocalSystemKeyspace(ksm.name))
throw new InvalidRequestException("Cannot alter system keyspace");
+ if (ksm.isVirtual())
+ throw new InvalidRequestException("Cannot alter virtual keyspaces");
attrs.validate();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index b3aeb74..260c8fd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -88,6 +88,9 @@ public class AlterTableStatement extends SchemaAlteringStatement
if (current.isView())
throw new InvalidRequestException("Cannot use ALTER TABLE on Materialized View");
+ if (current.isVirtual())
+ throw new InvalidRequestException("Cannot alter virtual tables");
+
TableMetadata.Builder builder = current.unbuild();
ColumnIdentifier columnName = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 2fcd867..a71c799 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -70,6 +70,8 @@ public class BatchStatement implements CQLStatement
private final boolean updatesStaticRow;
private final Attributes attrs;
private final boolean hasConditions;
+ private final boolean updatesVirtualTables;
+
private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class);
private static final String UNLOGGED_BATCH_WARNING = "Unlogged batch covering {} partitions detected " +
@@ -103,11 +105,13 @@ public class BatchStatement implements CQLStatement
RegularAndStaticColumns.Builder conditionBuilder = RegularAndStaticColumns.builder();
boolean updateRegular = false;
boolean updateStatic = false;
+ boolean updatesVirtualTables = false;
for (ModificationStatement stmt : statements)
{
regularBuilder.addAll(stmt.metadata(), stmt.updatedColumns());
updateRegular |= stmt.updatesRegularRows();
+ updatesVirtualTables |= stmt.isVirtual();
if (stmt.hasConditions())
{
hasConditions = true;
@@ -121,6 +125,7 @@ public class BatchStatement implements CQLStatement
this.updatesRegularRows = updateRegular;
this.updatesStaticRow = updateStatic;
this.hasConditions = hasConditions;
+ this.updatesVirtualTables = updatesVirtualTables;
}
public Iterable<org.apache.cassandra.cql3.functions.Function> getFunctions()
@@ -161,29 +166,46 @@ public class BatchStatement implements CQLStatement
boolean hasCounters = false;
boolean hasNonCounters = false;
+ boolean hasVirtualTables = false;
+ boolean hasRegularTables = false;
+
for (ModificationStatement statement : statements)
{
- if (timestampSet && statement.isCounter())
- throw new InvalidRequestException("Cannot provide custom timestamp for a BATCH containing counters");
-
if (timestampSet && statement.isTimestampSet())
throw new InvalidRequestException("Timestamp must be set either on BATCH or individual statements");
- if (isCounter() && !statement.isCounter())
- throw new InvalidRequestException("Cannot include non-counter statement in a counter batch");
-
- if (isLogged() && statement.isCounter())
- throw new InvalidRequestException("Cannot include a counter statement in a logged batch");
-
if (statement.isCounter())
hasCounters = true;
else
hasNonCounters = true;
+
+ if (statement.isVirtual())
+ hasVirtualTables = true;
+ else
+ hasRegularTables = true;
}
+ if (timestampSet && hasCounters)
+ throw new InvalidRequestException("Cannot provide custom timestamp for a BATCH containing counters");
+
+ if (isCounter() && hasNonCounters)
+ throw new InvalidRequestException("Cannot include non-counter statement in a counter batch");
+
if (hasCounters && hasNonCounters)
throw new InvalidRequestException("Counter and non-counter mutations cannot exist in the same batch");
+ if (isLogged() && hasCounters)
+ throw new InvalidRequestException("Cannot include a counter statement in a logged batch");
+
+ if (isLogged() && hasVirtualTables)
+ throw new InvalidRequestException("Cannot include a virtual table statement in a logged batch");
+
+ if (hasVirtualTables && hasRegularTables)
+ throw new InvalidRequestException("Mutations for virtual and regular tables cannot exist in the same batch");
+
+ if (hasConditions && hasVirtualTables)
+ throw new InvalidRequestException("Conditional BATCH statements cannot include mutations for virtual tables");
+
if (hasConditions)
{
String ksName = null;
@@ -353,7 +375,11 @@ public class BatchStatement implements CQLStatement
if (hasConditions)
return executeWithConditions(options, queryState, queryStartNanoTime);
- executeWithoutConditions(getMutations(options, local, now, queryStartNanoTime), options.getConsistency(), queryStartNanoTime);
+ if (updatesVirtualTables)
+ executeInternalWithoutCondition(queryState, options, queryStartNanoTime);
+ else
+ executeWithoutConditions(getMutations(options, local, now, queryStartNanoTime), options.getConsistency(), queryStartNanoTime);
+
return new ResultMessage.Void();
}
@@ -482,16 +508,18 @@ public class BatchStatement implements CQLStatement
public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
{
+ BatchQueryOptions batchOptions = BatchQueryOptions.withoutPerStatementVariables(options);
+
if (hasConditions)
- return executeInternalWithConditions(BatchQueryOptions.withoutPerStatementVariables(options), queryState);
+ return executeInternalWithConditions(batchOptions, queryState);
- executeInternalWithoutCondition(queryState, options, System.nanoTime());
+ executeInternalWithoutCondition(queryState, batchOptions, System.nanoTime());
return new ResultMessage.Void();
}
- private ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
+ private ResultMessage executeInternalWithoutCondition(QueryState queryState, BatchQueryOptions batchOptions, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
{
- for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp(), queryStartNanoTime))
+ for (IMutation mutation : getMutations(batchOptions, true, queryState.getTimestamp(), queryStartNanoTime))
mutation.apply();
return null;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
index 9671b02..96d9f5a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
@@ -19,10 +19,10 @@ package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
+import org.apache.cassandra.db.virtual.VirtualMutation;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
@@ -84,15 +84,20 @@ final class BatchUpdatesCollector implements UpdatesCollector
private IMutationBuilder getMutationBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency)
{
- String ksName = metadata.keyspace;
- IMutationBuilder mutationBuilder = keyspaceMap(ksName).get(dk.getKey());
- if (mutationBuilder == null)
+ return keyspaceMap(metadata.keyspace).computeIfAbsent(dk.getKey(), k -> makeMutationBuilder(metadata, dk, consistency));
+ }
+
+ private IMutationBuilder makeMutationBuilder(TableMetadata metadata, DecoratedKey partitionKey, ConsistencyLevel cl)
+ {
+ if (metadata.isVirtual())
{
- MutationBuilder builder = new MutationBuilder(ksName, dk);
- mutationBuilder = metadata.isCounter() ? new CounterMutationBuilder(builder, consistency) : builder;
- keyspaceMap(ksName).put(dk.getKey(), mutationBuilder);
+ return new VirtualMutationBuilder(metadata.keyspace, partitionKey);
+ }
+ else
+ {
+ MutationBuilder builder = new MutationBuilder(metadata.keyspace, partitionKey);
+ return metadata.isCounter() ? new CounterMutationBuilder(builder, cl) : builder;
}
- return mutationBuilder;
}
/**
@@ -228,4 +233,41 @@ final class BatchUpdatesCollector implements UpdatesCollector
return mutationBuilder.get(id);
}
}
+
+ private static class VirtualMutationBuilder implements IMutationBuilder
+ {
+ private final String keyspaceName;
+ private final DecoratedKey partitionKey;
+
+ private final HashMap<TableId, PartitionUpdate.Builder> modifications = new HashMap<>();
+
+ private VirtualMutationBuilder(String keyspaceName, DecoratedKey partitionKey)
+ {
+ this.keyspaceName = keyspaceName;
+ this.partitionKey = partitionKey;
+ }
+
+ @Override
+ public VirtualMutationBuilder add(PartitionUpdate.Builder builder)
+ {
+ PartitionUpdate.Builder prev = modifications.put(builder.metadata().id, builder);
+ if (null != prev)
+ throw new IllegalStateException();
+ return this;
+ }
+
+ @Override
+ public VirtualMutation build()
+ {
+ ImmutableMap.Builder<TableId, PartitionUpdate> updates = new ImmutableMap.Builder<>();
+ modifications.forEach((tableId, updateBuilder) -> updates.put(tableId, updateBuilder.build()));
+ return new VirtualMutation(keyspaceName, partitionKey, updates.build());
+ }
+
+ @Override
+ public PartitionUpdate.Builder get(TableId tableId)
+ {
+ return modifications.get(tableId);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index eb6aa70..7953c8b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -22,6 +22,7 @@ import java.util.*;
import com.google.common.collect.*;
+import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.conditions.ColumnCondition;
@@ -183,7 +184,7 @@ public class CQL3CasRequest implements CASRequest
return conditionColumns;
}
- public SinglePartitionReadCommand readCommand(int nowInSec)
+ public SinglePartitionReadQuery readCommand(int nowInSec)
{
assert staticConditions != null || !conditions.isEmpty();
@@ -193,16 +194,16 @@ public class CQL3CasRequest implements CASRequest
// With only a static condition, we still want to make the distinction between a non-existing partition and one
// that exists (has some live data) but has not static content. So we query the first live row of the partition.
if (conditions.isEmpty())
- return SinglePartitionReadCommand.create(metadata,
- nowInSec,
- columnFilter,
- RowFilter.NONE,
- DataLimits.cqlLimits(1),
- key,
- new ClusteringIndexSliceFilter(Slices.ALL, false));
+ return SinglePartitionReadQuery.create(metadata,
+ nowInSec,
+ columnFilter,
+ RowFilter.NONE,
+ DataLimits.cqlLimits(1),
+ key,
+ new ClusteringIndexSliceFilter(Slices.ALL, false));
ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(conditions.navigableKeySet(), false);
- return SinglePartitionReadCommand.create(metadata, nowInSec, key, columnFilter, filter);
+ return SinglePartitionReadQuery.create(metadata, nowInSec, key, columnFilter, filter);
}
/**
@@ -244,7 +245,7 @@ public class CQL3CasRequest implements CASRequest
upd.applyUpdates(current, updateBuilder);
PartitionUpdate partitionUpdate = updateBuilder.build();
- Keyspace.openAndGetStore(metadata).indexManager.validate(partitionUpdate);
+ IndexRegistry.obtain(metadata).validate(partitionUpdate);
return partitionUpdate;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
index d3ef599..e428087 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.cassandra.audit.AuditLogEntryType;
import org.apache.cassandra.auth.*;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.functions.*;
@@ -206,8 +207,12 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement
if (ifNotExists && orReplace)
throw new InvalidRequestException("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives");
- if (Schema.instance.getKeyspaceMetadata(functionName.keyspace) == null)
+
+ KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(functionName.keyspace);
+ if (null == ksm)
throw new InvalidRequestException(String.format("Cannot add aggregate '%s' to non existing keyspace '%s'.", functionName.name, functionName.keyspace));
+ if (ksm.isVirtual())
+ throw new InvalidRequestException("Cannot create aggregates in virtual keyspaces");
}
public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws RequestValidationException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
index 1f0e703..c380991 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.cassandra.audit.AuditLogEntryType;
import org.apache.cassandra.auth.*;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.cql3.ColumnIdentifier;
@@ -136,8 +137,11 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
if (ifNotExists && orReplace)
throw new InvalidRequestException("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives");
- if (Schema.instance.getKeyspaceMetadata(functionName.keyspace) == null)
+ KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(functionName.keyspace);
+ if (null == ksm)
throw new InvalidRequestException(String.format("Cannot add function '%s' to non existing keyspace '%s'.", functionName.name, functionName.keyspace));
+ if (ksm.isVirtual())
+ throw new InvalidRequestException("Cannot create functions in virtual keyspaces");
}
public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws RequestValidationException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 9d0a714..778c4a3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -82,6 +82,9 @@ public class CreateIndexStatement extends SchemaAlteringStatement
{
TableMetadata table = Schema.instance.validateTable(keyspace(), columnFamily());
+ if (table.isVirtual())
+ throw new InvalidRequestException("Secondary indexes are not supported on virtual tables");
+
if (table.isCounter())
throw new InvalidRequestException("Secondary indexes are not supported on counter tables");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 55249c4..7c639e2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.regex.Pattern;
+
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import org.apache.commons.lang3.StringUtils;
@@ -134,7 +135,6 @@ public class CreateTableStatement extends SchemaAlteringStatement
.isCompound(isCompound)
.isCounter(hasCounters)
.isSuper(false)
- .isView(false)
.params(params);
for (int i = 0; i < keyAliases.size(); i++)
@@ -213,6 +213,9 @@ public class CreateTableStatement extends SchemaAlteringStatement
KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspace());
if (ksm == null)
throw new ConfigurationException(String.format("Keyspace %s doesn't exist", keyspace()));
+ if (ksm.isVirtual())
+ throw new InvalidRequestException("Cannot create tables in virtual keyspaces");
+
return prepare(ksm.types);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
index d57bff7..f2cd217 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
@@ -62,6 +62,8 @@ public class CreateTriggerStatement extends SchemaAlteringStatement
public void validate(ClientState state) throws RequestValidationException
{
TableMetadata metadata = Schema.instance.validateTable(keyspace(), columnFamily());
+ if (metadata.isVirtual())
+ throw new InvalidRequestException("Cannot CREATE TRIGGER against a virtual table");
if (metadata.isView())
throw new InvalidRequestException("Cannot CREATE TRIGGER against a materialized view");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
index 7dce478..1a0da4c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
@@ -73,6 +73,8 @@ public class CreateTypeStatement extends SchemaAlteringStatement
KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(name.getKeyspace());
if (ksm == null)
throw new InvalidRequestException(String.format("Cannot add type in unknown keyspace %s", name.getKeyspace()));
+ if (ksm.isVirtual())
+ throw new InvalidRequestException("Cannot create types in virtual keyspaces");
if (ksm.types.get(name.getUserTypeName()).isPresent() && !ifNotExists)
throw new InvalidRequestException(String.format("A user type of name %s already exists", name));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
index 01ed6fe..b50a552 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
@@ -151,6 +151,8 @@ public class CreateViewStatement extends SchemaAlteringStatement
TableMetadata metadata = Schema.instance.validateTable(baseName.getKeyspace(), baseName.getColumnFamily());
+ if (metadata.isVirtual())
+ throw new InvalidRequestException("Materialized views are not supported on virtual tables");
if (metadata.isCounter())
throw new InvalidRequestException("Materialized views are not supported on counter tables");
if (metadata.isView())
@@ -317,7 +319,7 @@ public class CreateViewStatement extends SchemaAlteringStatement
TableMetadata.Builder builder =
TableMetadata.builder(keyspace(), columnFamily(), properties.properties.getId())
- .isView(true)
+ .kind(TableMetadata.Kind.VIEW)
.params(params);
add(metadata, targetPartitionKeys, builder::addPartitionKeyColumn);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index af04572..639286c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -144,6 +144,8 @@ public class DeleteStatement extends ModificationStatement
Conditions conditions,
Attributes attrs)
{
+ checkFalse(metadata.isVirtual(), "Virtual tables don't support DELETE statements");
+
Operations operations = new Operations(type);
for (Operation.RawDeletion deletion : deletions)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
index cfc6564..2f302c0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
import org.apache.cassandra.audit.AuditLogEntryType;
import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RequestValidationException;
@@ -61,6 +62,9 @@ public class DropKeyspaceStatement extends SchemaAlteringStatement
public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws ConfigurationException
{
+ if (null != VirtualKeyspaceRegistry.instance.getKeyspaceNullable(keyspace))
+ throw new InvalidRequestException("Cannot drop virtual keyspaces");
+
try
{
MigrationManager.announceKeyspaceDrop(keyspace, isLocalOnly);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
index d7801e5..beb1002 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
@@ -75,6 +75,9 @@ public class DropTableStatement extends SchemaAlteringStatement
if (metadata.isView())
throw new InvalidRequestException("Cannot use DROP TABLE on Materialized View");
+ if (metadata.isVirtual())
+ throw new InvalidRequestException("Cannot drop virtual tables");
+
boolean rejectDrop = false;
StringBuilder messageBuilder = new StringBuilder();
for (ViewMetadata def : ksm.views)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 31aa80c..e02fd41 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -203,6 +203,11 @@ public abstract class ModificationStatement implements CQLStatement
return metadata().isView();
}
+ public boolean isVirtual()
+ {
+ return metadata().isVirtual();
+ }
+
public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException
{
return attrs.getTimestamp(now, options);
@@ -248,6 +253,8 @@ public abstract class ModificationStatement implements CQLStatement
checkFalse(isCounter() && attrs.isTimestampSet(), "Cannot provide custom timestamp for counter updates");
checkFalse(isCounter() && attrs.isTimeToLiveSet(), "Cannot provide custom TTL for counter updates");
checkFalse(isView(), "Cannot directly modify a materialized view");
+ checkFalse(isVirtual() && attrs.isTimeToLiveSet(), "Expiring columns are not supported by virtual tables");
+ checkFalse(isVirtual() && hasConditions(), "Conditional updates are not supported by virtual tables");
}
public RegularAndStaticColumns updatedColumns()
@@ -440,6 +447,9 @@ public abstract class ModificationStatement implements CQLStatement
private ResultMessage executeWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime)
throws RequestExecutionException, RequestValidationException
{
+ if (isVirtual())
+ return executeInternalWithoutCondition(queryState, options, queryStartNanoTime);
+
ConsistencyLevel cl = options.getConsistency();
if (isCounter())
cl.validateCounterForWrite(metadata());
@@ -613,7 +623,7 @@ public abstract class ModificationStatement implements CQLStatement
{
UUID ballot = UUIDGen.getTimeUUIDFromMicros(state.getTimestamp());
- SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
+ SinglePartitionReadQuery readCommand = request.readCommand(FBUtilities.nowInSeconds());
FilteredPartition current;
try (ReadExecutionController executionController = readCommand.executionController();
PartitionIterator iter = readCommand.executeInternal(executionController))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 7fa9964..ef3db51 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -55,7 +55,7 @@ import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.view.View;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.ClientWarn;
@@ -458,7 +458,7 @@ public class SelectStatement implements CQLStatement
{
QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
- if (aggregationSpec == null || query == ReadQuery.EMPTY)
+ if (aggregationSpec == null || query.isEmpty())
return pager;
return new AggregationQueryPager(pager, query.limits());
@@ -501,25 +501,22 @@ public class SelectStatement implements CQLStatement
{
Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options);
if (keys.isEmpty())
- return ReadQuery.EMPTY;
+ return ReadQuery.empty(table);
ClusteringIndexFilter filter = makeClusteringIndexFilter(options, columnFilter);
if (filter == null)
- return ReadQuery.EMPTY;
+ return ReadQuery.empty(table);
RowFilter rowFilter = getRowFilter(options);
- // Note that we use the total limit for every key, which is potentially inefficient.
- // However, IN + LIMIT is not a very sensible choice.
- List<SinglePartitionReadCommand> commands = new ArrayList<>(keys.size());
+ List<DecoratedKey> decoratedKeys = new ArrayList<>(keys.size());
for (ByteBuffer key : keys)
{
QueryProcessor.validateKey(key);
- DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.clone(key));
- commands.add(SinglePartitionReadCommand.create(table, nowInSec, columnFilter, rowFilter, limit, dk, filter));
+ decoratedKeys.add(table.partitioner.decorateKey(ByteBufferUtil.clone(key)));
}
- return new SinglePartitionReadCommand.Group(commands, limit);
+ return SinglePartitionReadQuery.createGroup(table, nowInSec, columnFilter, rowFilter, limit, decoratedKeys, filter);
}
/**
@@ -569,7 +566,7 @@ public class SelectStatement implements CQLStatement
{
ClusteringIndexFilter clusteringIndexFilter = makeClusteringIndexFilter(options, columnFilter);
if (clusteringIndexFilter == null)
- return ReadQuery.EMPTY;
+ return ReadQuery.empty(table);
RowFilter rowFilter = getRowFilter(options);
@@ -577,10 +574,10 @@ public class SelectStatement implements CQLStatement
// We want to have getRangeSlice to count the number of columns, not the number of keys.
AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options);
if (keyBounds == null)
- return ReadQuery.EMPTY;
+ return ReadQuery.empty(table);
- PartitionRangeReadCommand command =
- PartitionRangeReadCommand.create(table, nowInSec, columnFilter, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
+ ReadQuery command =
+ PartitionRangeReadQuery.create(table, nowInSec, columnFilter, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
// If there's a secondary index that the command can use, have it validate the request parameters.
command.maybeValidateIndex();
@@ -755,10 +752,8 @@ public class SelectStatement implements CQLStatement
*/
public RowFilter getRowFilter(QueryOptions options) throws InvalidRequestException
{
- ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
- SecondaryIndexManager secondaryIndexManager = cfs.indexManager;
- RowFilter filter = restrictions.getRowFilter(secondaryIndexManager, options);
- return filter;
+ IndexRegistry indexRegistry = IndexRegistry.obtain(table);
+ return restrictions.getRowFilter(indexRegistry, options);
}
private ResultSet process(PartitionIterator partitions,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
index 9eaf897..1def3fd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.virtual.VirtualMutation;
import org.apache.cassandra.schema.TableMetadata;
/**
@@ -86,12 +87,15 @@ final class SingleTableUpdatesCollector implements UpdatesCollector
List<IMutation> ms = new ArrayList<>();
for (PartitionUpdate.Builder builder : puBuilders.values())
{
- IMutation mutation = null;
+ IMutation mutation;
- if (metadata.isCounter())
+ if (metadata.isVirtual())
+ mutation = new VirtualMutation(builder.build());
+ else if (metadata.isCounter())
mutation = new CounterMutation(new Mutation(builder.build()), counterConsistencyLevel);
else
mutation = new Mutation(builder.build());
+
mutation.validateIndexedColumns();
ms.add(mutation);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 5d09cfa..d41a814 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -69,6 +69,9 @@ public class TruncateStatement extends CFStatement implements CQLStatement
if (metaData.isView())
throw new InvalidRequestException("Cannot TRUNCATE materialized view directly; must truncate base table instead");
+ if (metaData.isVirtual())
+ throw new InvalidRequestException("Cannot truncate virtual tables");
+
StorageProxy.truncateBlocking(keyspace(), columnFamily());
}
catch (UnavailableException | TimeoutException e)
@@ -82,6 +85,13 @@ public class TruncateStatement extends CFStatement implements CQLStatement
{
try
{
+ TableMetadata metaData = Schema.instance.getTableMetadata(keyspace(), columnFamily());
+ if (metaData.isView())
+ throw new InvalidRequestException("Cannot TRUNCATE materialized view directly; must truncate base table instead");
+
+ if (metaData.isVirtual())
+ throw new InvalidRequestException("Cannot truncate virtual tables");
+
ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
cfs.truncateBlocking();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/AbstractReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadQuery.java b/src/java/org/apache/cassandra/db/AbstractReadQuery.java
new file mode 100644
index 0000000..c6ec329
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractReadQuery.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.monitoring.MonitorableImpl;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * Base class for {@code ReadQuery} implementations: stores the table metadata, {@code nowInSec}, column filter, row filter and limits, and rebuilds a CQL string used as the monitorable name.
+ */
+abstract class AbstractReadQuery extends MonitorableImpl implements ReadQuery
+{
+ private final TableMetadata metadata;
+ private final int nowInSec; // forwarded to UnfilteredPartitionIterators.filter() by executeInternal()
+
+ private final ColumnFilter columnFilter;
+ private final RowFilter rowFilter;
+ private final DataLimits limits;
+
+ protected AbstractReadQuery(TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+ {
+ this.metadata = metadata;
+ this.nowInSec = nowInSec;
+ this.columnFilter = columnFilter;
+ this.rowFilter = rowFilter;
+ this.limits = limits;
+ }
+
+ @Override
+ public TableMetadata metadata()
+ {
+ return metadata;
+ }
+
+ // Monitorable interface: the name reported is the CQL string rebuilt by toCQLString()
+ public String name()
+ {
+ return toCQLString();
+ }
+
+ @Override
+ public PartitionIterator executeInternal(ReadExecutionController controller)
+ {
+ return UnfilteredPartitionIterators.filter(executeLocally(controller), nowInSec()); // local result, filtered using this query's nowInSec
+ }
+
+ @Override
+ public DataLimits limits()
+ {
+ return limits;
+ }
+
+ @Override
+ public int nowInSec()
+ {
+ return nowInSec;
+ }
+
+ @Override
+ public RowFilter rowFilter()
+ {
+ return rowFilter;
+ }
+
+ @Override
+ public ColumnFilter columnFilter()
+ {
+ return columnFilter;
+ }
+
+ /**
+ * Recreate the CQL string corresponding to this query.
+ * <p>
+ * Note that in general the returned string will not be exactly the original user string, first
+ * because there isn't always a single syntax for a given query, but also because we don't have
+ * all the information needed (we know the non-PK columns queried but not the PK ones as internally
+ * we query them all). So this shouldn't be relied upon too strongly, but it should be good enough
+ * for debugging purposes, which is what this is for.
+ */
+ public String toCQLString()
+ {
+ StringBuilder sb = new StringBuilder().append("SELECT ")
+ .append(columnFilter())
+ .append(" FROM ")
+ .append(metadata().keyspace)
+ .append('.')
+ .append(metadata().name);
+ appendCQLWhereClause(sb); // WHERE clause is supplied by the concrete subclass
+
+ if (limits() != DataLimits.NONE) // reference comparison: limits are appended unless they are the NONE instance
+ sb.append(' ').append(limits());
+ return sb.toString();
+ }
+
+ protected abstract void appendCQLWhereClause(StringBuilder sb); // called by toCQLString() right after the table name has been appended to sb
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 651d156..50720f4 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -327,6 +327,8 @@ public class Keyspace
{
metadata = Schema.instance.getKeyspaceMetadata(keyspaceName);
assert metadata != null : "Unknown keyspace " + keyspaceName;
+ if (metadata.isVirtual())
+ throw new IllegalStateException("Cannot initialize Keyspace with virtual metadata " + keyspaceName);
createReplicationStrategy(metadata);
this.metric = new KeyspaceMetrics(this);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 27d7d4e..a6641d4 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -45,15 +45,13 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.pager.*;
import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.FBUtilities;
/**
* A read command that selects a (part of a) range of partitions.
*/
-public class PartitionRangeReadCommand extends ReadCommand
+public class PartitionRangeReadCommand extends ReadCommand implements PartitionRangeReadQuery
{
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
@@ -187,7 +185,8 @@ public class PartitionRangeReadCommand extends ReadCommand
indexMetadata());
}
- public ReadCommand withUpdatedLimit(DataLimits newLimits)
+ @Override
+ public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
{
return new PartitionRangeReadCommand(isDigestQuery(),
digestVersion(),
@@ -200,6 +199,7 @@ public class PartitionRangeReadCommand extends ReadCommand
indexMetadata());
}
+ @Override
public PartitionRangeReadCommand withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange)
{
return new PartitionRangeReadCommand(isDigestQuery(),
@@ -218,34 +218,11 @@ public class PartitionRangeReadCommand extends ReadCommand
return DatabaseDescriptor.getRangeRpcTimeout();
}
- public boolean selectsKey(DecoratedKey key)
- {
- if (!dataRange().contains(key))
- return false;
-
- return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType);
- }
-
- public boolean selectsClustering(DecoratedKey key, Clustering clustering)
- {
- if (clustering == Clustering.STATIC_CLUSTERING)
- return !columnFilter().fetchedColumns().statics.isEmpty();
-
- if (!dataRange().clusteringIndexFilter(key).selects(clustering))
- return false;
- return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
- }
-
public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
{
return StorageProxy.getRangeSlice(this, consistency, queryStartNanoTime);
}
- public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
- {
- return new PartitionRangeQueryPager(this, pagingState, protocolVersion);
- }
-
protected void recordLatency(TableMetrics metric, long latencyNanos)
{
metric.rangeLatency.addNano(latencyNanos);
@@ -389,13 +366,6 @@ public class PartitionRangeReadCommand extends ReadCommand
}
@Override
- public boolean selectsFullPartition()
- {
- return metadata().isStaticCompactTable() ||
- (dataRange.selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns());
- }
-
- @Override
public String toString()
{
return String.format("Read(%s columns=%s rowfilter=%s limits=%s %s)",
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org