You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2018/05/18 15:46:19 UTC

[1/4] cassandra git commit: Implement virtual keyspace interface

Repository: cassandra
Updated Branches:
  refs/heads/trunk 3b6c93828 -> 0d464cd25


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
new file mode 100644
index 0000000..3581a73
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.validation.entities;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.virtual.AbstractVirtualTable;
+import org.apache.cassandra.db.virtual.SimpleDataSet;
+import org.apache.cassandra.db.virtual.VirtualKeyspace;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.cassandra.triggers.ITrigger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for a virtual keyspace holding two virtual tables: a read-only table ({@code vt1}) used for SELECT
+ * queries and a writable table ({@code vt2}) used for modifications. Also checks that unsupported DDL
+ * statements and the {@code StorageServiceMBean} operations called in {@link #testMBeansMethods()} are
+ * rejected for the virtual keyspace.
+ */
+public class VirtualTableTest extends CQLTester
+{
+    private static final String KS_NAME = "test_virtual_ks";
+    private static final String VT1_NAME = "vt1";
+    private static final String VT2_NAME = "vt2";
+
+    /**
+     * Virtual table with a text {@code key} partition key column and an int {@code value} regular column,
+     * whose content lives in an in-memory map so that the tests can write to it.
+     */
+    private static class WritableVirtualTable extends AbstractVirtualTable
+    {
+        private final ColumnMetadata valueColumn;
+        private final Map<String, Integer> backingMap = new HashMap<>();
+
+        WritableVirtualTable(String keyspaceName, String tableName)
+        {
+            super(TableMetadata.builder(keyspaceName, tableName)
+                               .kind(TableMetadata.Kind.VIRTUAL)
+                               .addPartitionKeyColumn("key", UTF8Type.instance)
+                               .addRegularColumn("value", Int32Type.instance)
+                               .build());
+            // "value" is the only regular column declared above, hence index 0
+            valueColumn = metadata().regularColumns().getSimple(0);
+        }
+
+        /**
+         * Builds a new {@code SimpleDataSet} on every call, with one row per entry of the backing map.
+         */
+        @Override
+        public DataSet data()
+        {
+            SimpleDataSet data = new SimpleDataSet(metadata());
+            backingMap.forEach((key, value) -> data.row(key).column("value", value));
+            return data;
+        }
+
+        /**
+         * Stores, under the partition key of the update, the {@code value} cell of each row of the update.
+         *
+         * @param update the partition update to record in the backing map
+         */
+        @Override
+        public void apply(PartitionUpdate update)
+        {
+            String key = (String) metadata().partitionKeyType.compose(update.partitionKey().getKey());
+            update.forEach(row ->
+            {
+                // NOTE(review): the cell is dereferenced without a null check; presumably getCell returns null
+                // for a row that does not set "value" -- confirm before writing key-only rows to this table
+                Integer value = Int32Type.instance.compose(row.getCell(valueColumn).value());
+                backingMap.put(key, value);
+            });
+        }
+    }
+
+    /**
+     * Builds the read-only {@code vt1} table (six rows over partitions {@code pk1} and {@code pk2}) and the
+     * writable {@code vt2} table, registers them as the {@code test_virtual_ks} virtual keyspace and then
+     * calls {@code CQLTester.setUpClass()}.
+     */
+    @BeforeClass
+    public static void setUpClass()
+    {
+        TableMetadata vt1Metadata =
+            TableMetadata.builder(KS_NAME, VT1_NAME)
+                         .kind(TableMetadata.Kind.VIRTUAL)
+                         .addPartitionKeyColumn("pk", UTF8Type.instance)
+                         .addClusteringColumn("c", UTF8Type.instance)
+                         .addRegularColumn("v1", Int32Type.instance)
+                         .addRegularColumn("v2", LongType.instance)
+                         .build();
+
+        SimpleDataSet vt1data = new SimpleDataSet(vt1Metadata);
+
+        vt1data.row("pk1", "c1").column("v1", 11).column("v2", 11L)
+               .row("pk2", "c1").column("v1", 21).column("v2", 21L)
+               .row("pk1", "c2").column("v1", 12).column("v2", 12L)
+               .row("pk2", "c2").column("v1", 22).column("v2", 22L)
+               .row("pk1", "c3").column("v1", 13).column("v2", 13L)
+               .row("pk2", "c3").column("v1", 23).column("v2", 23L);
+
+        VirtualTable vt1 = new AbstractVirtualTable(vt1Metadata)
+        {
+            @Override
+            public DataSet data()
+            {
+                return vt1data;
+            }
+        };
+        VirtualTable vt2 = new WritableVirtualTable(KS_NAME, VT2_NAME);
+
+        VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(vt1, vt2)));
+
+        CQLTester.setUpClass();
+    }
+
+    /**
+     * Runs SELECT queries against {@code vt1}: unknown keys, DISTINCT, single-partition, multi-partition
+     * (with ordering, limits, aggregation and paging) and token-range queries.
+     */
+    @Test
+    public void testQueries() throws Throwable
+    {
+        assertRowsNet(executeNet("SELECT * FROM test_virtual_ks.vt1 WHERE pk = 'UNKNOWN'"));
+
+        assertRowsNet(executeNet("SELECT * FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c = 'UNKNOWN'"));
+
+        // Test DISTINCT query
+        assertRowsNet(executeNet("SELECT DISTINCT pk FROM test_virtual_ks.vt1"),
+                      row("pk1"),
+                      row("pk2"));
+
+        assertRowsNet(executeNet("SELECT DISTINCT pk FROM test_virtual_ks.vt1 WHERE token(pk) > token('pk1')"),
+                      row("pk2"));
+
+        // Test single partition queries
+        assertRowsNet(executeNet("SELECT v1, v2 FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c = 'c1'"),
+                      row(11, 11L));
+
+        assertRowsNet(executeNet("SELECT c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c IN ('c1', 'c2')"),
+                      row("c1", 11, 11L),
+                      row("c2", 12, 12L));
+
+        assertRowsNet(executeNet("SELECT c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c IN ('c2', 'c1') ORDER BY c DESC"),
+                      row("c2", 12, 12L),
+                      row("c1", 11, 11L));
+
+        // Test multi-partition queries
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')"),
+                   row("pk1", "c1", 11, 11L),
+                   row("pk1", "c2", 12, 12L),
+                   row("pk2", "c1", 21, 21L),
+                   row("pk2", "c2", 22, 22L));
+
+        assertRows(execute("SELECT pk, c, v1 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') ORDER BY c DESC"),
+                   row("pk1", "c2", 12),
+                   row("pk2", "c2", 22),
+                   row("pk1", "c1", 11),
+                   row("pk2", "c1", 21));
+
+        assertRows(execute("SELECT pk, c, v1 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') ORDER BY c DESC LIMIT 1"),
+                   row("pk1", "c2", 12));
+
+        assertRows(execute("SELECT c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1' , 'c3') ORDER BY c DESC PER PARTITION LIMIT 1"),
+                   row("c3", 13, 13L),
+                   row("c3", 23, 23L));
+
+        assertRows(execute("SELECT count(*) FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')"),
+                   row(4L));
+
+        // The queries below match 4 rows: page sizes 1 to 4 cover paging from one row per page up to a single page
+        for (int pageSize = 1; pageSize < 5; pageSize++)
+        {
+            assertRowsNet(executeNetWithPaging("SELECT pk, c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')", pageSize),
+                          row("pk1", "c1", 11, 11L),
+                          row("pk1", "c2", 12, 12L),
+                          row("pk2", "c1", 21, 21L),
+                          row("pk2", "c2", 22, 22L));
+
+            assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') LIMIT 2", pageSize),
+                          row("pk1", "c1", 11, 11L),
+                          row("pk1", "c2", 12, 12L));
+
+            assertRowsNet(executeNetWithPaging("SELECT count(*) FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')", pageSize),
+                          row(4L));
+        }
+
+        // Test range queries
+        for (int pageSize = 1; pageSize < 4; pageSize++)
+        {
+            assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks.vt1 WHERE token(pk) < token('pk2') AND c IN ('c2', 'c1') ALLOW FILTERING", pageSize),
+                          row("pk1", "c1", 11, 11L),
+                          row("pk1", "c2", 12, 12L));
+
+            assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks.vt1 WHERE token(pk) < token('pk2') AND c IN ('c2', 'c1') LIMIT 1 ALLOW FILTERING", pageSize),
+                          row("pk1", "c1", 11, 11L));
+
+            assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks.vt1 WHERE token(pk) <= token('pk2') AND c > 'c1' PER PARTITION LIMIT 1 ALLOW FILTERING", pageSize),
+                          row("pk1", "c2", 12, 12L),
+                          row("pk2", "c2", 22, 22L));
+
+            assertRowsNet(executeNetWithPaging("SELECT count(*) FROM test_virtual_ks.vt1 WHERE token(pk) = token('pk2') AND c < 'c3' ALLOW FILTERING", pageSize),
+                          row(2L));
+        }
+    }
+
+    /**
+     * Writes to {@code vt2} through UNLOGGED batches, UPDATE and INSERT, and checks the error messages returned
+     * for the statements that are expected to be rejected (logged or mixed batches, DELETE, TTL, conditions).
+     */
+    @Test
+    public void testModifications() throws Throwable
+    {
+        // check for clean state
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"));
+
+        // fill the table, test UNLOGGED batch
+        execute("BEGIN UNLOGGED BATCH " +
+                "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1';" +
+                "UPDATE test_virtual_ks.vt2 SET value = 2 WHERE key ='pk2';" +
+                "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3';" +
+                "APPLY BATCH");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                   row("pk1", 1),
+                   row("pk2", 2),
+                   row("pk3", 3));
+
+        // test that LOGGED batches don't allow virtual table updates
+        assertInvalidMessage("Cannot include a virtual table statement in a logged batch",
+                             "BEGIN BATCH " +
+                             "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1';" +
+                             "UPDATE test_virtual_ks.vt2 SET value = 2 WHERE key ='pk2';" +
+                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3';" +
+                             "APPLY BATCH");
+
+        // test that UNLOGGED batch doesn't allow mixing updates for regular and virtual tables
+        // (each statement is terminated with ';', like in the batches above, so that the concatenated
+        // statements do not run into each other)
+        createTable("CREATE TABLE %s (key text PRIMARY KEY, value int)");
+        assertInvalidMessage("Mutations for virtual and regular tables cannot exist in the same batch",
+                             "BEGIN UNLOGGED BATCH " +
+                             "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1';" +
+                             "UPDATE %s                  SET value = 2 WHERE key ='pk2';" +
+                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3';" +
+                             "APPLY BATCH");
+
+        // update a single value with UPDATE
+        execute("UPDATE test_virtual_ks.vt2 SET value = 11 WHERE key ='pk1'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE key = 'pk1'"),
+                   row("pk1", 11));
+
+        // update a single value with INSERT
+        executeNet("INSERT INTO test_virtual_ks.vt2 (key, value) VALUES ('pk2', 22)");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE key = 'pk2'"),
+                   row("pk2", 22));
+
+        // test that deletions are (currently) rejected
+        assertInvalidMessage("Virtual tables don't support DELETE statements",
+                             "DELETE FROM test_virtual_ks.vt2 WHERE key ='pk1'");
+
+        // test that TTL is (currently) rejected with INSERT and UPDATE
+        assertInvalidMessage("Expiring columns are not supported by virtual tables",
+                             "INSERT INTO test_virtual_ks.vt2 (key, value) VALUES ('pk1', 11) USING TTL 86400");
+        assertInvalidMessage("Expiring columns are not supported by virtual tables",
+                             "UPDATE test_virtual_ks.vt2 USING TTL 86400 SET value = 11 WHERE key ='pk1'");
+
+        // test that LWT is (currently) rejected with virtual tables in batches
+        assertInvalidMessage("Conditional BATCH statements cannot include mutations for virtual tables",
+                             "BEGIN UNLOGGED BATCH " +
+                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3' IF value = 2;" +
+                             "APPLY BATCH");
+
+        // test that LWT is (currently) rejected with virtual tables in UPDATEs
+        assertInvalidMessage("Conditional updates are not supported by virtual tables",
+                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3' IF value = 2");
+
+        // test that LWT is (currently) rejected with virtual tables in INSERTs
+        assertInvalidMessage("Conditional updates are not supported by virtual tables",
+                             "INSERT INTO test_virtual_ks.vt2 (key, value) VALUES ('pk2', 22) IF NOT EXISTS");
+    }
+
+    /**
+     * Checks the error message returned for each DDL statement below when it targets the virtual keyspace
+     * or one of its tables.
+     */
+    @Test
+    public void testInvalidDDLOperations() throws Throwable
+    {
+        assertInvalidMessage("Cannot drop virtual keyspaces",
+                             "DROP KEYSPACE test_virtual_ks");
+
+        assertInvalidMessage("Cannot alter virtual keyspaces",
+                             "ALTER KEYSPACE test_virtual_ks WITH durable_writes = false");
+
+        assertInvalidMessage("Cannot create tables in virtual keyspaces",
+                             "CREATE TABLE test_virtual_ks.test (id int PRIMARY KEY)");
+
+        assertInvalidMessage("Cannot create types in virtual keyspaces",
+                             "CREATE TYPE test_virtual_ks.type (id int)");
+
+        assertInvalidMessage("Cannot drop virtual tables",
+                             "DROP TABLE test_virtual_ks.vt1");
+
+        assertInvalidMessage("Cannot alter virtual tables",
+                             "ALTER TABLE test_virtual_ks.vt1 DROP v1");
+
+        assertInvalidMessage("Cannot truncate virtual tables",
+                             "TRUNCATE TABLE test_virtual_ks.vt1");
+
+        assertInvalidMessage("Secondary indexes are not supported on virtual tables",
+                             "CREATE INDEX ON test_virtual_ks.vt1 (v1)");
+
+        assertInvalidMessage("Materialized views are not supported on virtual tables",
+                             "CREATE MATERIALIZED VIEW test_virtual_ks.mvt1 AS SELECT c, v1 FROM test_virtual_ks.vt1 WHERE c IS NOT NULL PRIMARY KEY(c)");
+
+        assertInvalidMessage("Cannot CREATE TRIGGER against a virtual table",
+                             "CREATE TRIGGER test_trigger ON test_virtual_ks.vt1 USING '" + TestTrigger.class.getName() + '\'');
+    }
+
+    /**
+     * No-op trigger whose class name is used by the CREATE TRIGGER statement of
+     * {@link #testInvalidDDLOperations()}.
+     */
+    public static class TestTrigger implements ITrigger
+    {
+        @Override
+        public Collection<Mutation> augment(Partition update)
+        {
+            return null;
+        }
+    }
+
+    /**
+     * Checks that each {@code StorageServiceMBean} operation below fails, as verified by
+     * {@link #assertJMXFails(ThrowingRunnable)}, when it is given the virtual keyspace.
+     */
+    @Test
+    public void testMBeansMethods() throws Throwable
+    {
+        StorageServiceMBean mbean = StorageService.instance;
+
+        assertJMXFails(() -> mbean.forceKeyspaceCompaction(false, KS_NAME));
+        assertJMXFails(() -> mbean.forceKeyspaceCompaction(false, KS_NAME, VT1_NAME));
+
+        assertJMXFails(() -> mbean.scrub(true, true, true, true, 1, KS_NAME));
+        assertJMXFails(() -> mbean.scrub(true, true, true, true, 1, KS_NAME, VT1_NAME));
+
+        assertJMXFails(() -> mbean.verify(true, KS_NAME));
+        assertJMXFails(() -> mbean.verify(true, KS_NAME, VT1_NAME));
+
+        assertJMXFails(() -> mbean.upgradeSSTables(KS_NAME, false, 1));
+        assertJMXFails(() -> mbean.upgradeSSTables(KS_NAME, false, 1, VT1_NAME));
+
+        assertJMXFails(() -> mbean.garbageCollect("ROW", 1, KS_NAME, VT1_NAME));
+
+        assertJMXFails(() -> mbean.forceKeyspaceFlush(KS_NAME));
+        assertJMXFails(() -> mbean.forceKeyspaceFlush(KS_NAME, VT1_NAME));
+
+        assertJMXFails(() -> mbean.truncate(KS_NAME, VT1_NAME));
+
+        assertJMXFails(() -> mbean.loadNewSSTables(KS_NAME, VT1_NAME));
+
+        assertJMXFails(() -> mbean.getAutoCompactionStatus(KS_NAME));
+        assertJMXFails(() -> mbean.getAutoCompactionStatus(KS_NAME, VT1_NAME));
+    }
+
+    /**
+     * A {@code Runnable}-like action that is allowed to throw any {@code Throwable}.
+     */
+    @FunctionalInterface
+    private interface ThrowingRunnable
+    {
+        void run() throws Throwable;
+    }
+
+    /**
+     * Runs the given action and expects it to throw an {@code IllegalArgumentException} carrying the
+     * "Cannot perform any operations against virtual keyspace" message for {@code KS_NAME}.
+     * The test fails if the action completes normally; any other throwable is propagated.
+     *
+     * @param r the action expected to be rejected
+     */
+    private void assertJMXFails(ThrowingRunnable r) throws Throwable
+    {
+        try
+        {
+            r.run();
+            fail("Expected an IllegalArgumentException for an operation against virtual keyspace " + KS_NAME);
+        }
+        catch (IllegalArgumentException e)
+        {
+            assertEquals("Cannot perform any operations against virtual keyspace " + KS_NAME, e.getMessage());
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/4] cassandra git commit: Implement virtual keyspace interface

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java b/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
new file mode 100644
index 0000000..560fbe9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.pager.PagingState;
+import org.apache.cassandra.service.pager.PartitionRangeQueryPager;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+/**
+ * A {@code ReadQuery} that reads a range of partitions, as described by its {@link #dataRange()}.
+ */
+public interface PartitionRangeReadQuery extends ReadQuery
+{
+    /**
+     * Creates the range query for the given table: a {@code VirtualTablePartitionRangeReadQuery} when the
+     * table is virtual, a {@code PartitionRangeReadCommand} otherwise. All arguments are passed through
+     * unchanged to the selected factory.
+     *
+     * @param table the metadata of the table queried
+     * @param nowInSec the time in seconds to use as "now" for the query
+     * @param columnFilter the column filter of the query
+     * @param rowFilter the row filter of the query
+     * @param limits the limits of the query
+     * @param dataRange the range of data queried
+     * @return the query created by the selected factory
+     */
+    static ReadQuery create(TableMetadata table,
+                            int nowInSec,
+                            ColumnFilter columnFilter,
+                            RowFilter rowFilter,
+                            DataLimits limits,
+                            DataRange dataRange)
+    {
+        return table.isVirtual()
+             ? VirtualTablePartitionRangeReadQuery.create(table, nowInSec, columnFilter, rowFilter, limits, dataRange)
+             : PartitionRangeReadCommand.create(table, nowInSec, columnFilter, rowFilter, limits, dataRange);
+    }
+
+    /**
+     * The {@code DataRange} of this query.
+     *
+     * @return the {@code DataRange} of this query
+     */
+    DataRange dataRange();
+
+    /**
+     * Creates a new {@code PartitionRangeReadQuery} with the updated limits.
+     *
+     * @param newLimits the new limits
+     * @return the new {@code PartitionRangeReadQuery}
+     */
+    PartitionRangeReadQuery withUpdatedLimit(DataLimits newLimits);
+
+    /**
+     * Creates a new {@code PartitionRangeReadQuery} with the updated limits and data range.
+     *
+     * @param newLimits the new limits
+     * @param newDataRange the new data range
+     * @return the new {@code PartitionRangeReadQuery}
+     */
+    PartitionRangeReadQuery withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange);
+
+    /**
+     * Returns a {@code PartitionRangeQueryPager} built from this query, the paging state and the protocol version.
+     */
+    default QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
+    {
+        return new PartitionRangeQueryPager(this, pagingState, protocolVersion);
+    }
+
+    /**
+     * Whether the key is contained in the data range and also satisfies the partition key restrictions
+     * of the row filter. The row filter is only consulted when the data range contains the key.
+     */
+    default boolean selectsKey(DecoratedKey key)
+    {
+        return dataRange().contains(key)
+            && rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType);
+    }
+
+    /**
+     * Whether the clustering is selected for the given key. The static clustering is selected when the
+     * column filter fetches at least one static column; any other clustering must be selected by the
+     * clustering index filter of the key and satisfy the clustering restrictions of the row filter.
+     */
+    default boolean selectsClustering(DecoratedKey key, Clustering clustering)
+    {
+        if (clustering == Clustering.STATIC_CLUSTERING)
+            return !columnFilter().fetchedColumns().statics.isEmpty();
+
+        // the row filter is only consulted once the clustering index filter has selected the clustering
+        return dataRange().clusteringIndexFilter(key).selects(clustering)
+            && rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
+    }
+
+    /**
+     * Returns {@code true} for static compact tables; otherwise returns {@code true} only when the data range
+     * selects all of a partition and the row filter has no expression on clustering or regular columns.
+     */
+    default boolean selectsFullPartition()
+    {
+        if (metadata().isStaticCompactTable())
+            return true;
+
+        return dataRange().selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 128f8f3..064dd77 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.util.function.LongPredicate;
-import java.util.function.Predicate;
 
 import javax.annotation.Nullable;
 
@@ -29,7 +28,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.monitoring.ApproximateTime;
-import org.apache.cassandra.db.monitoring.MonitorableImpl;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.StoppingTransformation;
@@ -37,6 +35,7 @@ import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.exceptions.UnknownIndexException;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.IndexNotAvailableException;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -57,19 +56,13 @@ import org.apache.cassandra.utils.FBUtilities;
  * <p>
  * This contains all the informations needed to do a local read.
  */
-public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
+public abstract class ReadCommand extends AbstractReadQuery
 {
     private static final int TEST_ITERATION_DELAY_MILLIS = Integer.parseInt(System.getProperty("cassandra.test.read_iteration_delay_ms", "0"));
     protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
     public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
 
     private final Kind kind;
-    private final TableMetadata metadata;
-    private final int nowInSec;
-
-    private final ColumnFilter columnFilter;
-    private final RowFilter rowFilter;
-    private final DataLimits limits;
 
     private final boolean isDigestQuery;
     // if a digest query, the version for which the digest is expected. Ignored if not a digest.
@@ -115,14 +108,10 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
                           DataLimits limits,
                           IndexMetadata index)
     {
+        super(metadata, nowInSec, columnFilter, rowFilter, limits);
         this.kind = kind;
         this.isDigestQuery = isDigestQuery;
         this.digestVersion = digestVersion;
-        this.metadata = metadata;
-        this.nowInSec = nowInSec;
-        this.columnFilter = columnFilter;
-        this.rowFilter = rowFilter;
-        this.limits = limits;
         this.index = index;
     }
 
@@ -140,30 +129,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
     public abstract ReadCommand withUpdatedLimit(DataLimits newLimits);
 
     /**
-     * The metadata for the table queried.
-     *
-     * @return the metadata for the table queried.
-     */
-    public TableMetadata metadata()
-    {
-        return metadata;
-    }
-
-    /**
-     * The time in seconds to use as "now" for this query.
-     * <p>
-     * We use the same time as "now" for the whole query to avoid considering different
-     * values as expired during the query, which would be buggy (would throw of counting amongst other
-     * things).
-     *
-     * @return the time (in seconds) to use as "now".
-     */
-    public int nowInSec()
-    {
-        return nowInSec;
-    }
-
-    /**
      * The configured timeout for this command.
      *
      * @return the configured timeout for this command.
@@ -171,43 +136,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
     public abstract long getTimeout();
 
     /**
-     * A filter on which (non-PK) columns must be returned by the query.
-     *
-     * @return which columns must be fetched by this query.
-     */
-    public ColumnFilter columnFilter()
-    {
-        return columnFilter;
-    }
-
-    /**
-     * Filters/Resrictions on CQL rows.
-     * <p>
-     * This contains the restrictions that are not directly handled by the
-     * {@code ClusteringIndexFilter}. More specifically, this includes any non-PK column
-     * restrictions and can include some PK columns restrictions when those can't be
-     * satisfied entirely by the clustering index filter (because not all clustering columns
-     * have been restricted for instance). If there is 2ndary indexes on the table,
-     * one of this restriction might be handled by a 2ndary index.
-     *
-     * @return the filter holding the expression that rows must satisfy.
-     */
-    public RowFilter rowFilter()
-    {
-        return rowFilter;
-    }
-
-    /**
-     * The limits set on this query.
-     *
-     * @return the limits set on this query.
-     */
-    public DataLimits limits()
-    {
-        return limits;
-    }
-
-    /**
      * Whether this query is a digest one or not.
      *
      * @return Whether this query is a digest query.
@@ -327,9 +255,8 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
      */
     public void maybeValidateIndex()
     {
-        Index index = getIndex(Keyspace.openAndGetStore(metadata));
         if (null != index)
-            index.validate(this);
+            IndexRegistry.obtain(metadata()).getIndex(index).validate(this);
     }
 
     /**
@@ -388,11 +315,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
 
     protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
 
-    public PartitionIterator executeInternal(ReadExecutionController controller)
-    {
-        return UnfilteredPartitionIterators.filter(executeLocally(controller), nowInSec());
-    }
-
     public ReadExecutionController executionController()
     {
         return ReadExecutionController.forCommand(this);
@@ -410,7 +332,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
             private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
 
             private final boolean respectTombstoneThresholds = !SchemaConstants.isLocalSystemKeyspace(ReadCommand.this.metadata().keyspace);
-            private final boolean enforceStrictLiveness = metadata.enforceStrictLiveness();
+            private final boolean enforceStrictLiveness = metadata().enforceStrictLiveness();
 
             private int liveRows = 0;
             private int tombstones = 0;
@@ -553,7 +475,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
 
         private void maybeDelayForTesting()
         {
-            if (!metadata.keyspace.startsWith("system"))
+            if (!metadata().keyspace.startsWith("system"))
                 FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS);
         }
     }
@@ -605,7 +527,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
     {
         StringBuilder sb = new StringBuilder();
         sb.append("SELECT ").append(columnFilter());
-        sb.append(" FROM ").append(metadata().keyspace).append('.').append(metadata.name);
+        sb.append(" FROM ").append(metadata().keyspace).append('.').append(metadata().name);
         appendCQLWhereClause(sb);
 
         if (limits() != DataLimits.NONE)
@@ -657,11 +579,11 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
             out.writeByte(digestFlag(command.isDigestQuery()) | indexFlag(null != command.indexMetadata()));
             if (command.isDigestQuery())
                 out.writeUnsignedVInt(command.digestVersion());
-            command.metadata.id.serialize(out);
+            command.metadata().id.serialize(out);
             out.writeInt(command.nowInSec());
             ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
             RowFilter.serializer.serialize(command.rowFilter(), out, version);
-            DataLimits.serializer.serialize(command.limits(), out, version, command.metadata.comparator);
+            DataLimits.serializer.serialize(command.limits(), out, version, command.metadata().comparator);
             if (null != command.index)
                 IndexMetadata.serializer.serialize(command.index, out, version);
 
@@ -714,11 +636,11 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
         {
             return 2 // kind + flags
                    + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
-                   + command.metadata.id.serializedSize()
+                   + command.metadata().id.serializedSize()
                    + TypeSizes.sizeof(command.nowInSec())
                    + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
                    + RowFilter.serializer.serializedSize(command.rowFilter(), version)
-                   + DataLimits.serializer.serializedSize(command.limits(), version, command.metadata.comparator)
+                   + DataLimits.serializer.serializedSize(command.limits(), version, command.metadata().comparator)
                    + command.selectionSerializedSize(version)
                    + command.indexSerializedSize(version);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java
index d527d28..fd94aa1 100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -17,74 +17,113 @@
  */
 package org.apache.cassandra.db;
 
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.pager.QueryPager;
 import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Generic abstraction for read queries.
- * <p>
- * The main implementation of this is {@link ReadCommand} but we have this interface because
- * {@link SinglePartitionReadCommand.Group} is also consider as a "read query" but is not a
- * {@code ReadCommand}.
  */
 public interface ReadQuery
 {
-    ReadQuery EMPTY = new ReadQuery()
+    public static ReadQuery empty(final TableMetadata metadata)
     {
-        public ReadExecutionController executionController()
+        return new ReadQuery()
         {
-            return ReadExecutionController.empty();
-        }
+            public TableMetadata metadata()
+            {
+                return metadata;
+            }
 
-        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
-        {
-            return EmptyIterators.partition();
-        }
+            public ReadExecutionController executionController()
+            {
+                return ReadExecutionController.empty();
+            }
 
-        public PartitionIterator executeInternal(ReadExecutionController controller)
-        {
-            return EmptyIterators.partition();
-        }
+            public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
+            {
+                return EmptyIterators.partition();
+            }
 
-        public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
-        {
-            return EmptyIterators.unfilteredPartition(executionController.metadata());
-        }
+            public PartitionIterator executeInternal(ReadExecutionController controller)
+            {
+                return EmptyIterators.partition();
+            }
 
-        public DataLimits limits()
-        {
-            // What we return here doesn't matter much in practice. However, returning DataLimits.NONE means
-            // "no particular limit", which makes SelectStatement.execute() take the slightly more complex "paging"
-            // path. Not a big deal but it's easy enough to return a limit of 0 rows which avoids this.
-            return DataLimits.cqlLimits(0);
-        }
+            public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
+            {
+                return EmptyIterators.unfilteredPartition(executionController.metadata());
+            }
 
-        public QueryPager getPager(PagingState state, ProtocolVersion protocolVersion)
-        {
-            return QueryPager.EMPTY;
-        }
+            public DataLimits limits()
+            {
+                // What we return here doesn't matter much in practice. However, returning DataLimits.NONE means
+                // "no particular limit", which makes SelectStatement.execute() take the slightly more complex "paging"
+                // path. Not a big deal but it's easy enough to return a limit of 0 rows which avoids this.
+                return DataLimits.cqlLimits(0);
+            }
 
-        public boolean selectsKey(DecoratedKey key)
-        {
-            return false;
-        }
+            public QueryPager getPager(PagingState state, ProtocolVersion protocolVersion)
+            {
+                return QueryPager.EMPTY;
+            }
 
-        public boolean selectsClustering(DecoratedKey key, Clustering clustering)
-        {
-            return false;
-        }
+            public boolean selectsKey(DecoratedKey key)
+            {
+                return false;
+            }
 
-        @Override
-        public boolean selectsFullPartition()
-        {
-            return false;
-        }
-    };
+            public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+            {
+                return false;
+            }
+
+            @Override
+            public int nowInSec()
+            {
+                return FBUtilities.nowInSeconds();
+            }
+
+            @Override
+            public boolean selectsFullPartition()
+            {
+                return false;
+            }
+
+            @Override
+            public boolean isEmpty()
+            {
+                return true;
+            }
+
+            @Override
+            public RowFilter rowFilter()
+            {
+                return RowFilter.NONE;
+            }
+
+            @Override
+            public ColumnFilter columnFilter()
+            {
+                return ColumnFilter.NONE;
+            }
+        };
+    }
+
+    /**
+     * The metadata for the table this is a query on.
+     *
+     * @return the metadata for the table this is a query on.
+     */
+    public TableMetadata metadata();
 
     /**
      * Starts a new read operation.
@@ -156,8 +195,63 @@ public interface ReadQuery
     public boolean selectsClustering(DecoratedKey key, Clustering clustering);
 
     /**
+     * The time in seconds to use as "now" for this query.
+     * <p>
+     * We use the same time as "now" for the whole query to avoid considering different
+     * values as expired during the query, which would be buggy (would throw off counting amongst other
+     * things).
+     *
+     * @return the time (in seconds) to use as "now".
+     */
+    public int nowInSec();
+
+    /**
      * Checks if this {@code ReadQuery} selects full partitions, that is it has no filtering on clustering or regular columns.
      * @return {@code true} if this {@code ReadQuery} selects full partitions, {@code false} otherwise.
      */
     public boolean selectsFullPartition();
+
+    /**
+     * Filters/Restrictions on CQL rows.
+     * <p>
+     * This contains the restrictions that are not directly handled by the
+     * {@code ClusteringIndexFilter}. More specifically, this includes any non-PK column
+     * restrictions and can include some PK columns restrictions when those can't be
+     * satisfied entirely by the clustering index filter (because not all clustering columns
+     * have been restricted for instance). If there are 2ndary indexes on the table,
+     * one of these restrictions might be handled by a 2ndary index.
+     *
+     * @return the filter holding the expression that rows must satisfy.
+     */
+    public RowFilter rowFilter();
+
+    /**
+     * A filter on which (non-PK) columns must be returned by the query.
+     *
+     * @return which columns must be fetched by this query.
+     */
+    public ColumnFilter columnFilter();
+
+    /**
+     * Whether this query is known to return nothing upfront.
+     * <p>
+     * This is overridden by the {@code ReadQuery} created through {@link #empty(TableMetadata)}, and that's probably the
+     * only place that should override it.
+     *
+     * @return {@code true} if this query is guaranteed to return no results whatsoever.
+     */
+    public default boolean isEmpty()
+    {
+        return false;
+    }
+
+    /**
+     * If the index manager for the table determines that there's an applicable
+     * 2i that can be used to execute this query, call its (optional)
+     * validation method to check that nothing in this query's parameters
+     * violates the implementation specific validation rules.
+     */
+    default void maybeValidateIndex()
+    {
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 64a25f8..7214106 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -20,14 +20,11 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
-import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.cassandra.cache.IRowCacheEntry;
 import org.apache.cassandra.cache.RowCacheKey;
 import org.apache.cassandra.cache.RowCacheSentinel;
@@ -53,9 +50,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.pager.*;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTreeSet;
@@ -63,7 +58,7 @@ import org.apache.cassandra.utils.btree.BTreeSet;
 /**
  * A read command that selects a (part of a) single partition.
  */
-public class SinglePartitionReadCommand extends ReadCommand
+public class SinglePartitionReadCommand extends ReadCommand implements SinglePartitionReadQuery
 {
     protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
 
@@ -314,6 +309,7 @@ public class SinglePartitionReadCommand extends ReadCommand
                                               indexMetadata());
     }
 
+    @Override
     public SinglePartitionReadCommand withUpdatedLimit(DataLimits newLimits)
     {
         return new SinglePartitionReadCommand(isDigestQuery(),
@@ -328,11 +324,13 @@ public class SinglePartitionReadCommand extends ReadCommand
                                               indexMetadata());
     }
 
+    @Override
     public DecoratedKey partitionKey()
     {
         return partitionKey;
     }
 
+    @Override
     public ClusteringIndexFilter clusteringIndexFilter()
     {
         return clusteringIndexFilter;
@@ -348,35 +346,7 @@ public class SinglePartitionReadCommand extends ReadCommand
         return DatabaseDescriptor.getReadRpcTimeout();
     }
 
-    public boolean selectsKey(DecoratedKey key)
-    {
-        if (!this.partitionKey().equals(key))
-            return false;
-
-        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType);
-    }
-
-    public boolean selectsClustering(DecoratedKey key, Clustering clustering)
-    {
-        if (clustering == Clustering.STATIC_CLUSTERING)
-            return !columnFilter().fetchedColumns().statics.isEmpty();
-
-        if (!clusteringIndexFilter().selects(clustering))
-            return false;
-
-        return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
-    }
-
-    /**
-     * Returns a new command suitable to paging from the last returned row.
-     *
-     * @param lastReturned the last row returned by the previous page. The newly created command
-     * will only query row that comes after this (in query order). This can be {@code null} if this
-     * is the first page.
-     * @param limits the limits to use for the page to query.
-     *
-     * @return the newly create command.
-     */
+    @Override
     public SinglePartitionReadCommand forPaging(Clustering lastReturned, DataLimits limits)
     {
         // We shouldn't have set digest yet when reaching that point
@@ -395,16 +365,6 @@ public class SinglePartitionReadCommand extends ReadCommand
         return StorageProxy.read(Group.one(this), consistency, clientState, queryStartNanoTime);
     }
 
-    public SinglePartitionPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
-    {
-        return getPager(this, pagingState, protocolVersion);
-    }
-
-    private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState, ProtocolVersion protocolVersion)
-    {
-        return new SinglePartitionPager(command, pagingState, protocolVersion);
-    }
-
     protected void recordLatency(TableMetrics metric, long latencyNanos)
     {
         metric.readLatency.addNano(latencyNanos);
@@ -1054,23 +1014,34 @@ public class SinglePartitionReadCommand extends ReadCommand
     /**
      * Groups multiple single partition read commands.
      */
-    public static class Group implements ReadQuery
+    public static class Group extends SinglePartitionReadQuery.Group<SinglePartitionReadCommand>
     {
-        public final List<SinglePartitionReadCommand> commands;
-        private final DataLimits limits;
-        private final int nowInSec;
-        private final boolean selectsFullPartitions;
+        public static Group create(TableMetadata metadata,
+                                   int nowInSec,
+                                   ColumnFilter columnFilter,
+                                   RowFilter rowFilter,
+                                   DataLimits limits,
+                                   List<DecoratedKey> partitionKeys,
+                                   ClusteringIndexFilter clusteringIndexFilter)
+        {
+            List<SinglePartitionReadCommand> commands = new ArrayList<>(partitionKeys.size());
+            for (DecoratedKey partitionKey : partitionKeys)
+            {
+                commands.add(SinglePartitionReadCommand.create(metadata,
+                                                               nowInSec,
+                                                               columnFilter,
+                                                               rowFilter,
+                                                               limits,
+                                                               partitionKey,
+                                                               clusteringIndexFilter));
+            }
+
+            return new Group(commands, limits);
+        }
 
         public Group(List<SinglePartitionReadCommand> commands, DataLimits limits)
         {
-            assert !commands.isEmpty();
-            this.commands = commands;
-            this.limits = limits;
-            SinglePartitionReadCommand firstCommand = commands.get(0);
-            this.nowInSec = firstCommand.nowInSec();
-            this.selectsFullPartitions = firstCommand.selectsFullPartition();
-            for (int i = 1; i < commands.size(); i++)
-                assert commands.get(i).nowInSec() == nowInSec;
+            super(commands, limits);
         }
 
         public static Group one(SinglePartitionReadCommand command)
@@ -1082,97 +1053,6 @@ public class SinglePartitionReadCommand extends ReadCommand
         {
             return StorageProxy.read(this, consistency, clientState, queryStartNanoTime);
         }
-
-        public int nowInSec()
-        {
-            return nowInSec;
-        }
-
-        public DataLimits limits()
-        {
-            return limits;
-        }
-
-        public TableMetadata metadata()
-        {
-            return commands.get(0).metadata();
-        }
-
-        @Override
-        public boolean selectsFullPartition()
-        {
-            return selectsFullPartitions;
-        }
-
-        public ReadExecutionController executionController()
-        {
-            // Note that the only difference between the command in a group must be the partition key on which
-            // they applied. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one.
-            return commands.get(0).executionController();
-        }
-
-        public PartitionIterator executeInternal(ReadExecutionController controller)
-        {
-            // Note that the only difference between the command in a group must be the partition key on which
-            // they applied.
-            boolean enforceStrictLiveness = commands.get(0).metadata().enforceStrictLiveness();
-            return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec),
-                                 nowInSec,
-                                 selectsFullPartitions,
-                                 enforceStrictLiveness);
-        }
-
-        public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
-        {
-            return executeLocally(executionController, true);
-        }
-
-        /**
-         * Implementation of {@link ReadQuery#executeLocally(ReadExecutionController)}.
-         *
-         * @param executionController - the {@code ReadExecutionController} protecting the read.
-         * @param sort - whether to sort the inner commands by partition key, required for merging the iterator
-         *               later on. This will be false when called by {@link ReadQuery#executeInternal(ReadExecutionController)}
-         *               because in this case it is safe to do so as there is no merging involved and we don't want to
-         *               change the old behavior which was to not sort by partition.
-         *
-         * @return - the iterator that can be used to retrieve the query result.
-         */
-        private UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, boolean sort)
-        {
-            List<Pair<DecoratedKey, UnfilteredPartitionIterator>> partitions = new ArrayList<>(commands.size());
-            for (SinglePartitionReadCommand cmd : commands)
-                partitions.add(Pair.of(cmd.partitionKey, cmd.executeLocally(executionController)));
-
-            if (sort)
-                Collections.sort(partitions, (p1, p2) -> p1.getLeft().compareTo(p2.getLeft()));
-
-            return UnfilteredPartitionIterators.concat(partitions.stream().map(p -> p.getRight()).collect(Collectors.toList()));
-        }
-
-        public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
-        {
-            if (commands.size() == 1)
-                return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion);
-
-            return new MultiPartitionPager(this, pagingState, protocolVersion);
-        }
-
-        public boolean selectsKey(DecoratedKey key)
-        {
-            return Iterables.any(commands, c -> c.selectsKey(key));
-        }
-
-        public boolean selectsClustering(DecoratedKey key, Clustering clustering)
-        {
-            return Iterables.any(commands, c -> c.selectsClustering(key, clustering));
-        }
-
-        @Override
-        public String toString()
-        {
-            return commands.toString();
-        }
     }
 
     private static class Deserializer extends SelectionDeserializer

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
new file mode 100644
index 0000000..f9f0014
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.pager.MultiPartitionPager;
+import org.apache.cassandra.service.pager.PagingState;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.service.pager.SinglePartitionPager;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+/**
+ * A {@code ReadQuery} for a single partition.
+ */
+public interface SinglePartitionReadQuery extends ReadQuery
+{
+    public static Group<? extends SinglePartitionReadQuery> createGroup(TableMetadata metadata,
+                                                                        int nowInSec,
+                                                                        ColumnFilter columnFilter,
+                                                                        RowFilter rowFilter,
+                                                                        DataLimits limits,
+                                                                        List<DecoratedKey> partitionKeys,
+                                                                        ClusteringIndexFilter clusteringIndexFilter)
+    {
+        return metadata.isVirtual()
+             ? VirtualTableSinglePartitionReadQuery.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter)
+             : SinglePartitionReadCommand.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter);
+    }
+
+
+    /**
+     * Creates a new read query on a single partition.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param key the partition key for the partition to query.
+     * @param columnFilter the column filter to use for the query.
+     * @param filter the clustering index filter to use for the query.
+     *
+     * @return a newly created read query. The returned query will use no row filter and have no limits.
+     */
+    public static SinglePartitionReadQuery create(TableMetadata metadata,
+                                                  int nowInSec,
+                                                  DecoratedKey key,
+                                                  ColumnFilter columnFilter,
+                                                  ClusteringIndexFilter filter)
+    {
+        return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
+    }
+
+    /**
+     * Creates a new read query on a single partition.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param columnFilter the column filter to use for the query.
+     * @param rowFilter the row filter to use for the query.
+     * @param limits the limits to use for the query.
+     * @param partitionKey the partition key for the partition to query.
+     * @param clusteringIndexFilter the clustering index filter to use for the query.
+     *
+     * @return a newly created read query.
+     */
+    public static SinglePartitionReadQuery create(TableMetadata metadata,
+                                                  int nowInSec,
+                                                  ColumnFilter columnFilter,
+                                                  RowFilter rowFilter,
+                                                  DataLimits limits,
+                                                  DecoratedKey partitionKey,
+                                                  ClusteringIndexFilter clusteringIndexFilter)
+    {
+        return metadata.isVirtual()
+             ? VirtualTableSinglePartitionReadQuery.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter)
+             : SinglePartitionReadCommand.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+    }
+
+    /**
+     * Returns the key of the partition queried by this {@code ReadQuery}
+     * @return the key of the partition queried
+     */
+    DecoratedKey partitionKey();
+
+    /**
+     * Creates a new {@code SinglePartitionReadQuery} with the specified limits.
+     *
+     * @param newLimits the new limits
+     * @return the new {@code SinglePartitionReadQuery}
+     */
+    SinglePartitionReadQuery withUpdatedLimit(DataLimits newLimits);
+
+    /**
+     * Returns a new {@code SinglePartitionReadQuery} suitable to paging from the last returned row.
+     *
+     * @param lastReturned the last row returned by the previous page. The newly created query
+     * will only query rows that come after this (in query order). This can be {@code null} if this
+     * is the first page.
+     * @param limits the limits to use for the page to query.
+     *
+     * @return the newly created query.
+     */
+    SinglePartitionReadQuery forPaging(Clustering lastReturned, DataLimits limits);
+
+    @Override
+    default SinglePartitionPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
+    {
+        return new SinglePartitionPager(this, pagingState, protocolVersion);
+    }
+
+    ClusteringIndexFilter clusteringIndexFilter();
+
+    default boolean selectsKey(DecoratedKey key)
+    {
+        if (!this.partitionKey().equals(key))
+            return false;
+
+        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType);
+    }
+
+    default boolean selectsClustering(DecoratedKey key, Clustering clustering)
+    {
+        if (clustering == Clustering.STATIC_CLUSTERING)
+            return !columnFilter().fetchedColumns().statics.isEmpty();
+
+        if (!clusteringIndexFilter().selects(clustering))
+            return false;
+
+        return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
+    }
+
+    /**
+     * Groups multiple single partition read queries.
+     */
+    abstract class Group<T extends SinglePartitionReadQuery> implements ReadQuery
+    {
+        public final List<T> queries;
+        private final DataLimits limits;
+        private final int nowInSec;
+        private final boolean selectsFullPartitions;
+
+        public Group(List<T> queries, DataLimits limits)
+        {
+            assert !queries.isEmpty();
+            this.queries = queries;
+            this.limits = limits;
+            T firstQuery = queries.get(0);
+            this.nowInSec = firstQuery.nowInSec();
+            this.selectsFullPartitions = firstQuery.selectsFullPartition();
+            for (int i = 1; i < queries.size(); i++)
+                assert queries.get(i).nowInSec() == nowInSec;
+        }
+
+        public int nowInSec()
+        {
+            return nowInSec;
+        }
+
+        public DataLimits limits()
+        {
+            return limits;
+        }
+
+        public TableMetadata metadata()
+        {
+            return queries.get(0).metadata();
+        }
+
+        @Override
+        public boolean selectsFullPartition()
+        {
+            return selectsFullPartitions;
+        }
+
+        public ReadExecutionController executionController()
+        {
+            // Note that the only difference between the queries in a group must be the partition key on which
+            // they applied. So as far as ReadOrderGroup is concerned, we can use any of the queries to start one.
+            return queries.get(0).executionController();
+        }
+
+        public PartitionIterator executeInternal(ReadExecutionController controller)
+        {
+            // Note that the only difference between the queries in a group must be the partition key on which
+            // they applied.
+            boolean enforceStrictLiveness = queries.get(0).metadata().enforceStrictLiveness();
+            return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec),
+                                 nowInSec,
+                                 selectsFullPartitions,
+                                 enforceStrictLiveness);
+        }
+
+        public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
+        {
+            return executeLocally(executionController, true);
+        }
+
+        /**
+         * Implementation of {@link ReadQuery#executeLocally(ReadExecutionController)}.
+         *
+         * @param executionController - the {@code ReadExecutionController} protecting the read.
+         * @param sort - whether to sort the inner queries by partition key, required for merging the iterator
+         *               later on. This will be false when called by {@link ReadQuery#executeInternal(ReadExecutionController)}
+         *               because in this case it is safe to do so as there is no merging involved and we don't want to
+         *               change the old behavior which was to not sort by partition.
+         *
+         * @return - the iterator that can be used to retrieve the query result.
+         */
+        private UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, boolean sort)
+        {
+            List<Pair<DecoratedKey, UnfilteredPartitionIterator>> partitions = new ArrayList<>(queries.size());
+            for (T query : queries)
+                partitions.add(Pair.of(query.partitionKey(), query.executeLocally(executionController)));
+
+            if (sort)
+                Collections.sort(partitions, (p1, p2) -> p1.getLeft().compareTo(p2.getLeft()));
+
+            return UnfilteredPartitionIterators.concat(partitions.stream().map(p -> p.getRight()).collect(Collectors.toList()));
+        }
+
+        public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
+        {
+            if (queries.size() == 1)
+                return new SinglePartitionPager(queries.get(0), pagingState, protocolVersion);
+
+            return new MultiPartitionPager<T>(this, pagingState, protocolVersion);
+        }
+
+        public boolean selectsKey(DecoratedKey key)
+        {
+            return Iterables.any(queries, c -> c.selectsKey(key));
+        }
+
+        public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+        {
+            return Iterables.any(queries, c -> c.selectsClustering(key, clustering));
+        }
+
+        @Override
+        public RowFilter rowFilter()
+        {
+            // Note that the only difference between the queries in a group must be the partition key on which
+            // they applied.
+            return queries.get(0).rowFilter();
+        }
+
+        @Override
+        public ColumnFilter columnFilter()
+        {
+            // Note that the only difference between the queries in a group must be the partition key on which
+            // they applied.
+            return queries.get(0).columnFilter();
+        }
+
+        @Override
+        public String toString()
+        {
+            return queries.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java
new file mode 100644
index 0000000..48cafa1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * A read query that selects a (part of a) range of partitions of a virtual table.
+ */
+public class VirtualTablePartitionRangeReadQuery extends VirtualTableReadQuery implements PartitionRangeReadQuery
+{
+    private final DataRange dataRange;
+
+    public static VirtualTablePartitionRangeReadQuery create(TableMetadata metadata,
+                                                             int nowInSec,
+                                                             ColumnFilter columnFilter,
+                                                             RowFilter rowFilter,
+                                                             DataLimits limits,
+                                                             DataRange dataRange)
+    {
+        return new VirtualTablePartitionRangeReadQuery(metadata,
+                                                       nowInSec,
+                                                       columnFilter,
+                                                       rowFilter,
+                                                       limits,
+                                                       dataRange);
+    }
+
+    private VirtualTablePartitionRangeReadQuery(TableMetadata metadata,
+                                                int nowInSec,
+                                                ColumnFilter columnFilter,
+                                                RowFilter rowFilter,
+                                                DataLimits limits,
+                                                DataRange dataRange)
+    {
+        super(metadata, nowInSec, columnFilter, rowFilter, limits);
+        this.dataRange = dataRange;
+    }
+
+    @Override
+    public DataRange dataRange()
+    {
+        return dataRange;
+    }
+
+    @Override
+    public PartitionRangeReadQuery withUpdatedLimit(DataLimits newLimits)
+    {
+        return new VirtualTablePartitionRangeReadQuery(metadata(),
+                                                       nowInSec(),
+                                                       columnFilter(),
+                                                       rowFilter(),
+                                                       newLimits,
+                                                       dataRange());
+    }
+
+    @Override
+    public PartitionRangeReadQuery withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange)
+    {
+        return new VirtualTablePartitionRangeReadQuery(metadata(),
+                                                       nowInSec(),
+                                                       columnFilter(),
+                                                       rowFilter(),
+                                                       newLimits,
+                                                       newDataRange);
+    }
+
+    @Override
+    protected UnfilteredPartitionIterator queryVirtualTable()
+    {
+        VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
+        return view.select(dataRange, columnFilter());
+    }
+
+    @Override
+    protected void appendCQLWhereClause(StringBuilder sb)
+    {
+        if (dataRange.isUnrestricted() && rowFilter().isEmpty())
+            return;
+
+        sb.append(" WHERE ");
+        // We put the row filter first because the data range can end with "ORDER BY"
+        if (!rowFilter().isEmpty())
+        {
+            sb.append(rowFilter());
+            if (!dataRange.isUnrestricted())
+                sb.append(" AND ");
+        }
+        if (!dataRange.isUnrestricted())
+            sb.append(dataRange.toCQLString(metadata()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java
new file mode 100644
index 0000000..f2c9a49
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
+
+/**
+ * Base class for the {@code ReadQuery} implementations used to query virtual tables.
+ */
+public abstract class VirtualTableReadQuery extends AbstractReadQuery
+{
+    protected VirtualTableReadQuery(TableMetadata metadata,
+                                    int nowInSec,
+                                    ColumnFilter columnFilter,
+                                    RowFilter rowFilter,
+                                    DataLimits limits)
+    {
+        super(metadata, nowInSec, columnFilter, rowFilter, limits);
+    }
+
+    @Override
+    public ReadExecutionController executionController()
+    {
+        return ReadExecutionController.empty();
+    }
+
+    @Override
+    public PartitionIterator execute(ConsistencyLevel consistency,
+                                     ClientState clientState,
+                                     long queryStartNanoTime) throws RequestExecutionException
+    {
+        return executeInternal(executionController());
+    }
+
+    @Override
+    public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
+    {
+        UnfilteredPartitionIterator resultIterator = queryVirtualTable();
+        return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
+    }
+
+    protected abstract UnfilteredPartitionIterator queryVirtualTable();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java
new file mode 100644
index 0000000..11f1f77
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterators;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
+
+/**
+ * A read query that selects a (part of a) single partition of a virtual table.
+ */
+public class VirtualTableSinglePartitionReadQuery extends VirtualTableReadQuery implements SinglePartitionReadQuery
+{
+    private final DecoratedKey partitionKey;
+    private final ClusteringIndexFilter clusteringIndexFilter;
+
+    public static VirtualTableSinglePartitionReadQuery create(TableMetadata metadata,
+                                                              int nowInSec,
+                                                              ColumnFilter columnFilter,
+                                                              RowFilter rowFilter,
+                                                              DataLimits limits,
+                                                              DecoratedKey partitionKey,
+                                                              ClusteringIndexFilter clusteringIndexFilter)
+    {
+        return new VirtualTableSinglePartitionReadQuery(metadata,
+                                                        nowInSec,
+                                                        columnFilter,
+                                                        rowFilter,
+                                                        limits,
+                                                        partitionKey,
+                                                        clusteringIndexFilter);
+    }
+
+    private VirtualTableSinglePartitionReadQuery(TableMetadata metadata,
+                                                 int nowInSec,
+                                                 ColumnFilter columnFilter,
+                                                 RowFilter rowFilter,
+                                                 DataLimits limits,
+                                                 DecoratedKey partitionKey,
+                                                 ClusteringIndexFilter clusteringIndexFilter)
+    {
+        super(metadata, nowInSec, columnFilter, rowFilter, limits);
+        this.partitionKey = partitionKey;
+        this.clusteringIndexFilter = clusteringIndexFilter;
+    }
+
+    @Override
+    protected void appendCQLWhereClause(StringBuilder sb)
+    {
+        sb.append(" WHERE ");
+
+        sb.append(ColumnMetadata.toCQLString(metadata().partitionKeyColumns())).append(" = ");
+        DataRange.appendKeyString(sb, metadata().partitionKeyType, partitionKey().getKey());
+
+        // We put the row filter first because the clustering index filter can end with "ORDER BY"
+        if (!rowFilter().isEmpty())
+            sb.append(" AND ").append(rowFilter());
+
+        String filterString = clusteringIndexFilter().toCQLString(metadata());
+        if (!filterString.isEmpty())
+            sb.append(" AND ").append(filterString);
+    }
+
+    @Override
+    public ClusteringIndexFilter clusteringIndexFilter()
+    {
+        return clusteringIndexFilter;
+    }
+
+    @Override
+    public boolean selectsFullPartition()
+    {
+        return clusteringIndexFilter.selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns();
+    }
+
+    @Override
+    public DecoratedKey partitionKey()
+    {
+        return partitionKey;
+    }
+
+    @Override
+    public SinglePartitionReadQuery withUpdatedLimit(DataLimits newLimits)
+    {
+        return new VirtualTableSinglePartitionReadQuery(metadata(),
+                                                        nowInSec(),
+                                                        columnFilter(),
+                                                        rowFilter(),
+                                                        newLimits,
+                                                        partitionKey(),
+                                                        clusteringIndexFilter);
+    }
+
+    @Override
+    public SinglePartitionReadQuery forPaging(Clustering lastReturned, DataLimits limits)
+    {
+        return new VirtualTableSinglePartitionReadQuery(metadata(),
+                                                        nowInSec(),
+                                                        columnFilter(),
+                                                        rowFilter(),
+                                                        limits,
+                                                        partitionKey(),
+                                                      lastReturned == null ? clusteringIndexFilter
+                                                              : clusteringIndexFilter.forPaging(metadata().comparator,
+                                                                                                lastReturned,
+                                                                                                false));
+    }
+
+    @Override
+    protected UnfilteredPartitionIterator queryVirtualTable()
+    {
+        VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
+        return view.select(partitionKey, clusteringIndexFilter, columnFilter());
+    }
+
+    /**
+     * Groups multiple single partition read queries.
+     */
+    public static class Group extends SinglePartitionReadQuery.Group<VirtualTableSinglePartitionReadQuery>
+    {
+        public static Group create(TableMetadata metadata,
+                                   int nowInSec,
+                                   ColumnFilter columnFilter,
+                                   RowFilter rowFilter,
+                                   DataLimits limits,
+                                   List<DecoratedKey> partitionKeys,
+                                   ClusteringIndexFilter clusteringIndexFilter)
+        {
+            List<VirtualTableSinglePartitionReadQuery> queries = new ArrayList<>(partitionKeys.size());
+            for (DecoratedKey partitionKey : partitionKeys)
+            {
+                queries.add(VirtualTableSinglePartitionReadQuery.create(metadata,
+                                                                        nowInSec,
+                                                                        columnFilter,
+                                                                        rowFilter,
+                                                                        limits,
+                                                                        partitionKey,
+                                                                        clusteringIndexFilter));
+            }
+
+            return new Group(queries, limits);
+        }
+
+        public Group(List<VirtualTableSinglePartitionReadQuery> queries, DataLimits limits)
+        {
+            super(queries, limits);
+        }
+
+        public static Group one(VirtualTableSinglePartitionReadQuery query)
+        {
+            return new Group(Collections.singletonList(query), query.limits());
+        }
+
+        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
+        {
+            if (queries.size() == 1)
+                return queries.get(0).execute(consistency, clientState, queryStartNanoTime);
+
+            return PartitionIterators.concat(queries.stream()
+                                                    .map(q -> q.execute(consistency, clientState, queryStartNanoTime))
+                                                    .collect(Collectors.toList()));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index 1d7d1c8..20a1656 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -65,6 +65,8 @@ import org.apache.cassandra.schema.TableMetadata;
  */
 public class ColumnFilter
 {
+    public static final ColumnFilter NONE = selection(RegularAndStaticColumns.NONE);
+
     public static final Serializer serializer = new Serializer();
 
     // True if _fetched_ includes all regular columns (and any static in _queried_), in which case metadata must not be

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
index 70759cf..9064b0f 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.db.transform.MorePartitions;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.utils.AbstractIterator;
 
-import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadQuery;
 import org.apache.cassandra.db.rows.*;
 
 public abstract class PartitionIterators
@@ -32,15 +32,15 @@ public abstract class PartitionIterators
     private PartitionIterators() {}
 
     @SuppressWarnings("resource") // The created resources are returned right away
-    public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadCommand command)
+    public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadQuery query)
     {
         // If the query has no results, we'll get an empty iterator, but we still
         // want a RowIterator out of this method, so we return an empty one.
         RowIterator toReturn = iter.hasNext()
                              ? iter.next()
-                             : EmptyIterators.row(command.metadata(),
-                                                  command.partitionKey(),
-                                                  command.clusteringIndexFilter().isReversed());
+                             : EmptyIterators.row(query.metadata(),
+                                                  query.partitionKey(),
+                                                  query.clusteringIndexFilter().isReversed());
 
         // Note that in general, we should wrap the result so that it's close method actually
         // close the whole PartitionIterator.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index a549458..2dc566a 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
@@ -427,7 +428,7 @@ public class PartitionUpdate extends AbstractBTreePartition
 
     public void validateIndexedColumns()
     {
-        Keyspace.openAndGetStore(metadata()).indexManager.validate(this);
+        IndexRegistry.obtain(metadata()).validate(this);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
new file mode 100644
index 0000000..a776d01
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Iterator;
+import java.util.NavigableMap;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * An abstract virtual table implementation that builds the resultset on demand.
+ */
+public abstract class AbstractVirtualTable implements VirtualTable
+{
+    private final TableMetadata metadata;
+
+    protected AbstractVirtualTable(TableMetadata metadata)
+    {
+        if (!metadata.isVirtual())
+            throw new IllegalArgumentException();
+
+        this.metadata = metadata;
+    }
+
+    public TableMetadata metadata()
+    {
+        return metadata;
+    }
+
+    /**
+     * Provide a {@link DataSet} that contains all of the virtual table's data.
+     */
+    public abstract DataSet data();
+
+    /**
+     * Provide a {@link DataSet} that is potentially restricted to the provided partition - but is allowed to contain
+     * other partitions.
+     */
+    public DataSet data(DecoratedKey partitionKey)
+    {
+        return data();
+    }
+
+    @Override
+    public final UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter)
+    {
+        Partition partition = data(partitionKey).getPartition(partitionKey);
+
+        if (null == partition)
+            return EmptyIterators.unfilteredPartition(metadata);
+
+        long now = System.currentTimeMillis();
+        UnfilteredRowIterator rowIterator = partition.toRowIterator(metadata(), clusteringIndexFilter, columnFilter, now);
+        return new SingletonUnfilteredPartitionIterator(rowIterator);
+    }
+
+    @Override
+    public final UnfilteredPartitionIterator select(DataRange dataRange, ColumnFilter columnFilter)
+    {
+        DataSet data = data();
+
+        if (data.isEmpty())
+            return EmptyIterators.unfilteredPartition(metadata);
+
+        Iterator<Partition> iterator = data.getPartitions(dataRange);
+
+        long now = System.currentTimeMillis();
+
+        return new AbstractUnfilteredPartitionIterator()
+        {
+            @Override
+            public UnfilteredRowIterator next()
+            {
+                Partition partition = iterator.next();
+                return partition.toRowIterator(metadata, dataRange.clusteringIndexFilter(partition.key()), columnFilter, now);
+            }
+
+            @Override
+            public boolean hasNext()
+            {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public TableMetadata metadata()
+            {
+                return metadata;
+            }
+        };
+    }
+
+    @Override
+    public void apply(PartitionUpdate update)
+    {
+        throw new InvalidRequestException("Modification is not supported by table " + metadata);
+    }
+
+    public interface DataSet
+    {
+        boolean isEmpty();
+        Partition getPartition(DecoratedKey partitionKey);
+        Iterator<Partition> getPartitions(DataRange range);
+    }
+
+    public interface Partition
+    {
+        DecoratedKey key();
+        UnfilteredRowIterator toRowIterator(TableMetadata metadata, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter, long now);
+    }
+
+    /**
+     * An abstract, map-backed DataSet implementation. Can be backed by any {@link NavigableMap}, then either maintained
+     * persistently, or built on demand and thrown away after use, depending on the implementing class.
+     */
+    public static abstract class AbstractDataSet implements DataSet
+    {
+        protected final NavigableMap<DecoratedKey, Partition> partitions;
+
+        protected AbstractDataSet(NavigableMap<DecoratedKey, Partition> partitions)
+        {
+            this.partitions = partitions;
+        }
+
+        public boolean isEmpty()
+        {
+            return partitions.isEmpty();
+        }
+
+        public Partition getPartition(DecoratedKey key)
+        {
+            return partitions.get(key);
+        }
+
+        public Iterator<Partition> getPartitions(DataRange dataRange)
+        {
+            AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange();
+            PartitionPosition startKey = keyRange.left;
+            PartitionPosition endKey = keyRange.right;
+
+            NavigableMap<DecoratedKey, Partition> selection = partitions;
+
+            if (startKey.isMinimum() && endKey.isMinimum())
+                return selection.values().iterator();
+
+            if (startKey.isMinimum() && endKey instanceof DecoratedKey)
+                return selection.headMap((DecoratedKey) endKey, keyRange.isEndInclusive()).values().iterator();
+
+            if (startKey instanceof DecoratedKey && endKey instanceof DecoratedKey)
+            {
+                return selection.subMap((DecoratedKey) startKey, keyRange.isStartInclusive(), (DecoratedKey) endKey, keyRange.isEndInclusive())
+                                .values()
+                                .iterator();
+            }
+
+            if (startKey instanceof DecoratedKey)
+                selection = selection.tailMap((DecoratedKey) startKey, keyRange.isStartInclusive());
+
+            if (endKey instanceof DecoratedKey)
+                selection = selection.headMap((DecoratedKey) endKey, keyRange.isEndInclusive());
+
+            // If we have reached this point, it means that one of the PartitionPositions is a KeyBound and we have
+            // to use filtering to eliminate the unwanted partitions.
+            Iterator<Partition> iterator = selection.values().iterator();
+
+            return new AbstractIterator<Partition>()
+            {
+                private boolean encounteredPartitionsWithinRange;
+
+                @Override
+                protected Partition computeNext()
+                {
+                    while (iterator.hasNext())
+                    {
+                        Partition partition = iterator.next();
+                        if (dataRange.contains(partition.key()))
+                        {
+                            encounteredPartitionsWithinRange = true;
+                            return partition;
+                        }
+
+                        // we encountered some partitions within the range, but the last one is outside of the range: we are done
+                        if (encounteredPartitionsWithinRange)
+                            return endOfData();
+                    }
+
+                    return endOfData();
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
new file mode 100644
index 0000000..2af3b6a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * A DataSet implementation that is filled on demand and has an easy to use API for adding rows.
+ *
+ * A row is started with {@link #row(Object...)} and its regular columns are then set with
+ * {@link #column(String, Object)}; both return this data set so that calls can be chained.
+ */
+public class SimpleDataSet extends AbstractVirtualTable.AbstractDataSet
+{
+    private final TableMetadata metadata;
+
+    // The row most recently started by row(); column() adds values to it.
+    private Row currentRow;
+
+    /**
+     * Creates an empty data set whose partitions are kept sorted by {@link DecoratedKey#comparator}.
+     *
+     * @param metadata the metadata of the table the rows belong to
+     */
+    public SimpleDataSet(TableMetadata metadata)
+    {
+        super(new TreeMap<>(DecoratedKey.comparator));
+        this.metadata = metadata;
+    }
+
+    /**
+     * Starts a new row and makes it the target of subsequent {@link #column(String, Object)} calls.
+     * A row added earlier to the same partition with an equal clustering is replaced.
+     *
+     * @param primaryKeyValues the partition key values followed by the clustering values
+     * @return this data set
+     * @throws IllegalArgumentException if the number of values is not the number of primary key columns
+     */
+    public SimpleDataSet row(Object... primaryKeyValues)
+    {
+        int expectedSize = Iterables.size(metadata.primaryKeyColumns());
+        if (expectedSize != primaryKeyValues.length)
+            throw new IllegalArgumentException("Expected " + expectedSize + " primary key values but got " + primaryKeyValues.length);
+
+        Object[] partitionKeyValues = new Object[metadata.partitionKeyColumns().size()];
+        Object[]   clusteringValues = new Object[metadata.clusteringColumns().size()];
+
+        // the leading values form the partition key, the remaining ones the clustering
+        System.arraycopy(primaryKeyValues, 0, partitionKeyValues, 0, partitionKeyValues.length);
+        System.arraycopy(primaryKeyValues, partitionKeyValues.length, clusteringValues, 0, clusteringValues.length);
+
+        DecoratedKey partitionKey = makeDecoratedKey(partitionKeyValues);
+        Clustering clustering = makeClustering(clusteringValues);
+
+        currentRow = new Row(metadata, clustering);
+        SimplePartition partition = (SimplePartition) partitions.computeIfAbsent(partitionKey, pk -> new SimplePartition(metadata, pk));
+        partition.add(currentRow);
+
+        return this;
+    }
+
+    /**
+     * Sets the value of a regular column on the row most recently started by {@link #row(Object...)}.
+     *
+     * @param columnName the name of a regular column of the table
+     * @param value the column value
+     * @return this data set
+     * @throws IllegalStateException if no row has been started yet
+     * @throws IllegalArgumentException if the name does not denote a regular column of the table
+     */
+    public SimpleDataSet column(String columnName, Object value)
+    {
+        if (null == currentRow)
+            throw new IllegalStateException("column(\"" + columnName + "\", ...) called before row(...)");
+        currentRow.add(columnName, value);
+        return this;
+    }
+
+    /**
+     * Serializes the partition key values (using the composite type when there is more than one value)
+     * and decorates the result with the table's partitioner.
+     */
+    private DecoratedKey makeDecoratedKey(Object... partitionKeyValues)
+    {
+        ByteBuffer partitionKey = partitionKeyValues.length == 1
+                                ? decompose(metadata.partitionKeyType, partitionKeyValues[0])
+                                : ((CompositeType) metadata.partitionKeyType).decompose(partitionKeyValues);
+        return metadata.partitioner.decorateKey(partitionKey);
+    }
+
+    /**
+     * Serializes each clustering value with the type of the clustering column at the same position;
+     * returns {@link Clustering#EMPTY} when there are no clustering values.
+     */
+    private Clustering makeClustering(Object... clusteringValues)
+    {
+        if (clusteringValues.length == 0)
+            return Clustering.EMPTY;
+
+        ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+        for (int i = 0; i < clusteringValues.length; i++)
+            clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+        return Clustering.make(clusteringByteBuffers);
+    }
+
+    /**
+     * A partition holding its rows in a map sorted by the table's clustering comparator.
+     */
+    private static final class SimplePartition implements AbstractVirtualTable.Partition
+    {
+        private final DecoratedKey key;
+        private final NavigableMap<Clustering, Row> rows;
+
+        private SimplePartition(TableMetadata metadata, DecoratedKey key)
+        {
+            this.key = key;
+            this.rows = new TreeMap<>(metadata.comparator);
+        }
+
+        // replaces any row previously stored under the same clustering
+        private void add(Row row)
+        {
+            rows.put(row.clustering, row);
+        }
+
+        public DecoratedKey key()
+        {
+            return key;
+        }
+
+        /**
+         * Iterates the rows of this partition that the clustering filter selects, in descending map order
+         * when the filter is reversed. The iterator reports a live partition deletion and an empty static row.
+         */
+        public UnfilteredRowIterator toRowIterator(TableMetadata metadata,
+                                                   ClusteringIndexFilter clusteringIndexFilter,
+                                                   ColumnFilter columnFilter,
+                                                   long now)
+        {
+            Iterator<Row> iterator = (clusteringIndexFilter.isReversed() ? rows.descendingMap() : rows).values().iterator();
+
+            return new AbstractUnfilteredRowIterator(metadata,
+                                                     key,
+                                                     DeletionTime.LIVE,
+                                                     columnFilter.queriedColumns(),
+                                                     Rows.EMPTY_STATIC_ROW,
+                                                     false,
+                                                     EncodingStats.NO_STATS)
+            {
+                protected Unfiltered computeNext()
+                {
+                    // skip the rows that the clustering filter does not select
+                    while (iterator.hasNext())
+                    {
+                        Row row = iterator.next();
+                        if (clusteringIndexFilter.selects(row.clustering))
+                            return row.toTableRow(columns, now);
+                    }
+                    return endOfData();
+                }
+            };
+        }
+    }
+
+    /**
+     * A row under construction: a clustering plus the not yet serialized values of its regular columns.
+     */
+    private static class Row
+    {
+        private final TableMetadata metadata;
+        private final Clustering clustering;
+
+        private final Map<ColumnMetadata, Object> values = new HashMap<>();
+
+        private Row(TableMetadata metadata, Clustering clustering)
+        {
+            this.metadata = metadata;
+            this.clustering = clustering;
+        }
+
+        private void add(String columnName, Object value)
+        {
+            ColumnMetadata column = metadata.getColumn(ByteBufferUtil.bytes(columnName));
+            if (null == column || !column.isRegular())
+                throw new IllegalArgumentException("Unknown or non-regular column: " + columnName);
+            values.put(column, value);
+        }
+
+        /**
+         * Builds the storage-engine row, adding one live cell per requested column that has a non-null value.
+         * {@code now} is used as the cell timestamp and is converted from milliseconds to seconds for the builder.
+         */
+        private org.apache.cassandra.db.rows.Row toTableRow(RegularAndStaticColumns columns, long now)
+        {
+            org.apache.cassandra.db.rows.Row.Builder builder = BTreeRow.unsortedBuilder((int) TimeUnit.MILLISECONDS.toSeconds(now));
+            builder.newRow(clustering);
+
+            columns.forEach(c ->
+            {
+                Object value = values.get(c);
+                if (null != value)
+                    builder.addCell(BufferCell.live(c, now, decompose(c.type, value)));
+            });
+
+            return builder.build();
+        }
+    }
+
+    // The cast is unchecked: the caller is trusted to pass a value of the Java type that the given type serializes.
+    @SuppressWarnings("unchecked")
+    private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+    {
+        return ((AbstractType<T>) type).decompose(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
new file mode 100644
index 0000000..8d6f59b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * The virtual keyspace named {@code system_views}; it is created with an empty list of tables.
+ */
+public final class SystemViewsKeyspace extends VirtualKeyspace
+{
+    private static final String NAME = "system_views";
+
+    // final, like the other singletons of this package, so that the instance cannot be reassigned
+    public static final SystemViewsKeyspace instance = new SystemViewsKeyspace();
+
+    private SystemViewsKeyspace()
+    {
+        super(NAME, ImmutableList.of());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/4] cassandra git commit: Implement virtual keyspace interface

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
new file mode 100644
index 0000000..6750215
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Collection;
+
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Tables;
+
+/**
+ * A named collection of virtual tables, together with the keyspace metadata built from the tables' metadata.
+ */
+public class VirtualKeyspace
+{
+    private final String name;
+    private final KeyspaceMetadata metadata;
+
+    private final ImmutableCollection<VirtualTable> tables;
+
+    /**
+     * @param name the keyspace name
+     * @param tables the virtual tables of the keyspace; they are copied into an immutable list
+     */
+    public VirtualKeyspace(String name, Collection<VirtualTable> tables)
+    {
+        this.name = name;
+        this.tables = ImmutableList.copyOf(tables);
+
+        // the keyspace metadata lists the metadata of every table
+        Tables tablesMetadata = Tables.of(Iterables.transform(tables, VirtualTable::metadata));
+        this.metadata = KeyspaceMetadata.virtual(name, tablesMetadata);
+    }
+
+    /**
+     * @return the keyspace name
+     */
+    public String name()
+    {
+        return this.name;
+    }
+
+    /**
+     * @return the keyspace metadata created at construction time
+     */
+    public KeyspaceMetadata metadata()
+    {
+        return this.metadata;
+    }
+
+    /**
+     * @return the immutable collection of the keyspace's virtual tables
+     */
+    public ImmutableCollection<VirtualTable> tables()
+    {
+        return this.tables;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
new file mode 100644
index 0000000..5e0f90c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * Singleton registry of virtual keyspaces, indexed by keyspace name, and of their tables, indexed by table id.
+ */
+public final class VirtualKeyspaceRegistry
+{
+    public static final VirtualKeyspaceRegistry instance = new VirtualKeyspaceRegistry();
+
+    private final Map<String, VirtualKeyspace> virtualKeyspaces = new ConcurrentHashMap<>();
+    private final Map<TableId, VirtualTable> virtualTables = new ConcurrentHashMap<>();
+
+    private VirtualKeyspaceRegistry()
+    {
+    }
+
+    /**
+     * Records the keyspace under its name and each of its tables under the table's id,
+     * replacing any entries previously stored under the same keys.
+     *
+     * @param keyspace the virtual keyspace to register
+     */
+    public void register(VirtualKeyspace keyspace)
+    {
+        virtualKeyspaces.put(keyspace.name(), keyspace);
+
+        for (VirtualTable table : keyspace.tables())
+            virtualTables.put(table.metadata().id, table);
+    }
+
+    /**
+     * @return the virtual keyspace registered under the name, or {@code null} if there is none
+     */
+    @Nullable
+    public VirtualKeyspace getKeyspaceNullable(String name)
+    {
+        return virtualKeyspaces.get(name);
+    }
+
+    /**
+     * @return the virtual table registered under the id, or {@code null} if there is none
+     */
+    @Nullable
+    public VirtualTable getTableNullable(TableId id)
+    {
+        return virtualTables.get(id);
+    }
+
+    /**
+     * @return the metadata of the virtual keyspace registered under the name, or {@code null} if there is none
+     */
+    @Nullable
+    public KeyspaceMetadata getKeyspaceMetadataNullable(String name)
+    {
+        VirtualKeyspace registered = virtualKeyspaces.get(name);
+        if (null == registered)
+            return null;
+
+        return registered.metadata();
+    }
+
+    /**
+     * @return the metadata of the virtual table registered under the id, or {@code null} if there is none
+     */
+    @Nullable
+    public TableMetadata getTableMetadataNullable(TableId id)
+    {
+        VirtualTable registered = virtualTables.get(id);
+        if (null == registered)
+            return null;
+
+        return registered.metadata();
+    }
+
+    /**
+     * @return a view of the metadata of all registered virtual keyspaces
+     */
+    public Iterable<KeyspaceMetadata> virtualKeyspacesMetadata()
+    {
+        return Iterables.transform(virtualKeyspaces.values(), VirtualKeyspace::metadata);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
new file mode 100644
index 0000000..dc32c8c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Collection;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * An {@link IMutation} for tables of a virtual keyspace.
+ *
+ * {@link #apply()} passes each partition update directly to {@link VirtualTable#apply(PartitionUpdate)} on the
+ * table that {@link VirtualKeyspaceRegistry} holds for the update's table id.
+ */
+public final class VirtualMutation implements IMutation
+{
+    private final String keyspaceName;
+    private final DecoratedKey partitionKey;
+    private final ImmutableMap<TableId, PartitionUpdate> modifications;
+
+    /**
+     * Creates a mutation made of a single update, taking the keyspace name, partition key and table id from it.
+     */
+    public VirtualMutation(PartitionUpdate update)
+    {
+        this(update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update));
+    }
+
+    /**
+     * @param keyspaceName the name of the keyspace the mutation applies to
+     * @param partitionKey the key of the modified partition
+     * @param modifications the partition updates, indexed by table id
+     */
+    public VirtualMutation(String keyspaceName, DecoratedKey partitionKey, ImmutableMap<TableId, PartitionUpdate> modifications)
+    {
+        this.keyspaceName = keyspaceName;
+        this.partitionKey = partitionKey;
+        this.modifications = modifications;
+    }
+
+    @Override
+    public void apply()
+    {
+        for (TableId id : modifications.keySet())
+        {
+            // the lookup is nullable: a table id that is not registered results in a NullPointerException here
+            VirtualTable table = VirtualKeyspaceRegistry.instance.getTableNullable(id);
+            table.apply(modifications.get(id));
+        }
+    }
+
+    @Override
+    public String getKeyspaceName()
+    {
+        return keyspaceName;
+    }
+
+    @Override
+    public Collection<TableId> getTableIds()
+    {
+        return modifications.keySet();
+    }
+
+    @Override
+    public DecoratedKey key()
+    {
+        return partitionKey;
+    }
+
+    @Override
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getWriteRpcTimeout();
+    }
+
+    /**
+     * @param shallow whether to print only the ids of the modified tables rather than the partition updates
+     */
+    @Override
+    public String toString(boolean shallow)
+    {
+        MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(this);
+        description.add("keyspace", keyspaceName);
+        description.add("partition key", partitionKey);
+
+        if (!shallow)
+            description.add("modifications", getPartitionUpdates());
+        else
+            description.add("tables", getTableIds());
+
+        return description.toString();
+    }
+
+    @Override
+    public Collection<PartitionUpdate> getPartitionUpdates()
+    {
+        return modifications.values();
+    }
+
+    @Override
+    public void validateIndexedColumns()
+    {
+        // no-op
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java b/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java
new file mode 100644
index 0000000..299cc00
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static org.apache.cassandra.schema.TableMetadata.builder;
+
+public final class VirtualSchemaKeyspace extends VirtualKeyspace
+{
+    private static final String NAME = "system_virtual_schema";
+
+    public static final VirtualSchemaKeyspace instance = new VirtualSchemaKeyspace();
+
+    private VirtualSchemaKeyspace()
+    {
+        super(NAME, ImmutableList.of(new VirtualKeyspaces(NAME), new VirtualTables(NAME), new VirtualColumns(NAME)));
+    }
+
+    private static final class VirtualKeyspaces extends AbstractVirtualTable
+    {
+        private static final String KEYSPACE_NAME = "keyspace_name";
+
+        private VirtualKeyspaces(String keyspace)
+        {
+            super(builder(keyspace, "keyspaces")
+                 .comment("virtual keyspace definitions")
+                 .kind(TableMetadata.Kind.VIRTUAL)
+                 .addPartitionKeyColumn(KEYSPACE_NAME, UTF8Type.instance)
+                 .build());
+        }
+
+        public DataSet data()
+        {
+            SimpleDataSet result = new SimpleDataSet(metadata());
+            for (KeyspaceMetadata keyspace : VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata())
+                result.row(keyspace.name);
+            return result;
+        }
+    }
+
+    private static final class VirtualTables extends AbstractVirtualTable
+    {
+        private static final String KEYSPACE_NAME = "keyspace_name";
+        private static final String TABLE_NAME = "table_name";
+        private static final String COMMENT = "comment";
+
+        private VirtualTables(String keyspace)
+        {
+            super(builder(keyspace, "tables")
+                 .comment("virtual table definitions")
+                 .kind(TableMetadata.Kind.VIRTUAL)
+                 .addPartitionKeyColumn(KEYSPACE_NAME, UTF8Type.instance)
+                 .addClusteringColumn(TABLE_NAME, UTF8Type.instance)
+                 .addRegularColumn(COMMENT, UTF8Type.instance)
+                 .build());
+        }
+
+        public DataSet data()
+        {
+            SimpleDataSet result = new SimpleDataSet(metadata());
+
+            for (KeyspaceMetadata keyspace : VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata())
+            {
+                for (TableMetadata table : keyspace.tables)
+                {
+                    result.row(table.keyspace, table.name)
+                          .column(COMMENT, table.params.comment);
+                }
+            }
+
+            return result;
+        }
+    }
+
+    private static final class VirtualColumns extends AbstractVirtualTable
+    {
+        private static final String KEYSPACE_NAME = "keyspace_name";
+        private static final String TABLE_NAME = "table_name";
+        private static final String COLUMN_NAME = "column_name";
+        private static final String CLUSTERING_ORDER = "clustering_order";
+        private static final String COLUMN_NAME_BYTES = "column_name_bytes";
+        private static final String KIND = "kind";
+        private static final String POSITION = "position";
+        private static final String TYPE = "type";
+
+        private VirtualColumns(String keyspace)
+        {
+            super(builder(keyspace, "columns")
+                 .comment("virtual column definitions")
+                 .kind(TableMetadata.Kind.VIRTUAL)
+                 .addPartitionKeyColumn(KEYSPACE_NAME, UTF8Type.instance)
+                 .addClusteringColumn(TABLE_NAME, UTF8Type.instance)
+                 .addClusteringColumn(COLUMN_NAME, UTF8Type.instance)
+                 .addRegularColumn(CLUSTERING_ORDER, UTF8Type.instance)
+                 .addRegularColumn(COLUMN_NAME_BYTES, BytesType.instance)
+                 .addRegularColumn(KIND, UTF8Type.instance)
+                 .addRegularColumn(POSITION, Int32Type.instance)
+                 .addRegularColumn(TYPE, UTF8Type.instance)
+                 .build());
+        }
+
+        public DataSet data()
+        {
+            SimpleDataSet result = new SimpleDataSet(metadata());
+
+            for (KeyspaceMetadata keyspace : VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata())
+            {
+                for (TableMetadata table : keyspace.tables)
+                {
+                    for (ColumnMetadata column : table.columns())
+                    {
+                        result.row(column.ksName, column.cfName, column.name.toString())
+                              .column(CLUSTERING_ORDER, column.clusteringOrder().toString().toLowerCase())
+                              .column(COLUMN_NAME_BYTES, column.name.bytes)
+                              .column(KIND, column.kind.toString().toLowerCase())
+                              .column(POSITION, column.position())
+                              .column(TYPE, column.type.asCQL3Type().toString());
+                    }
+                }
+            }
+
+            return result;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
new file mode 100644
index 0000000..ea196ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * A virtual table: a system view used to expose system information.
+ *
+ * Implementations supply the table metadata, accept partition updates and answer
+ * single-partition and partition-range reads.
+ */
+public interface VirtualTable
+{
+    /**
+     * Returns the table name, as recorded in the table metadata.
+     *
+     * @return the table name.
+     */
+    default String name()
+    {
+        return metadata().name;
+    }
+
+    /**
+     * Returns the table metadata.
+     *
+     * @return the table metadata.
+     */
+    TableMetadata metadata();
+
+    /**
+     * Applies the specified update.
+     *
+     * @param update the update to apply
+     */
+    void apply(PartitionUpdate update);
+
+    /**
+     * Selects the rows from a single partition.
+     *
+     * @param partitionKey the partition key
+     * @param clusteringIndexFilter the clustering columns to select
+     * @param columnFilter the selected columns
+     * @return the rows corresponding to the requested data.
+     */
+    UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter);
+
+    /**
+     * Selects the rows from a range of partitions.
+     *
+     * @param dataRange the range of data to retrieve
+     * @param columnFilter the selected columns
+     * @return the rows corresponding to the requested data.
+     */
+    UnfilteredPartitionIterator select(DataRange dataRange, ColumnFilter columnFilter);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/index/IndexRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java b/src/java/org/apache/cassandra/index/IndexRegistry.java
index 9f5ed02..e4c531b 100644
--- a/src/java/org/apache/cassandra/index/IndexRegistry.java
+++ b/src/java/org/apache/cassandra/index/IndexRegistry.java
@@ -21,8 +21,14 @@
 package org.apache.cassandra.index;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
 
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 
 /**
  * The collection of all Index instances for a base table.
@@ -34,9 +40,72 @@ import org.apache.cassandra.schema.IndexMetadata;
  */
 public interface IndexRegistry
 {
+    /**
+     * An {@code IndexRegistry} without any index: registering and unregistering are ignored,
+     * lookups find nothing and validation accepts every update.
+     */
+    IndexRegistry EMPTY = new IndexRegistry()
+    {
+        @Override
+        public void registerIndex(Index index)
+        {
+            // ignored: this registry never holds an index
+        }
+
+        @Override
+        public void unregisterIndex(Index index)
+        {
+            // ignored: there is nothing to remove
+        }
+
+        @Override
+        public Index getIndex(IndexMetadata indexMetadata)
+        {
+            return null;
+        }
+
+        @Override
+        public Collection<Index> listIndexes()
+        {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public Optional<Index> getBestIndexFor(RowFilter.Expression expression)
+        {
+            return Optional.empty();
+        }
+
+        @Override
+        public void validate(PartitionUpdate update)
+        {
+            // no index, hence nothing to validate against
+        }
+    };
+
     void registerIndex(Index index);
     void unregisterIndex(Index index);
 
     Index getIndex(IndexMetadata indexMetadata);
     Collection<Index> listIndexes();
+
+    Optional<Index> getBestIndexFor(RowFilter.Expression expression);
+
+    /**
+     * Called at write time to ensure that values present in the update
+     * are valid according to the rules of all registered indexes which
+     * will process it. The partition key as well as the clustering and
+     * cell values for each row in the update may be checked by index
+     * implementations
+     *
+     * @param update PartitionUpdate containing the values to be validated by registered Index implementations
+     */
+    void validate(PartitionUpdate update);
+
+    /**
+     * Looks up the {@code IndexRegistry} to use for the specified table.
+     *
+     * @param table the table metadata
+     * @return {@link #EMPTY} if the table is virtual, otherwise the index manager of the table's store
+     */
+    static IndexRegistry obtain(TableMetadata table)
+    {
+        if (table.isVirtual())
+            return EMPTY;
+
+        return Keyspace.openAndGetStore(table).indexManager;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 9a29c02..fb0d629 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -739,6 +739,7 @@ public abstract class CassandraIndex implements Index
 
         TableMetadata.Builder builder =
             TableMetadata.builder(baseCfsMetadata.keyspace, baseCfsMetadata.indexTableName(indexMetadata), baseCfsMetadata.id)
+                         .kind(TableMetadata.Kind.INDEX)
                          // tables for legacy KEYS indexes are non-compound and dense
                          .isDense(indexMetadata.isKeys())
                          .isCompound(!indexMetadata.isKeys())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
index 80a3869..5a72d2c 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
@@ -36,16 +36,23 @@ import static java.lang.String.format;
  */
 public final class KeyspaceMetadata
 {
+    public enum Kind
+    {
+        REGULAR, VIRTUAL
+    }
+
     public final String name;
+    public final Kind kind;
     public final KeyspaceParams params;
     public final Tables tables;
     public final Views views;
     public final Types types;
     public final Functions functions;
 
-    private KeyspaceMetadata(String name, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
+    private KeyspaceMetadata(String name, Kind kind, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
     {
         this.name = name;
+        this.kind = kind;
         this.params = params;
         this.tables = tables;
         this.views = views;
@@ -55,42 +62,52 @@ public final class KeyspaceMetadata
 
     public static KeyspaceMetadata create(String name, KeyspaceParams params)
     {
-        return new KeyspaceMetadata(name, params, Tables.none(), Views.none(), Types.none(), Functions.none());
+        return new KeyspaceMetadata(name, Kind.REGULAR, params, Tables.none(), Views.none(), Types.none(), Functions.none());
     }
 
     public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables)
     {
-        return new KeyspaceMetadata(name, params, tables, Views.none(), Types.none(), Functions.none());
+        return new KeyspaceMetadata(name, Kind.REGULAR, params, tables, Views.none(), Types.none(), Functions.none());
     }
 
     public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
     {
-        return new KeyspaceMetadata(name, params, tables, views, types, functions);
+        return new KeyspaceMetadata(name, Kind.REGULAR, params, tables, views, types, functions);
+    }
+
+    public static KeyspaceMetadata virtual(String name, Tables tables)
+    {
+        return new KeyspaceMetadata(name, Kind.VIRTUAL, KeyspaceParams.local(), tables, Views.none(), Types.none(), Functions.none());
     }
 
     public KeyspaceMetadata withSwapped(KeyspaceParams params)
     {
-        return new KeyspaceMetadata(name, params, tables, views, types, functions);
+        return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
     }
 
     public KeyspaceMetadata withSwapped(Tables regular)
     {
-        return new KeyspaceMetadata(name, params, regular, views, types, functions);
+        return new KeyspaceMetadata(name, kind, params, regular, views, types, functions);
     }
 
     public KeyspaceMetadata withSwapped(Views views)
     {
-        return new KeyspaceMetadata(name, params, tables, views, types, functions);
+        return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
     }
 
     public KeyspaceMetadata withSwapped(Types types)
     {
-        return new KeyspaceMetadata(name, params, tables, views, types, functions);
+        return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
     }
 
     public KeyspaceMetadata withSwapped(Functions functions)
     {
-        return new KeyspaceMetadata(name, params, tables, views, types, functions);
+        return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
+    }
+
+    public boolean isVirtual()
+    {
+        return kind == Kind.VIRTUAL;
     }
 
     public Iterable<TableMetadata> tablesAndViews()
@@ -129,7 +146,7 @@ public final class KeyspaceMetadata
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(name, params, tables, views, functions, types);
+        return Objects.hashCode(name, kind, params, tables, views, functions, types);
     }
 
     @Override
@@ -144,6 +161,7 @@ public final class KeyspaceMetadata
         KeyspaceMetadata other = (KeyspaceMetadata) o;
 
         return name.equals(other.name)
+            && kind == other.kind
             && params.equals(other.params)
             && tables.equals(other.tables)
             && views.equals(other.views)
@@ -156,6 +174,7 @@ public final class KeyspaceMetadata
     {
         return MoreObjects.toStringHelper(this)
                           .add("name", name)
+                          .add("kind", kind)
                           .add("params", params)
                           .add("tables", tables)
                           .add("views", views)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index 594b2ab..09ec62a 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.MapDifference;
@@ -28,15 +29,12 @@ import com.google.common.collect.Sets;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.KeyspaceNotDefinedException;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnknownTableException;
@@ -312,7 +310,8 @@ public final class Schema
     public KeyspaceMetadata getKeyspaceMetadata(String keyspaceName)
     {
         assert keyspaceName != null;
-        return keyspaces.getNullable(keyspaceName);
+        KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName);
+        return null != keyspace ? keyspace : VirtualKeyspaceRegistry.instance.getKeyspaceMetadataNullable(keyspaceName);
     }
 
     private Set<String> getNonSystemKeyspacesSet()
@@ -426,15 +425,17 @@ public final class Schema
         assert keyspace != null;
         assert table != null;
 
-        KeyspaceMetadata ksm = keyspaces.getNullable(keyspace);
+        KeyspaceMetadata ksm = getKeyspaceMetadata(keyspace);
         return ksm == null
              ? null
              : ksm.getTableOrViewNullable(table);
     }
 
+    @Nullable
     public TableMetadata getTableMetadata(TableId id)
     {
-        return keyspaces.getTableOrViewNullable(id);
+        TableMetadata table = keyspaces.getTableOrViewNullable(id);
+        return null != table ? table : VirtualKeyspaceRegistry.instance.getTableMetadataNullable(id);
     }
 
     public TableMetadata validateTable(String keyspaceName, String tableName)
@@ -442,7 +443,7 @@ public final class Schema
         if (tableName.isEmpty())
             throw new InvalidRequestException("non-empty table is required");
 
-        KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName);
+        KeyspaceMetadata keyspace = getKeyspaceMetadata(keyspaceName);
         if (keyspace == null)
             throw new KeyspaceNotDefinedException(format("keyspace %s does not exist", keyspaceName));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 638e912..4945fc2 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -1141,7 +1141,7 @@ public final class SchemaKeyspace
 
         TableMetadata metadata =
             TableMetadata.builder(keyspaceName, viewName, TableId.fromUUID(row.getUUID("id")))
-                         .isView(true)
+                         .kind(TableMetadata.Kind.VIEW)
                          .addColumns(columns)
                          .droppedColumns(fetchDroppedColumns(keyspaceName, viewName))
                          .params(createTableParamsFromRow(row))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/TableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java b/src/java/org/apache/cassandra/schema/TableMetadata.java
index 4634438..47e5b47 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.Objects;
 
+import javax.annotation.Nullable;
+
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.*;
 
@@ -82,15 +84,21 @@ public final class TableMetadata
         }
     }
 
+    public enum Kind
+    {
+        REGULAR, INDEX, VIEW, VIRTUAL
+    }
+
     public final String keyspace;
     public final String name;
     public final TableId id;
 
     public final IPartitioner partitioner;
+    public final Kind kind;
     public final TableParams params;
     public final ImmutableSet<Flag> flags;
 
-    private final boolean isView;
+    @Nullable
     private final String indexName; // derived from table name
 
     /*
@@ -139,12 +147,10 @@ public final class TableMetadata
         id = builder.id;
 
         partitioner = builder.partitioner;
+        kind = builder.kind;
         params = builder.params.build();
-        isView = builder.isView;
 
-        indexName = name.contains(".")
-                  ? name.substring(name.indexOf('.') + 1)
-                  : null;
+        indexName = kind == Kind.INDEX ? name.substring(name.indexOf('.') + 1) : null;
 
         droppedColumns = ImmutableMap.copyOf(builder.droppedColumns);
         Collections.sort(builder.partitionKeyColumns);
@@ -184,23 +190,28 @@ public final class TableMetadata
     {
         return builder(keyspace, name, id)
                .partitioner(partitioner)
+               .kind(kind)
                .params(params)
                .flags(flags)
-               .isView(isView)
                .addColumns(columns())
                .droppedColumns(droppedColumns)
                .indexes(indexes)
                .triggers(triggers);
     }
 
+    public boolean isIndex()
+    {
+        return kind == Kind.INDEX;
+    }
+
     public boolean isView()
     {
-        return isView;
+        return kind == Kind.VIEW;
     }
 
-    public boolean isIndex()
+    public boolean isVirtual()
     {
-        return indexName != null;
+        return kind == Kind.VIRTUAL;
     }
 
     public Optional<String> indexName()
@@ -534,7 +545,7 @@ public final class TableMetadata
 
     private void except(String format, Object... args)
     {
-        throw new ConfigurationException(keyspace + "." + name + ": " +format(format, args));
+        throw new ConfigurationException(keyspace + "." + name + ": " + format(format, args));
     }
 
     @Override
@@ -552,9 +563,9 @@ public final class TableMetadata
             && name.equals(tm.name)
             && id.equals(tm.id)
             && partitioner.equals(tm.partitioner)
+            && kind == tm.kind
             && params.equals(tm.params)
             && flags.equals(tm.flags)
-            && isView == tm.isView
             && columns.equals(tm.columns)
             && droppedColumns.equals(tm.droppedColumns)
             && indexes.equals(tm.indexes)
@@ -564,7 +575,7 @@ public final class TableMetadata
     @Override
     public int hashCode()
     {
-        return Objects.hash(keyspace, name, id, partitioner, params, flags, isView, columns, droppedColumns, indexes, triggers);
+        return Objects.hash(keyspace, name, id, partitioner, kind, params, flags, columns, droppedColumns, indexes, triggers);
     }
 
     @Override
@@ -580,9 +591,9 @@ public final class TableMetadata
                           .add("table", name)
                           .add("id", id)
                           .add("partitioner", partitioner)
+                          .add("kind", kind)
                           .add("params", params)
                           .add("flags", flags)
-                          .add("isView", isView)
                           .add("columns", columns())
                           .add("droppedColumns", droppedColumns.values())
                           .add("indexes", indexes)
@@ -598,6 +609,7 @@ public final class TableMetadata
         private TableId id;
 
         private IPartitioner partitioner;
+        private Kind kind = Kind.REGULAR;
         private TableParams.Builder params = TableParams.builder();
 
         // Setting compound as default as "normal" CQL tables are compound and that's what we want by default
@@ -611,8 +623,6 @@ public final class TableMetadata
         private final List<ColumnMetadata> clusteringColumns = new ArrayList<>();
         private final List<ColumnMetadata> regularAndStaticColumns = new ArrayList<>();
 
-        private boolean isView;
-
         private Builder(String keyspace, String name, TableId id)
         {
             this.keyspace = keyspace;
@@ -649,6 +659,12 @@ public final class TableMetadata
             return this;
         }
 
+        public Builder kind(Kind val)
+        {
+            kind = val;
+            return this;
+        }
+
         public Builder params(TableParams val)
         {
             params = val.unbuild();
@@ -733,12 +749,6 @@ public final class TableMetadata
             return this;
         }
 
-        public Builder isView(boolean val)
-        {
-            isView = val;
-            return this;
-        }
-
         public Builder flags(Set<Flag> val)
         {
             flags = val;
@@ -979,6 +989,6 @@ public final class TableMetadata
      */
     public boolean enforceStrictLiveness()
     {
-        return isView && Keyspace.open(keyspace).viewManager.getByName(name).enforceStrictLiveness();
+        return isView() && Keyspace.open(keyspace).viewManager.getByName(name).enforceStrictLiveness();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/CASRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java b/src/java/org/apache/cassandra/service/CASRequest.java
index 1db100d..88fb9bd 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.service;
 
-import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadQuery;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -30,7 +30,7 @@ public interface CASRequest
     /**
      * The command to use to fetch the value to compare for the CAS.
      */
-    public SinglePartitionReadCommand readCommand(int nowInSec);
+    public SinglePartitionReadQuery readCommand(int nowInSec);
 
     /**
      * Returns whether the provided CF, that represents the values fetched using the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 6e0b92b..815e673 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -46,6 +46,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.db.virtual.SystemViewsKeyspace;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.StartupClusterConnectivityChecker;
 import org.apache.cassandra.schema.TableMetadata;
@@ -249,6 +252,8 @@ public class CassandraDaemon
             throw e;
         }
 
+        VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
+        VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance);
 
         // clean up debris in the rest of the keyspaces
         for (String keyspaceName : Schema.instance.getKeyspaces())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index c854737..234ac4f 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.*;
+import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -65,8 +66,12 @@ public class ClientState
         for (String cf : Arrays.asList(SystemKeyspace.LOCAL, SystemKeyspace.LEGACY_PEERS, SystemKeyspace.PEERS_V2))
             READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SYSTEM_KEYSPACE_NAME, cf));
 
+        // make all schema tables readable by default (required by the drivers)
         SchemaKeyspace.ALL.forEach(table -> READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SCHEMA_KEYSPACE_NAME, table)));
 
+        // make all virtual schema tables readable by default as well
+        VirtualSchemaKeyspace.instance.tables().forEach(t -> READABLE_SYSTEM_RESOURCES.add(t.metadata().resource));
+
         // neither clients nor tools need authentication/authorization
         if (DatabaseDescriptor.isDaemonInitialized())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 37bfd17..7e9b0f9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -259,7 +259,7 @@ public class StorageProxy implements StorageProxyMBean
 
                 // read the current values and check they validate the conditions
                 Tracing.trace("Reading existing values for CAS precondition");
-                SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
+                SinglePartitionReadCommand readCommand = (SinglePartitionReadCommand) request.readCommand(FBUtilities.nowInSeconds());
                 ConsistencyLevel readConsistency = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
 
                 FilteredPartition current;
@@ -1633,7 +1633,7 @@ public class StorageProxy implements StorageProxyMBean
     public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime)
     throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
     {
-        if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.commands))
+        if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.queries))
         {
             readMetrics.unavailables.mark();
             readMetricsMap.get(consistencyLevel).unavailables.mark();
@@ -1649,11 +1649,11 @@ public class StorageProxy implements StorageProxyMBean
     throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
     {
         assert state != null;
-        if (group.commands.size() > 1)
+        if (group.queries.size() > 1)
             throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time");
 
         long start = System.nanoTime();
-        SinglePartitionReadCommand command = group.commands.get(0);
+        SinglePartitionReadCommand command = group.queries.get(0);
         TableMetadata metadata = command.metadata();
         DecoratedKey key = command.partitionKey();
 
@@ -1685,7 +1685,7 @@ public class StorageProxy implements StorageProxyMBean
                 throw new ReadFailureException(consistencyLevel, e.received, e.blockFor, false, e.failureReasonByEndpoint);
             }
 
-            result = fetchRows(group.commands, consistencyForCommitOrFetch, queryStartNanoTime);
+            result = fetchRows(group.queries, consistencyForCommitOrFetch, queryStartNanoTime);
         }
         catch (UnavailableException e)
         {
@@ -1727,13 +1727,13 @@ public class StorageProxy implements StorageProxyMBean
         long start = System.nanoTime();
         try
         {
-            PartitionIterator result = fetchRows(group.commands, consistencyLevel, queryStartNanoTime);
+            PartitionIterator result = fetchRows(group.queries, consistencyLevel, queryStartNanoTime);
             // Note that the only difference between the command in a group must be the partition key on which
             // they applied.
-            boolean enforceStrictLiveness = group.commands.get(0).metadata().enforceStrictLiveness();
+            boolean enforceStrictLiveness = group.queries.get(0).metadata().enforceStrictLiveness();
             // If we have more than one command, then despite each read command honoring the limit, the total result
             // might not honor it and so we should enforce it
-            if (group.commands.size() > 1)
+            if (group.queries.size() > 1)
                 result = group.limits().filter(result, group.nowInSec(), group.selectsFullPartition(), enforceStrictLiveness);
             return result;
         }
@@ -1761,7 +1761,7 @@ public class StorageProxy implements StorageProxyMBean
             readMetrics.addNano(latency);
             readMetricsMap.get(consistencyLevel).addNano(latency);
             // TODO avoid giving every command the same latency number.  Can fix this in CASSADRA-5329
-            for (ReadCommand command : group.commands)
+            for (ReadCommand command : group.queries)
                 Keyspace.openAndGetStore(command.metadata()).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 8570f10..4214644 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -69,6 +69,7 @@ import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.Verifier;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token.TokenFactory;
@@ -3456,12 +3457,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     }
 
-    private Keyspace getValidKeyspace(String keyspaceName) throws IOException
+    private void verifyKeyspaceIsValid(String keyspaceName)
     {
+        if (null != VirtualKeyspaceRegistry.instance.getKeyspaceNullable(keyspaceName))
+            throw new IllegalArgumentException("Cannot perform any operations against virtual keyspace " + keyspaceName);
+
         if (!Schema.instance.getKeyspaces().contains(keyspaceName))
-        {
-            throw new IOException("Keyspace " + keyspaceName + " does not exist");
-        }
+            throw new IllegalArgumentException("Keyspace " + keyspaceName + " does not exist");
+    }
+
+    private Keyspace getValidKeyspace(String keyspaceName)
+    {
+        verifyKeyspaceIsValid(keyspaceName);
         return Keyspace.open(keyspaceName);
     }
 
@@ -4787,6 +4794,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public void truncate(String keyspace, String table) throws TimeoutException, IOException
     {
+        verifyKeyspaceIsValid(keyspace);
+
         try
         {
             StorageProxy.truncateBlocking(keyspace, table);
@@ -5249,6 +5258,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         if (!isInitialized())
             throw new RuntimeException("Not yet initialized, can't load new sstables");
+        verifyKeyspaceIsValid(ksName);
         ColumnFamilyStore.loadNewSSTables(ksName, cfName);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 8ebbdf7..da64a0c 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -26,9 +26,9 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.transport.ProtocolVersion;
 
-abstract class AbstractQueryPager implements QueryPager
+abstract class AbstractQueryPager<T extends ReadQuery> implements QueryPager
 {
-    protected final ReadCommand command;
+    protected final T query;
     protected final DataLimits limits;
     protected final ProtocolVersion protocolVersion;
     private final boolean enforceStrictLiveness;
@@ -43,12 +43,12 @@ abstract class AbstractQueryPager implements QueryPager
 
     private boolean exhausted;
 
-    protected AbstractQueryPager(ReadCommand command, ProtocolVersion protocolVersion)
+    protected AbstractQueryPager(T query, ProtocolVersion protocolVersion)
     {
-        this.command = command;
+        this.query = query;
         this.protocolVersion = protocolVersion;
-        this.limits = command.limits();
-        this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
+        this.limits = query.limits();
+        this.enforceStrictLiveness = query.metadata().enforceStrictLiveness();
 
         this.remaining = limits.count();
         this.remainingInPartition = limits.perPartitionCount();
@@ -56,7 +56,7 @@ abstract class AbstractQueryPager implements QueryPager
 
     public ReadExecutionController executionController()
     {
-        return command.executionController();
+        return query.executionController();
     }
 
     public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime)
@@ -65,8 +65,8 @@ abstract class AbstractQueryPager implements QueryPager
             return EmptyIterators.partition();
 
         pageSize = Math.min(pageSize, remaining);
-        Pager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec());
-        return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState, queryStartNanoTime), pager);
+        Pager pager = new RowPager(limits.forPaging(pageSize), query.nowInSec());
+        return Transformation.apply(nextPageReadQuery(pageSize).execute(consistency, clientState, queryStartNanoTime), pager);
     }
 
     public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)
@@ -75,8 +75,8 @@ abstract class AbstractQueryPager implements QueryPager
             return EmptyIterators.partition();
 
         pageSize = Math.min(pageSize, remaining);
-        RowPager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec());
-        return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(executionController), pager);
+        RowPager pager = new RowPager(limits.forPaging(pageSize), query.nowInSec());
+        return Transformation.apply(nextPageReadQuery(pageSize).executeInternal(executionController), pager);
     }
 
     public UnfilteredPartitionIterator fetchPageUnfiltered(TableMetadata metadata, int pageSize, ReadExecutionController executionController)
@@ -85,9 +85,9 @@ abstract class AbstractQueryPager implements QueryPager
             return EmptyIterators.unfilteredPartition(metadata);
 
         pageSize = Math.min(pageSize, remaining);
-        UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), command.nowInSec());
+        UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), query.nowInSec());
 
-        return Transformation.apply(nextPageReadCommand(pageSize).executeLocally(executionController), pager);
+        return Transformation.apply(nextPageReadQuery(pageSize).executeLocally(executionController), pager);
     }
 
     private class UnfilteredPager extends Pager<Unfiltered>
@@ -128,7 +128,7 @@ abstract class AbstractQueryPager implements QueryPager
 
         private Pager(DataLimits pageLimits, int nowInSec)
         {
-            this.counter = pageLimits.newCounter(nowInSec, true, command.selectsFullPartition(), enforceStrictLiveness);
+            this.counter = pageLimits.newCounter(nowInSec, true, query.selectsFullPartition(), enforceStrictLiveness);
             this.pageLimits = pageLimits;
         }
 
@@ -228,7 +228,7 @@ abstract class AbstractQueryPager implements QueryPager
         return remainingInPartition;
     }
 
-    protected abstract ReadCommand nextPageReadCommand(int pageSize);
+    protected abstract T nextPageReadQuery(int pageSize);
     protected abstract void recordLast(DecoratedKey key, Row row);
     protected abstract boolean isPreviouslyReturnedPartition(DecoratedKey key);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 9dae11c..ca16967 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -31,20 +31,20 @@ import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.service.ClientState;
 
 /**
- * Pager over a list of ReadCommand.
+ * Pager over a list of SinglePartitionReadQuery.
  *
- * Note that this is not easy to make efficient. Indeed, we need to page the first command fully before
- * returning results from the next one, but if the result returned by each command is small (compared to pageSize),
- * paging the commands one at a time under-performs compared to parallelizing. On the other, if we parallelize
- * and each command raised pageSize results, we'll end up with commands.size() * pageSize results in memory, which
+ * Note that this is not easy to make efficient. Indeed, we need to page the first query fully before
+ * returning results from the next one, but if the result returned by each query is small (compared to pageSize),
+ * paging the queries one at a time under-performs compared to parallelizing. On the other hand, if we parallelize
+ * and each query returns pageSize results, we'll end up with queries.size() * pageSize results in memory, which
  * defeats the purpose of paging.
  *
- * For now, we keep it simple (somewhat) and just do one command at a time. Provided that we make sure to not
+ * For now, we keep it simple (somewhat) and just do one query at a time. Provided that we make sure to not
  * create a pager unless we need to, this is probably fine. Though if we later want to get fancy, we could use the
- * cfs meanPartitionSize to decide if parallelizing some of the command might be worth it while being confident we don't
+ * cfs meanPartitionSize to decide if parallelizing some of the queries might be worth it while being confident we don't
  * blow out memory.
  */
-public class MultiPartitionPager implements QueryPager
+public class MultiPartitionPager<T extends SinglePartitionReadQuery> implements QueryPager
 {
     private final SinglePartitionPager[] pagers;
     private final DataLimits limit;
@@ -54,33 +54,33 @@ public class MultiPartitionPager implements QueryPager
     private int remaining;
     private int current;
 
-    public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state, ProtocolVersion protocolVersion)
+    public MultiPartitionPager(SinglePartitionReadQuery.Group<T> group, PagingState state, ProtocolVersion protocolVersion)
     {
         this.limit = group.limits();
         this.nowInSec = group.nowInSec();
 
         int i = 0;
-        // If it's not the beginning (state != null), we need to find where we were and skip previous commands
+        // If it's not the beginning (state != null), we need to find where we were and skip previous queries
         // since they are done.
         if (state != null)
-            for (; i < group.commands.size(); i++)
-                if (group.commands.get(i).partitionKey().getKey().equals(state.partitionKey))
+            for (; i < group.queries.size(); i++)
+                if (group.queries.get(i).partitionKey().getKey().equals(state.partitionKey))
                     break;
 
-        if (i >= group.commands.size())
+        if (i >= group.queries.size())
         {
             pagers = null;
             return;
         }
 
-        pagers = new SinglePartitionPager[group.commands.size() - i];
+        pagers = new SinglePartitionPager[group.queries.size() - i];
         // 'i' is on the first non exhausted pager for the previous page (or the first one)
-        SinglePartitionReadCommand command = group.commands.get(i);
-        pagers[0] = command.getPager(state, protocolVersion);
+        T query = group.queries.get(i);
+        pagers[0] = query.getPager(state, protocolVersion);
 
         // Following ones haven't been started yet
-        for (int j = i + 1; j < group.commands.size(); j++)
-            pagers[j - i] = group.commands.get(j).getPager(null, protocolVersion);
+        for (int j = i + 1; j < group.queries.size(); j++)
+            pagers[j - i] = group.queries.get(j).getPager(null, protocolVersion);
 
         remaining = state == null ? limit.count() : state.remaining;
     }
@@ -103,11 +103,11 @@ public class MultiPartitionPager implements QueryPager
         SinglePartitionPager[] newPagers = Arrays.copyOf(pagers, pagers.length);
         newPagers[current] = newPagers[current].withUpdatedLimit(newLimits);
 
-        return new MultiPartitionPager(newPagers,
-                                       newLimits,
-                                       nowInSec,
-                                       remaining,
-                                       current);
+        return new MultiPartitionPager<T>(newPagers,
+                                          newLimits,
+                                          nowInSec,
+                                          remaining,
+                                          current);
     }
 
     public PagingState state()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
index ba6862d..cebf3c6 100644
--- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
@@ -21,37 +21,36 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.transport.ProtocolVersion;
 
 /**
- * Pages a PartitionRangeReadCommand.
+ * Pages a PartitionRangeReadQuery.
  */
-public class PartitionRangeQueryPager extends AbstractQueryPager
+public class PartitionRangeQueryPager extends AbstractQueryPager<PartitionRangeReadQuery>
 {
     private volatile DecoratedKey lastReturnedKey;
     private volatile PagingState.RowMark lastReturnedRow;
 
-    public PartitionRangeQueryPager(PartitionRangeReadCommand command, PagingState state, ProtocolVersion protocolVersion)
+    public PartitionRangeQueryPager(PartitionRangeReadQuery query, PagingState state, ProtocolVersion protocolVersion)
     {
-        super(command, protocolVersion);
+        super(query, protocolVersion);
 
         if (state != null)
         {
-            lastReturnedKey = command.metadata().partitioner.decorateKey(state.partitionKey);
+            lastReturnedKey = query.metadata().partitioner.decorateKey(state.partitionKey);
             lastReturnedRow = state.rowMark;
             restoreState(lastReturnedKey, state.remaining, state.remainingInPartition);
         }
     }
 
-    public PartitionRangeQueryPager(ReadCommand command,
+    public PartitionRangeQueryPager(PartitionRangeReadQuery query,
                                     ProtocolVersion protocolVersion,
                                     DecoratedKey lastReturnedKey,
                                     PagingState.RowMark lastReturnedRow,
                                     int remaining,
                                     int remainingInPartition)
     {
-        super(command, protocolVersion);
+        super(query, protocolVersion);
         this.lastReturnedKey = lastReturnedKey;
         this.lastReturnedRow = lastReturnedRow;
         restoreState(lastReturnedKey, remaining, remainingInPartition);
@@ -59,7 +58,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
 
     public PartitionRangeQueryPager withUpdatedLimit(DataLimits newLimits)
     {
-        return new PartitionRangeQueryPager(command.withUpdatedLimit(newLimits),
+        return new PartitionRangeQueryPager(query.withUpdatedLimit(newLimits),
                                             protocolVersion,
                                             lastReturnedKey,
                                             lastReturnedRow,
@@ -74,16 +73,16 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
              : new PagingState(lastReturnedKey.getKey(), lastReturnedRow, maxRemaining(), remainingInPartition());
     }
 
-    protected ReadCommand nextPageReadCommand(int pageSize)
-    throws RequestExecutionException
+    @Override
+    protected PartitionRangeReadQuery nextPageReadQuery(int pageSize)
     {
         DataLimits limits;
-        DataRange fullRange = ((PartitionRangeReadCommand)command).dataRange();
+        DataRange fullRange = query.dataRange();
         DataRange pageRange;
         if (lastReturnedKey == null)
         {
             pageRange = fullRange;
-            limits = command.limits().forPaging(pageSize);
+            limits = query.limits().forPaging(pageSize);
         }
         else
         {
@@ -92,17 +91,17 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
             AbstractBounds<PartitionPosition> bounds = makeKeyBounds(lastReturnedKey, includeLastKey);
             if (includeLastKey)
             {
-                pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedRow.clustering(command.metadata()), false);
-                limits = command.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition());
+                pageRange = fullRange.forPaging(bounds, query.metadata().comparator, lastReturnedRow.clustering(query.metadata()), false);
+                limits = query.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition());
             }
             else
             {
                 pageRange = fullRange.forSubRange(bounds);
-                limits = command.limits().forPaging(pageSize);
+                limits = query.limits().forPaging(pageSize);
             }
         }
 
-        return ((PartitionRangeReadCommand) command).withUpdatedLimitsAndDataRange(limits, pageRange);
+        return query.withUpdatedLimitsAndDataRange(limits, pageRange);
     }
 
     protected void recordLast(DecoratedKey key, Row last)
@@ -111,7 +110,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
         {
             lastReturnedKey = key;
             if (last.clustering() != Clustering.STATIC_CLUSTERING)
-                lastReturnedRow = PagingState.RowMark.create(command.metadata(), last, protocolVersion);
+                lastReturnedRow = PagingState.RowMark.create(query.metadata(), last, protocolVersion);
         }
     }
 
@@ -123,18 +122,16 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
 
     private AbstractBounds<PartitionPosition> makeKeyBounds(PartitionPosition lastReturnedKey, boolean includeLastKey)
     {
-        AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange();
+        AbstractBounds<PartitionPosition> bounds = query.dataRange().keyRange();
         if (bounds instanceof Range || bounds instanceof Bounds)
         {
             return includeLastKey
-                 ? new Bounds<PartitionPosition>(lastReturnedKey, bounds.right)
-                 : new Range<PartitionPosition>(lastReturnedKey, bounds.right);
-        }
-        else
-        {
-            return includeLastKey
-                 ? new IncludingExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right)
-                 : new ExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right);
+                 ? new Bounds<>(lastReturnedKey, bounds.right)
+                 : new Range<>(lastReturnedKey, bounds.right);
         }
+
+        return includeLastKey
+             ? new IncludingExcludingBounds<>(lastReturnedKey, bounds.right)
+             : new ExcludingBounds<>(lastReturnedKey, bounds.right);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index e95c358..93a0265 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -29,40 +29,36 @@ import org.apache.cassandra.transport.ProtocolVersion;
  *
  * For use by MultiPartitionPager.
  */
-public class SinglePartitionPager extends AbstractQueryPager
+public class SinglePartitionPager extends AbstractQueryPager<SinglePartitionReadQuery>
 {
-    private final SinglePartitionReadCommand command;
-
     private volatile PagingState.RowMark lastReturned;
 
-    public SinglePartitionPager(SinglePartitionReadCommand command, PagingState state, ProtocolVersion protocolVersion)
+    public SinglePartitionPager(SinglePartitionReadQuery query, PagingState state, ProtocolVersion protocolVersion)
     {
-        super(command, protocolVersion);
-        this.command = command;
+        super(query, protocolVersion);
 
         if (state != null)
         {
             lastReturned = state.rowMark;
-            restoreState(command.partitionKey(), state.remaining, state.remainingInPartition);
+            restoreState(query.partitionKey(), state.remaining, state.remainingInPartition);
         }
     }
 
-    private SinglePartitionPager(SinglePartitionReadCommand command,
+    private SinglePartitionPager(SinglePartitionReadQuery query,
                                  ProtocolVersion protocolVersion,
                                  PagingState.RowMark rowMark,
                                  int remaining,
                                  int remainingInPartition)
     {
-        super(command, protocolVersion);
-        this.command = command;
+        super(query, protocolVersion);
         this.lastReturned = rowMark;
-        restoreState(command.partitionKey(), remaining, remainingInPartition);
+        restoreState(query.partitionKey(), remaining, remainingInPartition);
     }
 
     @Override
     public SinglePartitionPager withUpdatedLimit(DataLimits newLimits)
     {
-        return new SinglePartitionPager(command.withUpdatedLimit(newLimits),
+        return new SinglePartitionPager(query.withUpdatedLimit(newLimits),
                                         protocolVersion,
                                         lastReturned,
                                         maxRemaining(),
@@ -71,12 +67,12 @@ public class SinglePartitionPager extends AbstractQueryPager
 
     public ByteBuffer key()
     {
-        return command.partitionKey().getKey();
+        return query.partitionKey().getKey();
     }
 
     public DataLimits limits()
     {
-        return command.limits();
+        return query.limits();
     }
 
     public PagingState state()
@@ -86,20 +82,21 @@ public class SinglePartitionPager extends AbstractQueryPager
              : new PagingState(null, lastReturned, maxRemaining(), remainingInPartition());
     }
 
-    protected ReadCommand nextPageReadCommand(int pageSize)
+    @Override
+    protected SinglePartitionReadQuery nextPageReadQuery(int pageSize)
     {
-        Clustering clustering = lastReturned == null ? null : lastReturned.clustering(command.metadata());
+        Clustering clustering = lastReturned == null ? null : lastReturned.clustering(query.metadata());
         DataLimits limits = lastReturned == null
                           ? limits().forPaging(pageSize)
                           : limits().forPaging(pageSize, key(), remainingInPartition());
 
-        return command.forPaging(clustering, limits);
+        return query.forPaging(clustering, limits);
     }
 
     protected void recordLast(DecoratedKey key, Row last)
     {
         if (last != null && last.clustering() != Clustering.STATIC_CLUSTERING)
-            lastReturned = PagingState.RowMark.create(command.metadata(), last, protocolVersion);
+            lastReturned = PagingState.RowMark.create(query.metadata(), last, protocolVersion);
     }
 
     protected boolean isPreviouslyReturnedPartition(DecoratedKey key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 7b8ef94..0852312 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -186,6 +186,7 @@ public class CacheProviderTest
         assertEquals(key1.hashCode(), key2.hashCode());
 
         tm = TableMetadata.builder("ks", "tab.indexFoo", id1)
+                          .kind(TableMetadata.Kind.INDEX)
                           .addPartitionKeyColumn("pk", UTF8Type.instance)
                           .indexes(Indexes.of(IndexMetadata.fromSchemaMetadata("indexFoo", IndexMetadata.Kind.KEYS, Collections.emptyMap())))
                           .build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index e53342d..662e804 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -837,6 +837,11 @@ public abstract class CQLTester
         return sessionNet(protocolVersion).execute(formatQuery(query), values);
     }
 
+    protected com.datastax.driver.core.ResultSet executeNet(String query, Object... values) throws Throwable
+    {
+        return sessionNet().execute(formatQuery(query), values);
+    }
+
     protected com.datastax.driver.core.ResultSet executeNetWithPaging(ProtocolVersion version, String query, int pageSize) throws Throwable
     {
         return sessionNet(version).execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[4/4] cassandra git commit: Implement virtual keyspace interface

Posted by al...@apache.org.
Implement virtual keyspace interface

patch by Benjamin Lehrer, Chris Lohfink, and Aleksey Yeschenko for
CASSANDRA-7622


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d464cd2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d464cd2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d464cd2

Branch: refs/heads/trunk
Commit: 0d464cd25ffbb5734f96c3082f9cc35011de3667
Parents: 3b6c938
Author: Chris Lohfink <cl...@apple.com>
Authored: Wed May 16 23:07:04 2018 +0100
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Fri May 18 16:45:45 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 bin/cqlsh.py                                    |  84 ++++-
 .../ClusteringColumnRestrictions.java           |   8 +-
 .../restrictions/MultiColumnRestriction.java    |  14 +-
 .../PartitionKeySingleRestrictionSet.java       |   6 +-
 .../cql3/restrictions/Restriction.java          |  10 +-
 .../cql3/restrictions/RestrictionSet.java       |  10 +-
 .../restrictions/RestrictionSetWrapper.java     |  15 +-
 .../restrictions/SingleColumnRestriction.java   |  24 +-
 .../restrictions/StatementRestrictions.java     |  29 +-
 .../cql3/restrictions/TokenFilter.java          |  10 +-
 .../cql3/restrictions/TokenRestriction.java     |   6 +-
 .../cql3/statements/AlterKeyspaceStatement.java |   2 +
 .../cql3/statements/AlterTableStatement.java    |   3 +
 .../cql3/statements/BatchStatement.java         |  56 ++-
 .../cql3/statements/BatchUpdatesCollector.java  |  58 ++-
 .../cql3/statements/CQL3CasRequest.java         |  21 +-
 .../statements/CreateAggregateStatement.java    |   7 +-
 .../statements/CreateFunctionStatement.java     |   6 +-
 .../cql3/statements/CreateIndexStatement.java   |   3 +
 .../cql3/statements/CreateTableStatement.java   |   5 +-
 .../cql3/statements/CreateTriggerStatement.java |   2 +
 .../cql3/statements/CreateTypeStatement.java    |   2 +
 .../cql3/statements/CreateViewStatement.java    |   4 +-
 .../cql3/statements/DeleteStatement.java        |   2 +
 .../cql3/statements/DropKeyspaceStatement.java  |   4 +
 .../cql3/statements/DropTableStatement.java     |   3 +
 .../cql3/statements/ModificationStatement.java  |  12 +-
 .../cql3/statements/SelectStatement.java        |  31 +-
 .../statements/SingleTableUpdatesCollector.java |   8 +-
 .../cql3/statements/TruncateStatement.java      |  10 +
 .../apache/cassandra/db/AbstractReadQuery.java  | 116 ++++++
 src/java/org/apache/cassandra/db/Keyspace.java  |   2 +
 .../cassandra/db/PartitionRangeReadCommand.java |  38 +-
 .../cassandra/db/PartitionRangeReadQuery.java   |  93 +++++
 .../org/apache/cassandra/db/ReadCommand.java    | 100 +----
 src/java/org/apache/cassandra/db/ReadQuery.java | 184 ++++++---
 .../db/SinglePartitionReadCommand.java          | 178 ++-------
 .../cassandra/db/SinglePartitionReadQuery.java  | 290 +++++++++++++++
 .../db/VirtualTablePartitionRangeReadQuery.java | 113 ++++++
 .../cassandra/db/VirtualTableReadQuery.java     |  65 ++++
 .../VirtualTableSinglePartitionReadQuery.java   | 194 ++++++++++
 .../cassandra/db/filter/ColumnFilter.java       |   2 +
 .../db/partitions/PartitionIterators.java       |  10 +-
 .../db/partitions/PartitionUpdate.java          |   3 +-
 .../db/virtual/AbstractVirtualTable.java        | 221 +++++++++++
 .../cassandra/db/virtual/SimpleDataSet.java     | 191 ++++++++++
 .../db/virtual/SystemViewsKeyspace.java         |  32 ++
 .../cassandra/db/virtual/VirtualKeyspace.java   |  58 +++
 .../db/virtual/VirtualKeyspaceRegistry.java     |  77 ++++
 .../cassandra/db/virtual/VirtualMutation.java   | 111 ++++++
 .../db/virtual/VirtualSchemaKeyspace.java       | 149 ++++++++
 .../cassandra/db/virtual/VirtualTable.java      |  74 ++++
 .../apache/cassandra/index/IndexRegistry.java   |  69 ++++
 .../index/internal/CassandraIndex.java          |   1 +
 .../cassandra/schema/KeyspaceMetadata.java      |  39 +-
 .../org/apache/cassandra/schema/Schema.java     |  19 +-
 .../apache/cassandra/schema/SchemaKeyspace.java |   2 +-
 .../apache/cassandra/schema/TableMetadata.java  |  54 +--
 .../apache/cassandra/service/CASRequest.java    |   4 +-
 .../cassandra/service/CassandraDaemon.java      |   5 +
 .../apache/cassandra/service/ClientState.java   |   5 +
 .../apache/cassandra/service/StorageProxy.java  |  18 +-
 .../cassandra/service/StorageService.java       |  18 +-
 .../service/pager/AbstractQueryPager.java       |  30 +-
 .../service/pager/MultiPartitionPager.java      |  46 +--
 .../service/pager/PartitionRangeQueryPager.java |  51 ++-
 .../service/pager/SinglePartitionPager.java     |  33 +-
 .../cassandra/cache/CacheProviderTest.java      |   1 +
 .../org/apache/cassandra/cql3/CQLTester.java    |   5 +
 .../validation/entities/VirtualTableTest.java   | 372 +++++++++++++++++++
 71 files changed, 2936 insertions(+), 593 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5657567..e7e5ecb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Implement virtual keyspace interface (CASSANDRA-7622)
  * nodetool import cleanup and improvements (CASSANDRA-14417)
  * Bump jackson version to >= 2.9.5 (CASSANDRA-14427)
  * Allow nodetool toppartitions without specifying table (CASSANDRA-14360)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/bin/cqlsh.py
----------------------------------------------------------------------
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index 3055110..e8e380f 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -437,6 +437,9 @@ class Shell(cmd.Cmd):
     shunted_query_out = None
     use_paging = True
 
+    # TODO remove after virtual tables are added to connection metadata
+    virtual_keyspaces = None
+
     default_page_size = 100
 
     def __init__(self, hostname, port, color=False,
@@ -628,7 +631,10 @@ class Shell(cmd.Cmd):
         self.connection_versions = vers
 
     def get_keyspace_names(self):
-        return map(str, self.conn.metadata.keyspaces.keys())
+        # TODO remove after virtual tables are added to connection metadata
+        if self.virtual_keyspaces is None:
+            self.init_virtual_keyspaces_meta()
+        return map(str, self.conn.metadata.keyspaces.keys() + self.virtual_keyspaces.keys())
 
     def get_columnfamily_names(self, ksname=None):
         if ksname is None:
@@ -692,9 +698,78 @@ class Shell(cmd.Cmd):
         return self.conn.metadata.partitioner
 
     def get_keyspace_meta(self, ksname):
-        if ksname not in self.conn.metadata.keyspaces:
-            raise KeyspaceNotFound('Keyspace %r not found.' % ksname)
-        return self.conn.metadata.keyspaces[ksname]
+        if ksname in self.conn.metadata.keyspaces:
+            return self.conn.metadata.keyspaces[ksname]
+
+        # TODO remove after virtual tables are added to connection metadata
+        if self.virtual_keyspaces is None:
+            self.init_virtual_keyspaces_meta()
+        if ksname in self.virtual_keyspaces:
+            return self.virtual_keyspaces[ksname]
+
+        raise KeyspaceNotFound('Keyspace %r not found.' % ksname)
+
+    # TODO remove after virtual tables are added to connection metadata
+    def init_virtual_keyspaces_meta(self):
+        self.virtual_keyspaces = {}
+        for vkeyspace in self.fetch_virtual_keyspaces():
+            self.virtual_keyspaces[vkeyspace.name] = vkeyspace
+
+    # TODO remove after virtual tables are added to connection metadata
+    def fetch_virtual_keyspaces(self):
+        keyspaces = []
+
+        result = self.session.execute('SELECT keyspace_name FROM system_virtual_schema.keyspaces;')
+        for row in result:
+            name = row['keyspace_name']
+            keyspace = KeyspaceMetadata(name, False, None, None)
+            tables = self.fetch_virtual_tables(name)
+            for table in tables:
+                keyspace.tables[table.name] = table
+            keyspaces.append(keyspace)
+
+        return keyspaces
+
+    # TODO remove after virtual tables are added to connection metadata
+    def fetch_virtual_tables(self, keyspace_name):
+        """Return a TableMetadata (with its columns populated) for each table of the given virtual keyspace."""
+        tables = []
+        # Bind keyspace_name as a query parameter rather than formatting it into the CQL text.
+        result = self.session.execute("SELECT * FROM system_virtual_schema.tables WHERE keyspace_name = %s;", (keyspace_name,))
+        for row in result:
+            name = row['table_name']
+            table = TableMetadata(keyspace_name, name)
+            self.fetch_virtual_columns(table)
+            tables.append(table)
+        return tables
+
+    # TODO remove after virtual tables are added to connection metadata
+    def fetch_virtual_columns(self, table):
+        """Fill table.columns, table.partition_key and table.clustering_key from system_virtual_schema.columns."""
+        # Bind the keyspace and table names as query parameters rather than formatting them into the CQL text.
+        result = self.session.execute("SELECT * FROM system_virtual_schema.columns WHERE keyspace_name = %s AND table_name = %s;", (table.keyspace_name, table.name))
+        partition_key_columns = []
+        clustering_columns = []
+
+        for row in result:
+            name = row['column_name']
+            cql_type = row['type']
+            kind = row['kind']
+            position = row['position']
+            is_static = kind == 'static'
+            is_reversed = row['clustering_order'] == 'desc'
+            column = ColumnMetadata(table, name, cql_type, is_static, is_reversed)
+            table.columns[column.name] = column
+            if kind == 'partition_key':
+                partition_key_columns.append((position, column))
+            elif kind == 'clustering':
+                clustering_columns.append((position, column))
+
+        partition_key_columns.sort(key=lambda t: t[0])
+        clustering_columns.sort(key=lambda t: t[0])
+
+        table.partition_key = [column for _, column in partition_key_columns]
+        table.clustering_key = [column for _, column in clustering_columns]
 
     def get_keyspaces(self):
         return self.conn.metadata.keyspaces.values()
@@ -707,7 +782,6 @@ class Shell(cmd.Cmd):
         if ksname is None:
             ksname = self.current_keyspace
         ksmeta = self.get_keyspace_meta(ksname)
-
         if tablename not in ksmeta.tables:
             if ksname == 'system_auth' and tablename in ['roles', 'role_permissions']:
                 self.get_fake_auth_table_meta(ksname, tablename)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
index f537255..265d354 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.cql3.statements.Bound;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.utils.btree.BTreeSet;
 
 import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
@@ -199,7 +199,7 @@ final class ClusteringColumnRestrictions extends RestrictionSetWrapper
 
     @Override
     public void addRowFilterTo(RowFilter filter,
-                               SecondaryIndexManager indexManager,
+                               IndexRegistry indexRegistry,
                                QueryOptions options) throws InvalidRequestException
     {
         int position = 0;
@@ -207,9 +207,9 @@ final class ClusteringColumnRestrictions extends RestrictionSetWrapper
         for (SingleRestriction restriction : restrictions)
         {
             // We ignore all the clustering columns that can be handled by slices.
-            if (handleInFilter(restriction, position) || restriction.hasSupportingIndex(indexManager))
+            if (handleInFilter(restriction, position) || restriction.hasSupportingIndex(indexRegistry))
             {
-                restriction.addRowFilterTo(filter, indexManager, options);
+                restriction.addRowFilterTo(filter, indexRegistry, options);
                 continue;
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
index e1202a6..4c6ce2f 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.cql3.statements.Bound;
 import org.apache.cassandra.db.MultiCBuilder;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.index.Index;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
@@ -111,9 +111,9 @@ public abstract class MultiColumnRestriction implements SingleRestriction
     }
 
     @Override
-    public final boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+    public final boolean hasSupportingIndex(IndexRegistry indexRegistry)
     {
-        for (Index index : indexManager.listIndexes())
+        for (Index index : indexRegistry.listIndexes())
            if (isSupportedBy(index))
                return true;
         return false;
@@ -186,7 +186,7 @@ public abstract class MultiColumnRestriction implements SingleRestriction
         }
 
         @Override
-        public final void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexMananger, QueryOptions options)
+        public final void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options)
         {
             Tuples.Value t = ((Tuples.Value) value.bind(options));
             List<ByteBuffer> values = t.getElements();
@@ -244,7 +244,7 @@ public abstract class MultiColumnRestriction implements SingleRestriction
 
         @Override
         public final void addRowFilterTo(RowFilter filter,
-                                         SecondaryIndexManager indexManager,
+                                         IndexRegistry indexRegistry,
                                          QueryOptions options)
         {
             throw  invalidRequest("IN restrictions are not supported on indexed columns");
@@ -473,7 +473,7 @@ public abstract class MultiColumnRestriction implements SingleRestriction
 
         @Override
         public final void addRowFilterTo(RowFilter filter,
-                                         SecondaryIndexManager indexManager,
+                                         IndexRegistry indexRegistry,
                                          QueryOptions options)
         {
             throw invalidRequest("Multi-column slice restrictions cannot be used for filtering.");
@@ -561,7 +561,7 @@ public abstract class MultiColumnRestriction implements SingleRestriction
         }
 
         @Override
-        public final void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexMananger, QueryOptions options)
+        public final void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options)
         {
             throw new UnsupportedOperationException("Secondary indexes do not support IS NOT NULL restrictions");
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
index ac589be..5bb3242 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.ClusteringPrefix;
 import org.apache.cassandra.db.MultiCBuilder;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 
 /**
  * A set of single restrictions on the partition key.
@@ -121,12 +121,12 @@ final class PartitionKeySingleRestrictionSet extends RestrictionSetWrapper imple
 
     @Override
     public void addRowFilterTo(RowFilter filter,
-                               SecondaryIndexManager indexManager,
+                               IndexRegistry indexRegistry,
                                QueryOptions options)
     {
         for (SingleRestriction restriction : restrictions)
         {
-             restriction.addRowFilterTo(filter, indexManager, options);
+             restriction.addRowFilterTo(filter, indexRegistry, options);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java b/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
index daace46..91dedad 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
@@ -23,7 +23,7 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 
 /**
  * <p>Implementation of this class must be immutable.</p>
@@ -63,19 +63,19 @@ public interface Restriction
     /**
      * Check if the restriction is on indexed columns.
      *
-     * @param indexManager the index manager
+     * @param indexRegistry the index registry
      * @return <code>true</code> if the restriction is on indexed columns, <code>false</code>
      */
-    public boolean hasSupportingIndex(SecondaryIndexManager indexManager);
+    public boolean hasSupportingIndex(IndexRegistry indexRegistry);
 
     /**
      * Adds to the specified row filter the expressions corresponding to this <code>Restriction</code>.
      *
      * @param filter the row filter to add expressions to
-     * @param indexManager the secondary index manager
+     * @param indexRegistry the index registry
      * @param options the query options
      */
     public void addRowFilterTo(RowFilter filter,
-                               SecondaryIndexManager indexManager,
+                               IndexRegistry indexRegistry,
                                QueryOptions options);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
index 7c805ce..427c396 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction.ContainsRestriction;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
@@ -74,10 +74,10 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
     }
 
     @Override
-    public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options) throws InvalidRequestException
+    public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options) throws InvalidRequestException
     {
         for (Restriction restriction : restrictions.values())
-            restriction.addRowFilterTo(filter, indexManager, options);
+            restriction.addRowFilterTo(filter, indexRegistry, options);
     }
 
     @Override
@@ -184,11 +184,11 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
     }
 
     @Override
-    public final boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+    public final boolean hasSupportingIndex(IndexRegistry indexRegistry)
     {
         for (Restriction restriction : restrictions.values())
         {
-            if (restriction.hasSupportingIndex(indexManager))
+            if (restriction.hasSupportingIndex(indexRegistry))
                 return true;
         }
         return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
index c310908..9803adc 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
@@ -20,13 +20,14 @@ package org.apache.cassandra.cql3.restrictions;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.index.SecondaryIndexManager;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.cassandra.index.IndexRegistry;
 
 /**
  * A <code>RestrictionSet</code> wrapper that can be extended to allow to modify the <code>RestrictionSet</code>
@@ -45,10 +46,10 @@ class RestrictionSetWrapper implements Restrictions
     }
 
     public void addRowFilterTo(RowFilter filter,
-                               SecondaryIndexManager indexManager,
+                               IndexRegistry indexRegistry,
                                QueryOptions options)
     {
-        restrictions.addRowFilterTo(filter, indexManager, options);
+        restrictions.addRowFilterTo(filter, indexRegistry, options);
     }
 
     public List<ColumnMetadata> getColumnDefs()
@@ -71,9 +72,9 @@ class RestrictionSetWrapper implements Restrictions
         return restrictions.size();
     }
 
-    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+    public boolean hasSupportingIndex(IndexRegistry indexRegistry)
     {
-        return restrictions.hasSupportingIndex(indexManager);
+        return restrictions.hasSupportingIndex(indexRegistry);
     }
 
     public ColumnMetadata getFirstColumn()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
index 44f95a8..1b3482b 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.cql3.statements.Bound;
 import org.apache.cassandra.db.MultiCBuilder;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.index.Index;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
@@ -71,9 +71,9 @@ public abstract class SingleColumnRestriction implements SingleRestriction
     }
 
     @Override
-    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+    public boolean hasSupportingIndex(IndexRegistry indexRegistry)
     {
-        for (Index index : indexManager.listIndexes())
+        for (Index index : indexRegistry.listIndexes())
             if (isSupportedBy(index))
                 return true;
 
@@ -151,7 +151,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction
 
         @Override
         public void addRowFilterTo(RowFilter filter,
-                                   SecondaryIndexManager indexManager,
+                                   IndexRegistry indexRegistry,
                                    QueryOptions options)
         {
             filter.add(columnDef, Operator.EQ, value.bindAndGet(options));
@@ -215,7 +215,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction
 
         @Override
         public void addRowFilterTo(RowFilter filter,
-                                   SecondaryIndexManager indexManager,
+                                   IndexRegistry indexRegistry,
                                    QueryOptions options)
         {
             throw invalidRequest("IN restrictions are not supported on indexed columns");
@@ -385,7 +385,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction
         }
 
         @Override
-        public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options)
+        public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options)
         {
             for (Bound b : Bound.values())
                 if (hasBound(b))
@@ -475,7 +475,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction
         }
 
         @Override
-        public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options)
+        public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options)
         {
             for (ByteBuffer value : bindAndGet(values, options))
                 filter.add(columnDef, Operator.CONTAINS, value);
@@ -615,7 +615,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction
 
         @Override
         public void addRowFilterTo(RowFilter filter,
-                                   SecondaryIndexManager indexManager,
+                                   IndexRegistry indexRegistry,
                                    QueryOptions options)
         {
             throw new UnsupportedOperationException("Secondary indexes do not support IS NOT NULL restrictions");
@@ -691,16 +691,16 @@ public abstract class SingleColumnRestriction implements SingleRestriction
 
         @Override
         public void addRowFilterTo(RowFilter filter,
-                                   SecondaryIndexManager indexManager,
+                                   IndexRegistry indexRegistry,
                                    QueryOptions options)
         {
             Pair<Operator, ByteBuffer> operation = makeSpecific(value.bindAndGet(options));
 
             // there must be a suitable INDEX for LIKE_XXX expressions
             RowFilter.SimpleExpression expression = filter.add(columnDef, operation.left, operation.right);
-            indexManager.getBestIndexFor(expression)
-                        .orElseThrow(() -> invalidRequest("%s is only supported on properly indexed columns",
-                                                          expression));
+            indexRegistry.getBestIndexFor(expression)
+                         .orElseThrow(() -> invalidRequest("%s is only supported on properly indexed columns",
+                                                           expression));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index 0240b6f..af1a964 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.index.Index;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.btree.BTreeSet;
@@ -133,14 +133,9 @@ public final class StatementRestrictions
     {
         this(type, table, allowFiltering);
 
-        ColumnFamilyStore cfs;
-        SecondaryIndexManager secondaryIndexManager = null;
-
+        IndexRegistry indexRegistry = null;
         if (type.allowUseOfSecondaryIndices())
-        {
-            cfs = Keyspace.open(table.keyspace).getColumnFamilyStore(table.name);
-            secondaryIndexManager = cfs.indexManager;
-        }
+            indexRegistry = IndexRegistry.obtain(table);
 
         /*
          * WHERE clause. For a given entity, rules are:
@@ -165,7 +160,7 @@ public final class StatementRestrictions
             {
                 Restriction restriction = relation.toRestriction(table, boundNames);
 
-                if (!type.allowUseOfSecondaryIndices() || !restriction.hasSupportingIndex(secondaryIndexManager))
+                if (!type.allowUseOfSecondaryIndices() || !restriction.hasSupportingIndex(indexRegistry))
                     throw new InvalidRequestException(String.format("LIKE restriction is only supported on properly " +
                                                                     "indexed columns. %s is not valid.",
                                                                     relation.toString()));
@@ -186,13 +181,13 @@ public final class StatementRestrictions
         if (type.allowUseOfSecondaryIndices())
         {
             if (whereClause.containsCustomExpressions())
-                processCustomIndexExpressions(whereClause.expressions, boundNames, secondaryIndexManager);
+                processCustomIndexExpressions(whereClause.expressions, boundNames, indexRegistry);
 
-            hasQueriableClusteringColumnIndex = clusteringColumnsRestrictions.hasSupportingIndex(secondaryIndexManager);
+            hasQueriableClusteringColumnIndex = clusteringColumnsRestrictions.hasSupportingIndex(indexRegistry);
             hasQueriableIndex = !filterRestrictions.getCustomIndexExpressions().isEmpty()
                     || hasQueriableClusteringColumnIndex
-                    || partitionKeyRestrictions.hasSupportingIndex(secondaryIndexManager)
-                    || nonPrimaryKeyRestrictions.hasSupportingIndex(secondaryIndexManager);
+                    || partitionKeyRestrictions.hasSupportingIndex(indexRegistry)
+                    || nonPrimaryKeyRestrictions.hasSupportingIndex(indexRegistry);
         }
 
         // At this point, the select statement if fully constructed, but we still have a few things to validate
@@ -564,7 +559,7 @@ public final class StatementRestrictions
 
     private void processCustomIndexExpressions(List<CustomIndexExpression> expressions,
                                                VariableSpecifications boundNames,
-                                               SecondaryIndexManager indexManager)
+                                               IndexRegistry indexRegistry)
     {
         if (expressions.size() > 1)
             throw new InvalidRequestException(IndexRestrictions.MULTIPLE_EXPRESSIONS);
@@ -582,7 +577,7 @@ public final class StatementRestrictions
         if (!table.indexes.has(expression.targetIndex.getIdx()))
             throw IndexRestrictions.indexNotFound(expression.targetIndex, table);
 
-        Index index = indexManager.getIndex(table.indexes.get(expression.targetIndex.getIdx()).get());
+        Index index = indexRegistry.getIndex(table.indexes.get(expression.targetIndex.getIdx()).get());
 
         if (!index.getIndexMetadata().isCustom())
             throw IndexRestrictions.nonCustomIndexInExpression(expression.targetIndex);
@@ -596,14 +591,14 @@ public final class StatementRestrictions
         filterRestrictions.add(expression);
     }
 
-    public RowFilter getRowFilter(SecondaryIndexManager indexManager, QueryOptions options)
+    public RowFilter getRowFilter(IndexRegistry indexRegistry, QueryOptions options)
     {
         if (filterRestrictions.isEmpty())
             return RowFilter.NONE;
 
         RowFilter filter = RowFilter.create();
         for (Restrictions restrictions : filterRestrictions.getRestrictions())
-            restrictions.addRowFilterTo(filter, indexManager, options);
+            restrictions.addRowFilterTo(filter, indexRegistry, options);
 
         for (CustomIndexExpression expression : filterRestrictions.getCustomIndexExpressions())
             expression.addToRowFilter(filter, table, options);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
index a80b811..437b17c 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 
 import static org.apache.cassandra.cql3.statements.Bound.END;
 import static org.apache.cassandra.cql3.statements.Bound.START;
@@ -272,15 +272,15 @@ final class TokenFilter implements PartitionKeyRestrictions
     }
 
     @Override
-    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+    public boolean hasSupportingIndex(IndexRegistry indexRegistry)
     {
-        return restrictions.hasSupportingIndex(indexManager);
+        return restrictions.hasSupportingIndex(indexRegistry);
     }
 
     @Override
-    public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options)
+    public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options)
     {
-        restrictions.addRowFilterTo(filter, indexManager, options);
+        restrictions.addRowFilterTo(filter, indexRegistry, options);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
index 806974a..e71b177 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.statements.Bound;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 
 import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 
@@ -116,13 +116,13 @@ public abstract class TokenRestriction implements PartitionKeyRestrictions
     }
 
     @Override
-    public boolean hasSupportingIndex(SecondaryIndexManager secondaryIndexManager)
+    public boolean hasSupportingIndex(IndexRegistry indexRegistry)
     {
         return false;
     }
 
     @Override
-    public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options)
+    public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options)
     {
         throw new UnsupportedOperationException("Index expression cannot be created for token restriction");
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
index e26910d..00d2b94 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
@@ -66,6 +66,8 @@ public class AlterKeyspaceStatement extends SchemaAlteringStatement
             throw new InvalidRequestException("Unknown keyspace " + name);
         if (SchemaConstants.isLocalSystemKeyspace(ksm.name))
             throw new InvalidRequestException("Cannot alter system keyspace");
+        if (ksm.isVirtual())
+            throw new InvalidRequestException("Cannot alter virtual keyspaces");
 
         attrs.validate();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index b3aeb74..260c8fd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -88,6 +88,9 @@ public class AlterTableStatement extends SchemaAlteringStatement
         if (current.isView())
             throw new InvalidRequestException("Cannot use ALTER TABLE on Materialized View");
 
+        if (current.isVirtual())
+            throw new InvalidRequestException("Cannot alter virtual tables");
+
         TableMetadata.Builder builder = current.unbuild();
 
         ColumnIdentifier columnName = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 2fcd867..a71c799 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -70,6 +70,8 @@ public class BatchStatement implements CQLStatement
     private final boolean updatesStaticRow;
     private final Attributes attrs;
     private final boolean hasConditions;
+    private final boolean updatesVirtualTables;
+
     private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class);
 
     private static final String UNLOGGED_BATCH_WARNING = "Unlogged batch covering {} partitions detected " +
@@ -103,11 +105,13 @@ public class BatchStatement implements CQLStatement
         RegularAndStaticColumns.Builder conditionBuilder = RegularAndStaticColumns.builder();
         boolean updateRegular = false;
         boolean updateStatic = false;
+        boolean updatesVirtualTables = false;
 
         for (ModificationStatement stmt : statements)
         {
             regularBuilder.addAll(stmt.metadata(), stmt.updatedColumns());
             updateRegular |= stmt.updatesRegularRows();
+            updatesVirtualTables |= stmt.isVirtual();
             if (stmt.hasConditions())
             {
                 hasConditions = true;
@@ -121,6 +125,7 @@ public class BatchStatement implements CQLStatement
         this.updatesRegularRows = updateRegular;
         this.updatesStaticRow = updateStatic;
         this.hasConditions = hasConditions;
+        this.updatesVirtualTables = updatesVirtualTables;
     }
 
     public Iterable<org.apache.cassandra.cql3.functions.Function> getFunctions()
@@ -161,29 +166,46 @@ public class BatchStatement implements CQLStatement
         boolean hasCounters = false;
         boolean hasNonCounters = false;
 
+        boolean hasVirtualTables = false;
+        boolean hasRegularTables = false;
+
         for (ModificationStatement statement : statements)
         {
-            if (timestampSet && statement.isCounter())
-                throw new InvalidRequestException("Cannot provide custom timestamp for a BATCH containing counters");
-
             if (timestampSet && statement.isTimestampSet())
                 throw new InvalidRequestException("Timestamp must be set either on BATCH or individual statements");
 
-            if (isCounter() && !statement.isCounter())
-                throw new InvalidRequestException("Cannot include non-counter statement in a counter batch");
-
-            if (isLogged() && statement.isCounter())
-                throw new InvalidRequestException("Cannot include a counter statement in a logged batch");
-
             if (statement.isCounter())
                 hasCounters = true;
             else
                 hasNonCounters = true;
+
+            if (statement.isVirtual())
+                hasVirtualTables = true;
+            else
+                hasRegularTables = true;
         }
 
+        if (timestampSet && hasCounters)
+            throw new InvalidRequestException("Cannot provide custom timestamp for a BATCH containing counters");
+
+        if (isCounter() && hasNonCounters)
+            throw new InvalidRequestException("Cannot include non-counter statement in a counter batch");
+
         if (hasCounters && hasNonCounters)
             throw new InvalidRequestException("Counter and non-counter mutations cannot exist in the same batch");
 
+        if (isLogged() && hasCounters)
+            throw new InvalidRequestException("Cannot include a counter statement in a logged batch");
+
+        if (isLogged() && hasVirtualTables)
+            throw new InvalidRequestException("Cannot include a virtual table statement in a logged batch");
+
+        if (hasVirtualTables && hasRegularTables)
+            throw new InvalidRequestException("Mutations for virtual and regular tables cannot exist in the same batch");
+
+        if (hasConditions && hasVirtualTables)
+            throw new InvalidRequestException("Conditional BATCH statements cannot include mutations for virtual tables");
+
         if (hasConditions)
         {
             String ksName = null;
@@ -353,7 +375,11 @@ public class BatchStatement implements CQLStatement
         if (hasConditions)
             return executeWithConditions(options, queryState, queryStartNanoTime);
 
-        executeWithoutConditions(getMutations(options, local, now, queryStartNanoTime), options.getConsistency(), queryStartNanoTime);
+        if (updatesVirtualTables)
+            executeInternalWithoutCondition(queryState, options, queryStartNanoTime);
+        else    
+            executeWithoutConditions(getMutations(options, local, now, queryStartNanoTime), options.getConsistency(), queryStartNanoTime);
+
         return new ResultMessage.Void();
     }
 
@@ -482,16 +508,18 @@ public class BatchStatement implements CQLStatement
 
     public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
     {
+        BatchQueryOptions batchOptions = BatchQueryOptions.withoutPerStatementVariables(options);
+
         if (hasConditions)
-            return executeInternalWithConditions(BatchQueryOptions.withoutPerStatementVariables(options), queryState);
+            return executeInternalWithConditions(batchOptions, queryState);
 
-        executeInternalWithoutCondition(queryState, options, System.nanoTime());
+        executeInternalWithoutCondition(queryState, batchOptions, System.nanoTime());
         return new ResultMessage.Void();
     }
 
-    private ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
+    private ResultMessage executeInternalWithoutCondition(QueryState queryState, BatchQueryOptions batchOptions, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
     {
-        for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp(), queryStartNanoTime))
+        for (IMutation mutation : getMutations(batchOptions, true, queryState.getTimestamp(), queryStartNanoTime))
             mutation.apply();
         return null;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
index 9671b02..96d9f5a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
@@ -19,10 +19,10 @@ package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.cassandra.db.virtual.VirtualMutation;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
@@ -84,15 +84,20 @@ final class BatchUpdatesCollector implements UpdatesCollector
 
     private IMutationBuilder getMutationBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency)
     {
-        String ksName = metadata.keyspace;
-        IMutationBuilder mutationBuilder = keyspaceMap(ksName).get(dk.getKey());
-        if (mutationBuilder == null)
+        return keyspaceMap(metadata.keyspace).computeIfAbsent(dk.getKey(), k -> makeMutationBuilder(metadata, dk, consistency));
+    }
+
+    private IMutationBuilder makeMutationBuilder(TableMetadata metadata, DecoratedKey partitionKey, ConsistencyLevel cl)
+    {
+        if (metadata.isVirtual())
         {
-            MutationBuilder builder = new MutationBuilder(ksName, dk);
-            mutationBuilder = metadata.isCounter() ? new CounterMutationBuilder(builder, consistency) : builder;
-            keyspaceMap(ksName).put(dk.getKey(), mutationBuilder);
+            return new VirtualMutationBuilder(metadata.keyspace, partitionKey);
+        }
+        else
+        {
+            MutationBuilder builder = new MutationBuilder(metadata.keyspace, partitionKey);
+            return metadata.isCounter() ? new CounterMutationBuilder(builder, cl) : builder;
         }
-        return mutationBuilder;
     }
 
     /**
@@ -228,4 +233,41 @@ final class BatchUpdatesCollector implements UpdatesCollector
             return mutationBuilder.get(id);
         }
     }
+
+    private static class VirtualMutationBuilder implements IMutationBuilder
+    {
+        private final String keyspaceName;
+        private final DecoratedKey partitionKey;
+
+        private final HashMap<TableId, PartitionUpdate.Builder> modifications = new HashMap<>();
+
+        private VirtualMutationBuilder(String keyspaceName, DecoratedKey partitionKey)
+        {
+            this.keyspaceName = keyspaceName;
+            this.partitionKey = partitionKey;
+        }
+
+        @Override
+        public VirtualMutationBuilder add(PartitionUpdate.Builder builder)
+        {
+            PartitionUpdate.Builder prev = modifications.put(builder.metadata().id, builder);
+            if (null != prev)
+                throw new IllegalStateException();
+            return this;
+        }
+
+        @Override
+        public VirtualMutation build()
+        {
+            ImmutableMap.Builder<TableId, PartitionUpdate> updates = new ImmutableMap.Builder<>();
+            modifications.forEach((tableId, updateBuilder) -> updates.put(tableId, updateBuilder.build()));
+            return new VirtualMutation(keyspaceName, partitionKey, updates.build());
+        }
+
+        @Override
+        public PartitionUpdate.Builder get(TableId tableId)
+        {
+            return modifications.get(tableId);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index eb6aa70..7953c8b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -22,6 +22,7 @@ import java.util.*;
 
 import com.google.common.collect.*;
 
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.conditions.ColumnCondition;
@@ -183,7 +184,7 @@ public class CQL3CasRequest implements CASRequest
         return conditionColumns;
     }
 
-    public SinglePartitionReadCommand readCommand(int nowInSec)
+    public SinglePartitionReadQuery readCommand(int nowInSec)
     {
         assert staticConditions != null || !conditions.isEmpty();
 
@@ -193,16 +194,16 @@ public class CQL3CasRequest implements CASRequest
         // With only a static condition, we still want to make the distinction between a non-existing partition and one
         // that exists (has some live data) but has not static content. So we query the first live row of the partition.
         if (conditions.isEmpty())
-            return SinglePartitionReadCommand.create(metadata,
-                                                     nowInSec,
-                                                     columnFilter,
-                                                     RowFilter.NONE,
-                                                     DataLimits.cqlLimits(1),
-                                                     key,
-                                                     new ClusteringIndexSliceFilter(Slices.ALL, false));
+            return SinglePartitionReadQuery.create(metadata,
+                                                   nowInSec,
+                                                   columnFilter,
+                                                   RowFilter.NONE,
+                                                   DataLimits.cqlLimits(1),
+                                                   key,
+                                                   new ClusteringIndexSliceFilter(Slices.ALL, false));
 
         ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(conditions.navigableKeySet(), false);
-        return SinglePartitionReadCommand.create(metadata, nowInSec, key, columnFilter, filter);
+        return SinglePartitionReadQuery.create(metadata, nowInSec, key, columnFilter, filter);
     }
 
     /**
@@ -244,7 +245,7 @@ public class CQL3CasRequest implements CASRequest
             upd.applyUpdates(current, updateBuilder);
 
         PartitionUpdate partitionUpdate = updateBuilder.build();
-        Keyspace.openAndGetStore(metadata).indexManager.validate(partitionUpdate);
+        IndexRegistry.obtain(metadata).validate(partitionUpdate);
 
         return partitionUpdate;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
index d3ef599..e428087 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.*;
@@ -206,8 +207,12 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement
         if (ifNotExists && orReplace)
             throw new InvalidRequestException("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives");
 
-        if (Schema.instance.getKeyspaceMetadata(functionName.keyspace) == null)
+
+        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(functionName.keyspace);
+        if (null == ksm)
             throw new InvalidRequestException(String.format("Cannot add aggregate '%s' to non existing keyspace '%s'.", functionName.name, functionName.keyspace));
+        if (ksm.isVirtual())
+            throw new InvalidRequestException("Cannot create aggregates in virtual keyspaces");
     }
 
     public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws RequestValidationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
index 1f0e703..c380991 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.ColumnIdentifier;
@@ -136,8 +137,11 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
         if (ifNotExists && orReplace)
             throw new InvalidRequestException("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives");
 
-        if (Schema.instance.getKeyspaceMetadata(functionName.keyspace) == null)
+        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(functionName.keyspace);
+        if (null == ksm)
             throw new InvalidRequestException(String.format("Cannot add function '%s' to non existing keyspace '%s'.", functionName.name, functionName.keyspace));
+        if (ksm.isVirtual())
+            throw new InvalidRequestException("Cannot create functions in virtual keyspaces");
     }
 
     public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws RequestValidationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 9d0a714..778c4a3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -82,6 +82,9 @@ public class CreateIndexStatement extends SchemaAlteringStatement
     {
         TableMetadata table = Schema.instance.validateTable(keyspace(), columnFamily());
 
+        if (table.isVirtual())
+            throw new InvalidRequestException("Secondary indexes are not supported on virtual tables");
+
         if (table.isCounter())
             throw new InvalidRequestException("Secondary indexes are not supported on counter tables");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 55249c4..7c639e2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3.statements;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.regex.Pattern;
+
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
 import org.apache.commons.lang3.StringUtils;
@@ -134,7 +135,6 @@ public class CreateTableStatement extends SchemaAlteringStatement
                .isCompound(isCompound)
                .isCounter(hasCounters)
                .isSuper(false)
-               .isView(false)
                .params(params);
 
         for (int i = 0; i < keyAliases.size(); i++)
@@ -213,6 +213,9 @@ public class CreateTableStatement extends SchemaAlteringStatement
             KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspace());
             if (ksm == null)
                 throw new ConfigurationException(String.format("Keyspace %s doesn't exist", keyspace()));
+            if (ksm.isVirtual())
+                throw new InvalidRequestException("Cannot create tables in virtual keyspaces");
+
             return prepare(ksm.types);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
index d57bff7..f2cd217 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
@@ -62,6 +62,8 @@ public class CreateTriggerStatement extends SchemaAlteringStatement
     public void validate(ClientState state) throws RequestValidationException
     {
         TableMetadata metadata = Schema.instance.validateTable(keyspace(), columnFamily());
+        if (metadata.isVirtual())
+            throw new InvalidRequestException("Cannot CREATE TRIGGER against a virtual table");
         if (metadata.isView())
             throw new InvalidRequestException("Cannot CREATE TRIGGER against a materialized view");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
index 7dce478..1a0da4c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
@@ -73,6 +73,8 @@ public class CreateTypeStatement extends SchemaAlteringStatement
         KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(name.getKeyspace());
         if (ksm == null)
             throw new InvalidRequestException(String.format("Cannot add type in unknown keyspace %s", name.getKeyspace()));
+        if (ksm.isVirtual())
+            throw new InvalidRequestException("Cannot create types in virtual keyspaces");
 
         if (ksm.types.get(name.getUserTypeName()).isPresent() && !ifNotExists)
             throw new InvalidRequestException(String.format("A user type of name %s already exists", name));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
index 01ed6fe..b50a552 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
@@ -151,6 +151,8 @@ public class CreateViewStatement extends SchemaAlteringStatement
 
         TableMetadata metadata = Schema.instance.validateTable(baseName.getKeyspace(), baseName.getColumnFamily());
 
+        if (metadata.isVirtual())
+            throw new InvalidRequestException("Materialized views are not supported on virtual tables");
         if (metadata.isCounter())
             throw new InvalidRequestException("Materialized views are not supported on counter tables");
         if (metadata.isView())
@@ -317,7 +319,7 @@ public class CreateViewStatement extends SchemaAlteringStatement
 
         TableMetadata.Builder builder =
             TableMetadata.builder(keyspace(), columnFamily(), properties.properties.getId())
-                         .isView(true)
+                         .kind(TableMetadata.Kind.VIEW)
                          .params(params);
 
         add(metadata, targetPartitionKeys, builder::addPartitionKeyColumn);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index af04572..639286c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -144,6 +144,8 @@ public class DeleteStatement extends ModificationStatement
                                                         Conditions conditions,
                                                         Attributes attrs)
         {
+            checkFalse(metadata.isVirtual(), "Virtual tables don't support DELETE statements");
+
             Operations operations = new Operations(type);
 
             for (Operation.RawDeletion deletion : deletions)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
index cfc6564..2f302c0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
@@ -61,6 +62,9 @@ public class DropKeyspaceStatement extends SchemaAlteringStatement
 
     public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws ConfigurationException
     {
+        if (null != VirtualKeyspaceRegistry.instance.getKeyspaceNullable(keyspace))
+            throw new InvalidRequestException("Cannot drop virtual keyspaces");
+
         try
         {
             MigrationManager.announceKeyspaceDrop(keyspace, isLocalOnly);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
index d7801e5..beb1002 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
@@ -75,6 +75,9 @@ public class DropTableStatement extends SchemaAlteringStatement
                 if (metadata.isView())
                     throw new InvalidRequestException("Cannot use DROP TABLE on Materialized View");
 
+                if (metadata.isVirtual())
+                    throw new InvalidRequestException("Cannot drop virtual tables");
+
                 boolean rejectDrop = false;
                 StringBuilder messageBuilder = new StringBuilder();
                 for (ViewMetadata def : ksm.views)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 31aa80c..e02fd41 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -203,6 +203,11 @@ public abstract class ModificationStatement implements CQLStatement
         return metadata().isView();
     }
 
+    public boolean isVirtual()
+    {
+        return metadata().isVirtual();
+    }
+
     public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException
     {
         return attrs.getTimestamp(now, options);
@@ -248,6 +253,8 @@ public abstract class ModificationStatement implements CQLStatement
         checkFalse(isCounter() && attrs.isTimestampSet(), "Cannot provide custom timestamp for counter updates");
         checkFalse(isCounter() && attrs.isTimeToLiveSet(), "Cannot provide custom TTL for counter updates");
         checkFalse(isView(), "Cannot directly modify a materialized view");
+        checkFalse(isVirtual() && attrs.isTimeToLiveSet(), "Expiring columns are not supported by virtual tables");
+        checkFalse(isVirtual() && hasConditions(), "Conditional updates are not supported by virtual tables");
     }
 
     public RegularAndStaticColumns updatedColumns()
@@ -440,6 +447,9 @@ public abstract class ModificationStatement implements CQLStatement
     private ResultMessage executeWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
+        if (isVirtual())
+            return executeInternalWithoutCondition(queryState, options, queryStartNanoTime);
+
         ConsistencyLevel cl = options.getConsistency();
         if (isCounter())
             cl.validateCounterForWrite(metadata());
@@ -613,7 +623,7 @@ public abstract class ModificationStatement implements CQLStatement
     {
         UUID ballot = UUIDGen.getTimeUUIDFromMicros(state.getTimestamp());
 
-        SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
+        SinglePartitionReadQuery readCommand = request.readCommand(FBUtilities.nowInSeconds());
         FilteredPartition current;
         try (ReadExecutionController executionController = readCommand.executionController();
              PartitionIterator iter = readCommand.executeInternal(executionController))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 7fa9964..ef3db51 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -55,7 +55,7 @@ import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
@@ -458,7 +458,7 @@ public class SelectStatement implements CQLStatement
     {
         QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
 
-        if (aggregationSpec == null || query == ReadQuery.EMPTY)
+        if (aggregationSpec == null || query.isEmpty())
             return pager;
 
         return new AggregationQueryPager(pager, query.limits());
@@ -501,25 +501,22 @@ public class SelectStatement implements CQLStatement
     {
         Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options);
         if (keys.isEmpty())
-            return ReadQuery.EMPTY;
+            return ReadQuery.empty(table);
 
         ClusteringIndexFilter filter = makeClusteringIndexFilter(options, columnFilter);
         if (filter == null)
-            return ReadQuery.EMPTY;
+            return ReadQuery.empty(table);
 
         RowFilter rowFilter = getRowFilter(options);
 
-        // Note that we use the total limit for every key, which is potentially inefficient.
-        // However, IN + LIMIT is not a very sensible choice.
-        List<SinglePartitionReadCommand> commands = new ArrayList<>(keys.size());
+        List<DecoratedKey> decoratedKeys = new ArrayList<>(keys.size());
         for (ByteBuffer key : keys)
         {
             QueryProcessor.validateKey(key);
-            DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.clone(key));
-            commands.add(SinglePartitionReadCommand.create(table, nowInSec, columnFilter, rowFilter, limit, dk, filter));
+            decoratedKeys.add(table.partitioner.decorateKey(ByteBufferUtil.clone(key)));
         }
 
-        return new SinglePartitionReadCommand.Group(commands, limit);
+        return SinglePartitionReadQuery.createGroup(table, nowInSec, columnFilter, rowFilter, limit, decoratedKeys, filter);
     }
 
     /**
@@ -569,7 +566,7 @@ public class SelectStatement implements CQLStatement
     {
         ClusteringIndexFilter clusteringIndexFilter = makeClusteringIndexFilter(options, columnFilter);
         if (clusteringIndexFilter == null)
-            return ReadQuery.EMPTY;
+            return ReadQuery.empty(table);
 
         RowFilter rowFilter = getRowFilter(options);
 
@@ -577,10 +574,10 @@ public class SelectStatement implements CQLStatement
         // We want to have getRangeSlice to count the number of columns, not the number of keys.
         AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options);
         if (keyBounds == null)
-            return ReadQuery.EMPTY;
+            return ReadQuery.empty(table);
 
-        PartitionRangeReadCommand command =
-            PartitionRangeReadCommand.create(table, nowInSec, columnFilter, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
+        ReadQuery command =
+            PartitionRangeReadQuery.create(table, nowInSec, columnFilter, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
 
         // If there's a secondary index that the command can use, have it validate the request parameters.
         command.maybeValidateIndex();
@@ -755,10 +752,8 @@ public class SelectStatement implements CQLStatement
      */
     public RowFilter getRowFilter(QueryOptions options) throws InvalidRequestException
     {
-        ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
-        SecondaryIndexManager secondaryIndexManager = cfs.indexManager;
-        RowFilter filter = restrictions.getRowFilter(secondaryIndexManager, options);
-        return filter;
+        IndexRegistry indexRegistry = IndexRegistry.obtain(table);
+        return restrictions.getRowFilter(indexRegistry, options);
     }
 
     private ResultSet process(PartitionIterator partitions,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
index 9eaf897..1def3fd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.virtual.VirtualMutation;
 import org.apache.cassandra.schema.TableMetadata;
 
 /**
@@ -86,12 +87,15 @@ final class SingleTableUpdatesCollector implements UpdatesCollector
         List<IMutation> ms = new ArrayList<>();
         for (PartitionUpdate.Builder builder : puBuilders.values())
         {
-            IMutation mutation = null;
+            IMutation mutation;
 
-            if (metadata.isCounter())
+            if (metadata.isVirtual())
+                mutation = new VirtualMutation(builder.build());
+            else if (metadata.isCounter())
                 mutation = new CounterMutation(new Mutation(builder.build()), counterConsistencyLevel);
             else
                 mutation = new Mutation(builder.build());
+
             mutation.validateIndexedColumns();
             ms.add(mutation);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 5d09cfa..d41a814 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -69,6 +69,9 @@ public class TruncateStatement extends CFStatement implements CQLStatement
             if (metaData.isView())
                 throw new InvalidRequestException("Cannot TRUNCATE materialized view directly; must truncate base table instead");
 
+            if (metaData.isVirtual())
+                throw new InvalidRequestException("Cannot truncate virtual tables");
+
             StorageProxy.truncateBlocking(keyspace(), columnFamily());
         }
         catch (UnavailableException | TimeoutException e)
@@ -82,6 +85,13 @@ public class TruncateStatement extends CFStatement implements CQLStatement
     {
         try
         {
+            TableMetadata metaData = Schema.instance.getTableMetadata(keyspace(), columnFamily());
+            if (metaData.isView())
+                throw new InvalidRequestException("Cannot TRUNCATE materialized view directly; must truncate base table instead");
+
+            if (metaData.isVirtual())
+                throw new InvalidRequestException("Cannot truncate virtual tables");
+
             ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
             cfs.truncateBlocking();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/AbstractReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadQuery.java b/src/java/org/apache/cassandra/db/AbstractReadQuery.java
new file mode 100644
index 0000000..c6ec329
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractReadQuery.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.monitoring.MonitorableImpl;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * Base class for {@code ReadQuery} implementations.
+ */
+abstract class AbstractReadQuery extends MonitorableImpl implements ReadQuery
+{
+    private final TableMetadata metadata;
+    private final int nowInSec;
+
+    private final ColumnFilter columnFilter;
+    private final RowFilter rowFilter;
+    private final DataLimits limits;
+
+    protected AbstractReadQuery(TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+    {
+        this.metadata = metadata;
+        this.nowInSec = nowInSec;
+        this.columnFilter = columnFilter;
+        this.rowFilter = rowFilter;
+        this.limits = limits;
+    }
+
+    @Override
+    public TableMetadata metadata()
+    {
+        return metadata;
+    }
+
+    // Monitorable interface
+    public String name()
+    {
+        return toCQLString();
+    }
+
+    @Override
+    public PartitionIterator executeInternal(ReadExecutionController controller)
+    {
+        return UnfilteredPartitionIterators.filter(executeLocally(controller), nowInSec());
+    }
+
+    @Override
+    public DataLimits limits()
+    {
+        return limits;
+    }
+
+    @Override
+    public int nowInSec()
+    {
+        return nowInSec;
+    }
+
+    @Override
+    public RowFilter rowFilter()
+    {
+        return rowFilter;
+    }
+
+    @Override
+    public ColumnFilter columnFilter()
+    {
+        return columnFilter;
+    }
+
+    /**
+     * Recreate the CQL string corresponding to this query.
+     * <p>
+     * Note that in general the returned string will not be exactly the original user string, first
+     * because there isn't always a single syntax for a given query, but also because we don't have
+     * all the information needed (we know the non-PK columns queried but not the PK ones, as internally
+     * we query them all). So this shouldn't be relied upon too strongly, but it should be good enough
+     * for debugging purposes, which is what this is for.
+     */
+    public String toCQLString()
+    {
+        StringBuilder sb = new StringBuilder().append("SELECT ")
+                                              .append(columnFilter())
+                                              .append(" FROM ")
+                                              .append(metadata().keyspace)
+                                              .append('.')
+                                              .append(metadata().name);
+        appendCQLWhereClause(sb);
+
+        if (limits() != DataLimits.NONE)
+            sb.append(' ').append(limits());
+        return sb.toString();
+    }
+
+    protected abstract void appendCQLWhereClause(StringBuilder sb);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 651d156..50720f4 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -327,6 +327,8 @@ public class Keyspace
     {
         metadata = Schema.instance.getKeyspaceMetadata(keyspaceName);
         assert metadata != null : "Unknown keyspace " + keyspaceName;
+        if (metadata.isVirtual())
+            throw new IllegalStateException("Cannot initialize Keyspace with virtual metadata " + keyspaceName);
         createReplicationStrategy(metadata);
 
         this.metric = new KeyspaceMetrics(this);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 27d7d4e..a6641d4 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -45,15 +45,13 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.pager.*;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * A read command that selects a (part of a) range of partitions.
  */
-public class PartitionRangeReadCommand extends ReadCommand
+public class PartitionRangeReadCommand extends ReadCommand implements PartitionRangeReadQuery
 {
     protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
 
@@ -187,7 +185,8 @@ public class PartitionRangeReadCommand extends ReadCommand
                                              indexMetadata());
     }
 
-    public ReadCommand withUpdatedLimit(DataLimits newLimits)
+    @Override
+    public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
     {
         return new PartitionRangeReadCommand(isDigestQuery(),
                                              digestVersion(),
@@ -200,6 +199,7 @@ public class PartitionRangeReadCommand extends ReadCommand
                                              indexMetadata());
     }
 
+    @Override
     public PartitionRangeReadCommand withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange)
     {
         return new PartitionRangeReadCommand(isDigestQuery(),
@@ -218,34 +218,11 @@ public class PartitionRangeReadCommand extends ReadCommand
         return DatabaseDescriptor.getRangeRpcTimeout();
     }
 
-    public boolean selectsKey(DecoratedKey key)
-    {
-        if (!dataRange().contains(key))
-            return false;
-
-        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType);
-    }
-
-    public boolean selectsClustering(DecoratedKey key, Clustering clustering)
-    {
-        if (clustering == Clustering.STATIC_CLUSTERING)
-            return !columnFilter().fetchedColumns().statics.isEmpty();
-
-        if (!dataRange().clusteringIndexFilter(key).selects(clustering))
-            return false;
-        return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
-    }
-
     public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
     {
         return StorageProxy.getRangeSlice(this, consistency, queryStartNanoTime);
     }
 
-    public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
-    {
-            return new PartitionRangeQueryPager(this, pagingState, protocolVersion);
-    }
-
     protected void recordLatency(TableMetrics metric, long latencyNanos)
     {
         metric.rangeLatency.addNano(latencyNanos);
@@ -389,13 +366,6 @@ public class PartitionRangeReadCommand extends ReadCommand
     }
 
     @Override
-    public boolean selectsFullPartition()
-    {
-        return metadata().isStaticCompactTable() ||
-               (dataRange.selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns());
-    }
-
-    @Override
     public String toString()
     {
         return String.format("Read(%s columns=%s rowfilter=%s limits=%s %s)",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org