You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2022/02/09 01:09:15 UTC

[cassandra] branch trunk updated: Make vtables accessible via internode messaging

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f0c9713  Make vtables accessible via internode messaging
f0c9713 is described below

commit f0c97132c969cc8dd028d00c5e68ada8b0b9b9c6
Author: David Capwell <dc...@apache.org>
AuthorDate: Tue Feb 8 13:37:12 2022 -0800

    Make vtables accessible via internode messaging
    
    patch by David Capwell; reviewed by Aleksey Yeschenko for CASSANDRA-17295
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/cql3/QueryProcessor.java  |  63 +++++
 .../cql3/statements/ModificationStatement.java     |   2 +-
 .../cassandra/cql3/statements/SelectStatement.java |   2 +-
 .../cassandra/db/PartitionRangeReadCommand.java    | 266 ++++++++++++++-------
 .../cassandra/db/PartitionRangeReadQuery.java      |   3 -
 .../cassandra/db/ReadCommandVerbHandler.java       |   2 +
 .../cassandra/db/SinglePartitionReadCommand.java   | 249 ++++++++++++++-----
 .../cassandra/db/SinglePartitionReadQuery.java     |   8 +-
 .../db/VirtualTablePartitionRangeReadQuery.java    | 103 --------
 .../apache/cassandra/db/VirtualTableReadQuery.java |  66 -----
 .../db/VirtualTableSinglePartitionReadQuery.java   | 190 ---------------
 .../db/virtual/VirtualKeyspaceRegistry.java        |   5 +-
 .../org/apache/cassandra/dht/LocalPartitioner.java |   2 +-
 .../org/apache/cassandra/net/MessagingService.java |  51 ++++
 .../apache/cassandra/schema/SchemaConstants.java   |  16 +-
 src/java/org/apache/cassandra/schema/TableId.java  |   2 +-
 .../org/apache/cassandra/schema/TableMetadata.java |   4 +-
 .../test/VirtualTableFromInternode.java            | 142 +++++++++++
 .../distributed/util/QueryResultUtil.java          | 130 ++++++++++
 .../db/AbstractReadQueryToCQLStringTest.java       |   5 -
 .../org/apache/cassandra/db/ReadCommandTest.java   |   6 +-
 .../db/virtual/CredentialsCacheKeysTableTest.java  |   1 +
 .../virtual/JmxPermissionsCacheKeysTableTest.java  |   1 +
 .../NetworkPermissionsCacheKeysTableTest.java      |   1 +
 .../db/virtual/PermissionsCacheKeysTableTest.java  |   1 +
 .../db/virtual/RolesCacheKeysTableTest.java        |   1 +
 .../db/virtual/SSTableTasksTableTest.java          |   1 +
 .../cassandra/db/virtual/SettingsTableTest.java    |   1 +
 .../db/virtual/SystemPropertiesTableTest.java      |   1 +
 .../apache/cassandra/service/QueryPagerTest.java   |   2 +-
 31 files changed, 788 insertions(+), 540 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index bf0de22..f8653d7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Make vtables accessible via internode messaging (CASSANDRA-17295)
  * Add support for PEM based key material for SSL (CASSANDRA-17031)
  * Standardize storage configuration parameters' names. Support unit suffixes. (CASSANDRA-15234)
  * Remove support for Windows (CASSANDRA-16956)
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index c996d0d..1b8e94d 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
@@ -37,8 +38,13 @@ import org.antlr.runtime.*;
 import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.selection.ResultSetBuilder;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.ClientRequestMetrics;
 import org.apache.cassandra.metrics.ClientRequestsMetricsHolder;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaChangeListener;
 import org.apache.cassandra.schema.SchemaConstants;
@@ -59,6 +65,8 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
 
 import static org.apache.cassandra.config.CassandraRelevantProperties.ENABLE_NODELOCAL_QUERIES;
 import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
@@ -428,6 +436,61 @@ public class QueryProcessor implements QueryHandler
             return null;
     }
 
+    public static Future<UntypedResultSet> executeAsync(InetAddressAndPort address, String query, Object... values)
+    {
+        Prepared prepared = prepareInternal(query);
+        QueryOptions options = makeInternalOptions(prepared.statement, values);
+        if (prepared.statement instanceof SelectStatement)
+        {
+            SelectStatement select = (SelectStatement) prepared.statement;
+            int nowInSec = FBUtilities.nowInSeconds();
+            ReadQuery readQuery = select.getQuery(options, nowInSec);
+            List<ReadCommand> commands;
+            if (readQuery instanceof ReadCommand)
+            {
+                commands = Collections.singletonList((ReadCommand) readQuery);
+            }
+            else if (readQuery instanceof SinglePartitionReadQuery.Group)
+            {
+                List<? extends SinglePartitionReadQuery> queries = ((SinglePartitionReadQuery.Group<? extends SinglePartitionReadQuery>) readQuery).queries;
+                queries.forEach(a -> {
+                    if (!(a instanceof ReadCommand))
+                        throw new IllegalArgumentException("Queries found which are not ReadCommand: " + a.getClass());
+                });
+                commands = (List<ReadCommand>) (List<?>) queries;
+            }
+            else
+            {
+                throw new IllegalArgumentException("Unable to handle; only expected ReadCommands but given " + readQuery.getClass());
+            }
+            Future<List<Message<ReadResponse>>> future = FutureCombiner.allOf(commands.stream()
+                                                                                      .map(rc -> Message.out(rc.verb(), rc))
+                                                                                      .map(m -> MessagingService.instance().<ReadResponse>sendWithResult(m, address))
+                                                                                      .collect(Collectors.toList()));
+
+            ResultSetBuilder result = new ResultSetBuilder(select.getResultMetadata(), select.getSelection().newSelectors(options), null);
+            return future.map(list -> {
+                int i = 0;
+                for (Message<ReadResponse> m : list)
+                {
+                    ReadResponse rsp = m.payload;
+                    try (PartitionIterator it = UnfilteredPartitionIterators.filter(rsp.makeIterator(commands.get(i++)), nowInSec))
+                    {
+                        while (it.hasNext())
+                        {
+                            try (RowIterator partition = it.next())
+                            {
+                                select.processPartition(partition, options, result, nowInSec);
+                            }
+                        }
+                    }
+                }
+                return result.build();
+            }).map(UntypedResultSet::create);
+        }
+        throw new IllegalArgumentException("Unable to execute query; only SELECT supported but given: " + query);
+    }
+
     public static UntypedResultSet execute(String query, ConsistencyLevel cl, Object... values)
     throws RequestExecutionException
     {
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index eed528f..4233d23 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -410,7 +410,7 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa
                                                            metadata().partitioner.decorateKey(key),
                                                            filter));
 
-        SinglePartitionReadCommand.Group group = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
+        SinglePartitionReadCommand.Group group = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE);
 
         if (local)
         {
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 7a710cc..cd0a76b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -869,7 +869,7 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement
     }
 
     // Used by ModificationStatement for CAS operations
-    void processPartition(RowIterator partition, QueryOptions options, ResultSetBuilder result, int nowInSec)
+    public void processPartition(RowIterator partition, QueryOptions options, ResultSetBuilder result, int nowInSec)
     throws InvalidRequestException
     {
         maybeFail(result, options);
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index e69c288..ef12853 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -52,7 +54,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
 {
     protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
 
-    private final DataRange dataRange;
+    protected final DataRange dataRange;
 
     private PartitionRangeReadCommand(boolean isDigest,
                                       int digestVersion,
@@ -70,24 +72,63 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
         this.dataRange = dataRange;
     }
 
-    public static PartitionRangeReadCommand create(TableMetadata metadata,
-                                                   int nowInSec,
-                                                   ColumnFilter columnFilter,
-                                                   RowFilter rowFilter,
-                                                   DataLimits limits,
-                                                   DataRange dataRange)
+    private static PartitionRangeReadCommand create(boolean isDigest,
+                                                    int digestVersion,
+                                                    boolean acceptsTransient,
+                                                    TableMetadata metadata,
+                                                    int nowInSec,
+                                                    ColumnFilter columnFilter,
+                                                    RowFilter rowFilter,
+                                                    DataLimits limits,
+                                                    DataRange dataRange,
+                                                    IndexMetadata index,
+                                                    boolean trackWarnings)
     {
-        return new PartitionRangeReadCommand(false,
-                                             0,
-                                             false,
+        if (metadata.isVirtual())
+        {
+            return new VirtualTablePartitionRangeReadCommand(isDigest,
+                                                             digestVersion,
+                                                             acceptsTransient,
+                                                             metadata,
+                                                             nowInSec,
+                                                             columnFilter,
+                                                             rowFilter,
+                                                             limits,
+                                                             dataRange,
+                                                             index,
+                                                             trackWarnings);
+        }
+        return new PartitionRangeReadCommand(isDigest,
+                                             digestVersion,
+                                             acceptsTransient,
                                              metadata,
                                              nowInSec,
                                              columnFilter,
                                              rowFilter,
                                              limits,
                                              dataRange,
-                                             findIndex(metadata, rowFilter),
-                                             false);
+                                             index,
+                                             trackWarnings);
+    }
+
+    public static PartitionRangeReadCommand create(TableMetadata metadata,
+                                                   int nowInSec,
+                                                   ColumnFilter columnFilter,
+                                                   RowFilter rowFilter,
+                                                   DataLimits limits,
+                                                   DataRange dataRange)
+    {
+        return create(false,
+                      0,
+                      false,
+                      metadata,
+                      nowInSec,
+                      columnFilter,
+                      rowFilter,
+                      limits,
+                      dataRange,
+                      findIndex(metadata, rowFilter),
+                      false);
     }
 
     /**
@@ -100,17 +141,17 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
      */
     public static PartitionRangeReadCommand allDataRead(TableMetadata metadata, int nowInSec)
     {
-        return new PartitionRangeReadCommand(false,
-                                             0,
-                                             false,
-                                             metadata,
-                                             nowInSec,
-                                             ColumnFilter.all(metadata),
-                                             RowFilter.NONE,
-                                             DataLimits.NONE,
-                                             DataRange.allData(metadata.partitioner),
-                                             null,
-                                             false);
+        return create(false,
+                      0,
+                      false,
+                      metadata,
+                      nowInSec,
+                      ColumnFilter.all(metadata),
+                      RowFilter.NONE,
+                      DataLimits.NONE,
+                      DataRange.allData(metadata.partitioner),
+                      null,
+                      false);
     }
 
     public DataRange dataRange()
@@ -150,96 +191,96 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
         // DataLimits.CQLGroupByLimits.GroupByAwareCounter assumes that if GroupingState.hasClustering(), then we're in
         // the middle of a group, but we can't make that assumption if we query and range "in advance" of where we are
         // on the ring.
-        return new PartitionRangeReadCommand(isDigestQuery(),
-                                             digestVersion(),
-                                             acceptsTransient(),
-                                             metadata(),
-                                             nowInSec(),
-                                             columnFilter(),
-                                             rowFilter(),
-                                             isRangeContinuation ? limits() : limits().withoutState(),
-                                             dataRange().forSubRange(range),
-                                             indexMetadata(),
-                                             isTrackingWarnings());
+        return create(isDigestQuery(),
+                      digestVersion(),
+                      acceptsTransient(),
+                      metadata(),
+                      nowInSec(),
+                      columnFilter(),
+                      rowFilter(),
+                      isRangeContinuation ? limits() : limits().withoutState(),
+                      dataRange().forSubRange(range),
+                      indexMetadata(),
+                      isTrackingWarnings());
     }
 
     public PartitionRangeReadCommand copy()
     {
-        return new PartitionRangeReadCommand(isDigestQuery(),
-                                             digestVersion(),
-                                             acceptsTransient(),
-                                             metadata(),
-                                             nowInSec(),
-                                             columnFilter(),
-                                             rowFilter(),
-                                             limits(),
-                                             dataRange(),
-                                             indexMetadata(),
-                                             isTrackingWarnings());
+        return create(isDigestQuery(),
+                      digestVersion(),
+                      acceptsTransient(),
+                      metadata(),
+                      nowInSec(),
+                      columnFilter(),
+                      rowFilter(),
+                      limits(),
+                      dataRange(),
+                      indexMetadata(),
+                      isTrackingWarnings());
     }
 
     @Override
     protected PartitionRangeReadCommand copyAsDigestQuery()
     {
-        return new PartitionRangeReadCommand(true,
-                                             digestVersion(),
-                                             false,
-                                             metadata(),
-                                             nowInSec(),
-                                             columnFilter(),
-                                             rowFilter(),
-                                             limits(),
-                                             dataRange(),
-                                             indexMetadata(),
-                                             isTrackingWarnings());
+        return create(true,
+                      digestVersion(),
+                      false,
+                      metadata(),
+                      nowInSec(),
+                      columnFilter(),
+                      rowFilter(),
+                      limits(),
+                      dataRange(),
+                      indexMetadata(),
+                      isTrackingWarnings());
     }
 
     @Override
     protected PartitionRangeReadCommand copyAsTransientQuery()
     {
-        return new PartitionRangeReadCommand(false,
-                                             0,
-                                             true,
-                                             metadata(),
-                                             nowInSec(),
-                                             columnFilter(),
-                                             rowFilter(),
-                                             limits(),
-                                             dataRange(),
-                                             indexMetadata(),
-                                             isTrackingWarnings());
+        return create(false,
+                      0,
+                      true,
+                      metadata(),
+                      nowInSec(),
+                      columnFilter(),
+                      rowFilter(),
+                      limits(),
+                      dataRange(),
+                      indexMetadata(),
+                      isTrackingWarnings());
     }
 
     @Override
     public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
     {
-        return new PartitionRangeReadCommand(isDigestQuery(),
-                                             digestVersion(),
-                                             acceptsTransient(),
-                                             metadata(),
-                                             nowInSec(),
-                                             columnFilter(),
-                                             rowFilter(),
-                                             newLimits,
-                                             dataRange(),
-                                             indexMetadata(),
-                                             isTrackingWarnings());
+        return create(isDigestQuery(),
+                      digestVersion(),
+                      acceptsTransient(),
+                      metadata(),
+                      nowInSec(),
+                      columnFilter(),
+                      rowFilter(),
+                      newLimits,
+                      dataRange(),
+                      indexMetadata(),
+                      isTrackingWarnings());
     }
 
     @Override
     public PartitionRangeReadCommand withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange)
     {
-        return new PartitionRangeReadCommand(isDigestQuery(),
-                                             digestVersion(),
-                                             acceptsTransient(),
-                                             metadata(),
-                                             nowInSec(),
-                                             columnFilter(),
-                                             rowFilter(),
-                                             newLimits,
-                                             newDataRange,
-                                             indexMetadata(),
-                                             isTrackingWarnings());
+        return create(isDigestQuery(),
+                      digestVersion(),
+                      acceptsTransient(),
+                      metadata(),
+                      nowInSec(),
+                      columnFilter(),
+                      rowFilter(),
+                      newLimits,
+                      newDataRange,
+                      indexMetadata(),
+                      isTrackingWarnings());
     }
 
     public long getTimeout(TimeUnit unit)
@@ -449,7 +490,52 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
         throws IOException
         {
             DataRange range = DataRange.serializer.deserialize(in, version, metadata);
-            return new PartitionRangeReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, range, index, false);
+            return PartitionRangeReadCommand.create(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, range, index, false);
+        }
+    }
+
+    public static class VirtualTablePartitionRangeReadCommand extends PartitionRangeReadCommand
+    {
+        private VirtualTablePartitionRangeReadCommand(boolean isDigest,
+                                                      int digestVersion,
+                                                      boolean acceptsTransient,
+                                                      TableMetadata metadata,
+                                                      int nowInSec,
+                                                      ColumnFilter columnFilter,
+                                                      RowFilter rowFilter,
+                                                      DataLimits limits,
+                                                      DataRange dataRange,
+                                                      IndexMetadata index,
+                                                      boolean trackWarnings)
+        {
+            super(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index, trackWarnings);
+        }
+
+        @Override
+        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
+        {
+            return executeInternal(executionController());
+        }
+
+        @Override
+        @SuppressWarnings("resource")
+        public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
+        {
+            VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
+            UnfilteredPartitionIterator resultIterator = view.select(dataRange, columnFilter());
+            return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
+        }
+
+        @Override
+        public ReadExecutionController executionController()
+        {
+            return ReadExecutionController.empty();
+        }
+
+        @Override
+        public ReadExecutionController executionController(boolean trackRepairedStatus)
+        {
+            return executionController();
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java b/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
index 12624e7..5054f32 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
@@ -38,9 +38,6 @@ public interface PartitionRangeReadQuery extends ReadQuery
                             DataLimits limits,
                             DataRange dataRange)
     {
-        if (table.isVirtual())
-            return VirtualTablePartitionRangeReadQuery.create(table, nowInSec, columnFilter, rowFilter, limits, dataRange);
-
         return PartitionRangeReadCommand.create(table, nowInSec, columnFilter, rowFilter, limits, dataRange);
     }
 
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 7ed092a..9226568 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -94,6 +94,8 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
     private void validateTransientStatus(Message<ReadCommand> message)
     {
         ReadCommand command = message.payload;
+        if (command.metadata().isVirtual())
+            return;
         Token token;
 
         if (command instanceof SinglePartitionReadCommand)
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 475b287..54a2524 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
@@ -35,6 +36,8 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.RTBoundValidator;
 import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
@@ -57,8 +60,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
 {
     protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
 
-    private final DecoratedKey partitionKey;
-    private final ClusteringIndexFilter clusteringIndexFilter;
+    protected final DecoratedKey partitionKey;
+    protected final ClusteringIndexFilter clusteringIndexFilter;
 
     @VisibleForTesting
     protected SinglePartitionReadCommand(boolean isDigest,
@@ -80,6 +83,48 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
         this.clusteringIndexFilter = clusteringIndexFilter;
     }
 
+    private static SinglePartitionReadCommand create(boolean isDigest,
+                                                    int digestVersion,
+                                                    boolean acceptsTransient,
+                                                    TableMetadata metadata,
+                                                    int nowInSec,
+                                                    ColumnFilter columnFilter,
+                                                    RowFilter rowFilter,
+                                                    DataLimits limits,
+                                                    DecoratedKey partitionKey,
+                                                    ClusteringIndexFilter clusteringIndexFilter,
+                                                    IndexMetadata index,
+                                                    boolean trackWarnings)
+    {
+        if (metadata.isVirtual())
+        {
+            return new VirtualTableSinglePartitionReadCommand(isDigest,
+                                                              digestVersion,
+                                                              acceptsTransient,
+                                                              metadata,
+                                                              nowInSec,
+                                                              columnFilter,
+                                                              rowFilter,
+                                                              limits,
+                                                              partitionKey,
+                                                              clusteringIndexFilter,
+                                                              index,
+                                                              trackWarnings);
+        }
+        return new SinglePartitionReadCommand(isDigest,
+                                              digestVersion,
+                                              acceptsTransient,
+                                              metadata,
+                                              nowInSec,
+                                              columnFilter,
+                                              rowFilter,
+                                              limits,
+                                              partitionKey,
+                                              clusteringIndexFilter,
+                                              index,
+                                              trackWarnings);
+    }
+
     /**
      * Creates a new read command on a single partition.
      *
@@ -103,18 +148,18 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                                                     ClusteringIndexFilter clusteringIndexFilter,
                                                     IndexMetadata indexMetadata)
     {
-        return new SinglePartitionReadCommand(false,
-                                              0,
-                                              false,
-                                              metadata,
-                                              nowInSec,
-                                              columnFilter,
-                                              rowFilter,
-                                              limits,
-                                              partitionKey,
-                                              clusteringIndexFilter,
-                                              indexMetadata,
-                                              false);
+        return create(false,
+                      0,
+                      false,
+                      metadata,
+                      nowInSec,
+                      columnFilter,
+                      rowFilter,
+                      limits,
+                      partitionKey,
+                      clusteringIndexFilter,
+                      indexMetadata,
+                      false);
     }
 
     /**
@@ -280,69 +325,69 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
 
     public SinglePartitionReadCommand copy()
     {
-        return new SinglePartitionReadCommand(isDigestQuery(),
-                                              digestVersion(),
-                                              acceptsTransient(),
-                                              metadata(),
-                                              nowInSec(),
-                                              columnFilter(),
-                                              rowFilter(),
-                                              limits(),
-                                              partitionKey(),
-                                              clusteringIndexFilter(),
-                                              indexMetadata(),
-                                              isTrackingWarnings());
+        return create(isDigestQuery(),
+                      digestVersion(),
+                      acceptsTransient(),
+                      metadata(),
+                      nowInSec(),
+                      columnFilter(),
+                      rowFilter(),
+                      limits(),
+                      partitionKey(),
+                      clusteringIndexFilter(),
+                      indexMetadata(),
+                      isTrackingWarnings());
     }
 
     @Override
     protected SinglePartitionReadCommand copyAsDigestQuery()
     {
-        return new SinglePartitionReadCommand(true,
-                                              digestVersion(),
-                                              acceptsTransient(),
-                                              metadata(),
-                                              nowInSec(),
-                                              columnFilter(),
-                                              rowFilter(),
-                                              limits(),
-                                              partitionKey(),
-                                              clusteringIndexFilter(),
-                                              indexMetadata(),
-                                              isTrackingWarnings());
+        return create(true,
+                      digestVersion(),
+                      acceptsTransient(),
+                      metadata(),
+                      nowInSec(),
+                      columnFilter(),
+                      rowFilter(),
+                      limits(),
+                      partitionKey(),
+                      clusteringIndexFilter(),
+                      indexMetadata(),
+                      isTrackingWarnings());
     }
 
     @Override
     protected SinglePartitionReadCommand copyAsTransientQuery()
     {
-        return new SinglePartitionReadCommand(false,
-                                              0,
-                                              true,
-                                              metadata(),
-                                              nowInSec(),
-                                              columnFilter(),
-                                              rowFilter(),
-                                              limits(),
-                                              partitionKey(),
-                                              clusteringIndexFilter(),
-                                              indexMetadata(),
-                                              isTrackingWarnings());
+        return create(false,
+                      0,
+                      true,
+                      metadata(),
+                      nowInSec(),
+                      columnFilter(),
+                      rowFilter(),
+                      limits(),
+                      partitionKey(),
+                      clusteringIndexFilter(),
+                      indexMetadata(),
+                      isTrackingWarnings());
     }
 
     @Override
     public SinglePartitionReadCommand withUpdatedLimit(DataLimits newLimits)
     {
-        return new SinglePartitionReadCommand(isDigestQuery(),
-                                              digestVersion(),
-                                              acceptsTransient(),
-                                              metadata(),
-                                              nowInSec(),
-                                              columnFilter(),
-                                              rowFilter(),
-                                              newLimits,
-                                              partitionKey(),
-                                              clusteringIndexFilter(),
-                                              indexMetadata(),
-                                              isTrackingWarnings());
+        return create(isDigestQuery(),
+                      digestVersion(),
+                      acceptsTransient(),
+                      metadata(),
+                      nowInSec(),
+                      columnFilter(),
+                      rowFilter(),
+                      newLimits,
+                      partitionKey(),
+                      clusteringIndexFilter(),
+                      indexMetadata(),
+                      isTrackingWarnings());
     }
 
     @Override
@@ -1156,17 +1201,24 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                                                                clusteringIndexFilter));
             }
 
-            return new Group(commands, limits);
+            return create(commands, limits);
         }
 
-        public Group(List<SinglePartitionReadCommand> commands, DataLimits limits)
+        private Group(List<SinglePartitionReadCommand> commands, DataLimits limits)
         {
             super(commands, limits);
         }
 
         public static Group one(SinglePartitionReadCommand command)
         {
-            return new Group(Collections.singletonList(command), command.limits());
+            return create(Collections.singletonList(command), command.limits());
+        }
+
+        public static Group create(List<SinglePartitionReadCommand> commands, DataLimits limits)
+        {
+            return commands.get(0).metadata().isVirtual() ?
+                   new VirtualTableGroup(commands, limits) :
+                   new Group(commands, limits);
         }
 
         public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
@@ -1175,6 +1227,25 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
         }
     }
 
+    public static class VirtualTableGroup extends Group
+    {
+        public VirtualTableGroup(List<SinglePartitionReadCommand> commands, DataLimits limits)
+        {
+            super(commands, limits);
+        }
+
+        @Override
+        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
+        {
+            if (queries.size() == 1)
+                return queries.get(0).execute(consistency, clientState, queryStartNanoTime);
+
+            return PartitionIterators.concat(queries.stream()
+                                                    .map(q -> q.execute(consistency, clientState, queryStartNanoTime))
+                                                    .collect(Collectors.toList()));
+        }
+    }
+
     private static class Deserializer extends SelectionDeserializer
     {
         public ReadCommand deserialize(DataInputPlus in,
@@ -1192,7 +1263,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
         {
             DecoratedKey key = metadata.partitioner.decorateKey(metadata.partitionKeyType.readBuffer(in, DatabaseDescriptor.getMaxValueSize()));
             ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
-            return new SinglePartitionReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index, false);
+            return SinglePartitionReadCommand.create(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index, false);
         }
     }
 
@@ -1223,4 +1294,50 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
             return mergedSSTables;
         }
     }
+
+    public static class VirtualTableSinglePartitionReadCommand extends SinglePartitionReadCommand
+    {
+        protected VirtualTableSinglePartitionReadCommand(boolean isDigest,
+                                                         int digestVersion,
+                                                         boolean acceptsTransient,
+                                                         TableMetadata metadata,
+                                                         int nowInSec,
+                                                         ColumnFilter columnFilter,
+                                                         RowFilter rowFilter,
+                                                         DataLimits limits,
+                                                         DecoratedKey partitionKey,
+                                                         ClusteringIndexFilter clusteringIndexFilter,
+                                                         IndexMetadata index,
+                                                         boolean trackWarnings)
+        {
+            super(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter, index, trackWarnings);
+        }
+
+        @Override
+        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
+        {
+            return executeInternal(executionController());
+        }
+
+        @Override
+        @SuppressWarnings("resource")
+        public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
+        {
+            VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
+            UnfilteredPartitionIterator resultIterator = view.select(partitionKey, clusteringIndexFilter, columnFilter());
+            return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
+        }
+
+        @Override
+        public ReadExecutionController executionController()
+        {
+            return ReadExecutionController.empty();
+        }
+
+        @Override
+        public ReadExecutionController executionController(boolean trackRepairedStatus)
+        {
+            return executionController();
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
index 5d344de..e595fcb 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
@@ -53,9 +53,7 @@ public interface SinglePartitionReadQuery extends ReadQuery
                                                                         List<DecoratedKey> partitionKeys,
                                                                         ClusteringIndexFilter clusteringIndexFilter)
     {
-        return metadata.isVirtual()
-             ? VirtualTableSinglePartitionReadQuery.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter)
-             : SinglePartitionReadCommand.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter);
+        return SinglePartitionReadCommand.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter);
     }
 
 
@@ -100,9 +98,7 @@ public interface SinglePartitionReadQuery extends ReadQuery
                                                   DecoratedKey partitionKey,
                                                   ClusteringIndexFilter clusteringIndexFilter)
     {
-        return metadata.isVirtual()
-             ? VirtualTableSinglePartitionReadQuery.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter)
-             : SinglePartitionReadCommand.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+        return SinglePartitionReadCommand.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java
deleted file mode 100644
index b24a670..0000000
--- a/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.filter.DataLimits;
-import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
-import org.apache.cassandra.db.virtual.VirtualTable;
-import org.apache.cassandra.schema.TableMetadata;
-
-/**
- * A read query that selects a (part of a) range of partitions of a virtual table.
- */
-public class VirtualTablePartitionRangeReadQuery extends VirtualTableReadQuery implements PartitionRangeReadQuery
-{
-    private final DataRange dataRange;
-
-    public static VirtualTablePartitionRangeReadQuery create(TableMetadata metadata,
-                                                             int nowInSec,
-                                                             ColumnFilter columnFilter,
-                                                             RowFilter rowFilter,
-                                                             DataLimits limits,
-                                                             DataRange dataRange)
-    {
-        return new VirtualTablePartitionRangeReadQuery(metadata,
-                                                       nowInSec,
-                                                       columnFilter,
-                                                       rowFilter,
-                                                       limits,
-                                                       dataRange);
-    }
-
-    private VirtualTablePartitionRangeReadQuery(TableMetadata metadata,
-                                                int nowInSec,
-                                                ColumnFilter columnFilter,
-                                                RowFilter rowFilter,
-                                                DataLimits limits,
-                                                DataRange dataRange)
-    {
-        super(metadata, nowInSec, columnFilter, rowFilter, limits);
-        this.dataRange = dataRange;
-    }
-
-    @Override
-    public DataRange dataRange()
-    {
-        return dataRange;
-    }
-
-    @Override
-    public PartitionRangeReadQuery withUpdatedLimit(DataLimits newLimits)
-    {
-        return new VirtualTablePartitionRangeReadQuery(metadata(),
-                                                       nowInSec(),
-                                                       columnFilter(),
-                                                       rowFilter(),
-                                                       newLimits,
-                                                       dataRange());
-    }
-
-    @Override
-    public PartitionRangeReadQuery withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange)
-    {
-        return new VirtualTablePartitionRangeReadQuery(metadata(),
-                                                       nowInSec(),
-                                                       columnFilter(),
-                                                       rowFilter(),
-                                                       newLimits,
-                                                       newDataRange);
-    }
-
-    @Override
-    protected UnfilteredPartitionIterator queryVirtualTable()
-    {
-        VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
-        return view.select(dataRange, columnFilter());
-    }
-
-    @Override
-    protected void appendCQLWhereClause(StringBuilder sb)
-    {
-        String filterString = dataRange.toCQLString(metadata(), rowFilter());
-        if (!filterString.isEmpty())
-            sb.append(" WHERE ").append(filterString);
-    }
-}
diff --git a/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java
deleted file mode 100644
index ad22a58..0000000
--- a/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.filter.DataLimits;
-import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.ClientState;
-
-/**
- * Base class for the {@code ReadQuery} implementations use to query virtual tables.
- */
-public abstract class VirtualTableReadQuery extends AbstractReadQuery
-{
-    protected VirtualTableReadQuery(TableMetadata metadata,
-                                    int nowInSec,
-                                    ColumnFilter columnFilter,
-                                    RowFilter rowFilter,
-                                    DataLimits limits)
-    {
-        super(metadata, nowInSec, columnFilter, rowFilter, limits);
-    }
-
-    @Override
-    public ReadExecutionController executionController()
-    {
-        return ReadExecutionController.empty();
-    }
-
-    @Override
-    public PartitionIterator execute(ConsistencyLevel consistency,
-                                     ClientState clientState,
-                                     long queryStartNanoTime) throws RequestExecutionException
-    {
-        return executeInternal(executionController());
-    }
-
-    @Override
-    @SuppressWarnings("resource")
-    public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
-    {
-        UnfilteredPartitionIterator resultIterator = queryVirtualTable();
-        return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
-    }
-
-    protected abstract UnfilteredPartitionIterator queryVirtualTable();
-}
diff --git a/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java
deleted file mode 100644
index 32346e8..0000000
--- a/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import org.apache.cassandra.db.filter.ClusteringIndexFilter;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.filter.DataLimits;
-import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionIterators;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
-import org.apache.cassandra.db.virtual.VirtualTable;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.ClientState;
-
-/**
- * A read query that selects a (part of a) single partition of a virtual table.
- */
-public class VirtualTableSinglePartitionReadQuery extends VirtualTableReadQuery implements SinglePartitionReadQuery
-{
-    private final DecoratedKey partitionKey;
-    private final ClusteringIndexFilter clusteringIndexFilter;
-
-    public static VirtualTableSinglePartitionReadQuery create(TableMetadata metadata,
-                                                              int nowInSec,
-                                                              ColumnFilter columnFilter,
-                                                              RowFilter rowFilter,
-                                                              DataLimits limits,
-                                                              DecoratedKey partitionKey,
-                                                              ClusteringIndexFilter clusteringIndexFilter)
-    {
-        return new VirtualTableSinglePartitionReadQuery(metadata,
-                                                        nowInSec,
-                                                        columnFilter,
-                                                        rowFilter,
-                                                        limits,
-                                                        partitionKey,
-                                                        clusteringIndexFilter);
-    }
-
-    private VirtualTableSinglePartitionReadQuery(TableMetadata metadata,
-                                                 int nowInSec,
-                                                 ColumnFilter columnFilter,
-                                                 RowFilter rowFilter,
-                                                 DataLimits limits,
-                                                 DecoratedKey partitionKey,
-                                                 ClusteringIndexFilter clusteringIndexFilter)
-    {
-        super(metadata, nowInSec, columnFilter, rowFilter, limits);
-        this.partitionKey = partitionKey;
-        this.clusteringIndexFilter = clusteringIndexFilter;
-    }
-
-    @Override
-    protected void appendCQLWhereClause(StringBuilder sb)
-    {
-        sb.append(" WHERE ").append(partitionKey().toCQLString(metadata()));
-
-        String filterString = clusteringIndexFilter().toCQLString(metadata(), rowFilter());
-        if (!filterString.isEmpty())
-        {
-            if (!clusteringIndexFilter().selectsAllPartition() || !rowFilter().isEmpty())
-                sb.append(" AND ");
-            sb.append(filterString);
-        }
-    }
-
-    @Override
-    public ClusteringIndexFilter clusteringIndexFilter()
-    {
-        return clusteringIndexFilter;
-    }
-
-    @Override
-    public boolean selectsFullPartition()
-    {
-        return clusteringIndexFilter.selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns();
-    }
-
-    @Override
-    public DecoratedKey partitionKey()
-    {
-        return partitionKey;
-    }
-
-    @Override
-    public SinglePartitionReadQuery withUpdatedLimit(DataLimits newLimits)
-    {
-        return new VirtualTableSinglePartitionReadQuery(metadata(),
-                                                        nowInSec(),
-                                                        columnFilter(),
-                                                        rowFilter(),
-                                                        newLimits,
-                                                        partitionKey(),
-                                                        clusteringIndexFilter);
-    }
-
-    @Override
-    public SinglePartitionReadQuery forPaging(Clustering<?> lastReturned, DataLimits limits)
-    {
-        return new VirtualTableSinglePartitionReadQuery(metadata(),
-                                                        nowInSec(),
-                                                        columnFilter(),
-                                                        rowFilter(),
-                                                        limits,
-                                                        partitionKey(),
-                                                      lastReturned == null ? clusteringIndexFilter
-                                                              : clusteringIndexFilter.forPaging(metadata().comparator,
-                                                                                                lastReturned,
-                                                                                                false));
-    }
-
-    @Override
-    protected UnfilteredPartitionIterator queryVirtualTable()
-    {
-        VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
-        return view.select(partitionKey, clusteringIndexFilter, columnFilter());
-    }
-
-    /**
-     * Groups multiple single partition read queries.
-     */
-    public static class Group extends SinglePartitionReadQuery.Group<VirtualTableSinglePartitionReadQuery>
-    {
-        public static Group create(TableMetadata metadata,
-                                   int nowInSec,
-                                   ColumnFilter columnFilter,
-                                   RowFilter rowFilter,
-                                   DataLimits limits,
-                                   List<DecoratedKey> partitionKeys,
-                                   ClusteringIndexFilter clusteringIndexFilter)
-        {
-            List<VirtualTableSinglePartitionReadQuery> queries = new ArrayList<>(partitionKeys.size());
-            for (DecoratedKey partitionKey : partitionKeys)
-            {
-                queries.add(VirtualTableSinglePartitionReadQuery.create(metadata,
-                                                                        nowInSec,
-                                                                        columnFilter,
-                                                                        rowFilter,
-                                                                        limits,
-                                                                        partitionKey,
-                                                                        clusteringIndexFilter));
-            }
-
-            return new Group(queries, limits);
-        }
-
-        public Group(List<VirtualTableSinglePartitionReadQuery> queries, DataLimits limits)
-        {
-            super(queries, limits);
-        }
-
-        public static Group one(VirtualTableSinglePartitionReadQuery query)
-        {
-            return new Group(Collections.singletonList(query), query.limits());
-        }
-
-        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
-        {
-            if (queries.size() == 1)
-                return queries.get(0).execute(consistency, clientState, queryStartNanoTime);
-
-            return PartitionIterators.concat(queries.stream()
-                                                    .map(q -> q.execute(consistency, clientState, queryStartNanoTime))
-                                                    .collect(Collectors.toList()));
-        }
-    }
-}
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
index 5e0f90c..23814cd 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
@@ -40,7 +40,10 @@ public final class VirtualKeyspaceRegistry
 
     public void register(VirtualKeyspace keyspace)
     {
-        virtualKeyspaces.put(keyspace.name(), keyspace);
+        VirtualKeyspace previous = virtualKeyspaces.put(keyspace.name(), keyspace);
+        // some tests choose to replace the keyspace, if so make sure to cleanup tables as well
+        if (previous != null)
+            previous.tables().forEach(t -> virtualTables.remove(t));
         keyspace.tables().forEach(t -> virtualTables.put(t.metadata().id, t));
     }
 
diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
index 168601c..c7c6df0 100644
--- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
@@ -152,7 +152,7 @@ public class LocalPartitioner implements IPartitioner
         @Override
         public int compareTo(Token o)
         {
-            assert getPartitioner() == o.getPartitioner();
+            assert getPartitioner() == o.getPartitioner() : String.format("partitioners do not match; %s != %s", getPartitioner(), o.getPartitioner());
             return comparator.compare(token, ((LocalToken) o).token);
         }
 
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 09e1817..e2e3104 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.net;
 
+import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.List;
@@ -26,6 +27,7 @@ import java.util.concurrent.TimeoutException;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.FutureCombiner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -264,6 +266,55 @@ public final class MessagingService extends MessagingServiceMBeanImpl
         OutboundConnections.scheduleUnusedConnectionMonitoring(this, ScheduledExecutors.scheduledTasks, 1L, TimeUnit.HOURS);
     }
 
+    public <T> org.apache.cassandra.utils.concurrent.Future<Message<T>> sendWithResult(Message message, InetAddressAndPort to)
+    {
+        AsyncPromise<Message<T>> promise = new AsyncPromise<>();
+        MessagingService.instance().sendWithCallback(message, to, new RequestCallback<T>()
+        {
+            @Override
+            public void onResponse(Message<T> msg)
+            {
+                promise.trySuccess(msg);
+            }
+
+            @Override
+            public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
+            {
+                promise.tryFailure(new FailureResponseException(from, failureReason));
+            }
+
+            @Override
+            public boolean invokeOnFailure()
+            {
+                return true;
+            }
+        });
+        return promise;
+    }
+
+    public static class FailureResponseException extends IOException
+    {
+        private final InetAddressAndPort from;
+        private final RequestFailureReason failureReason;
+
+        public FailureResponseException(InetAddressAndPort from, RequestFailureReason failureReason)
+        {
+            super(String.format("Failure from %s: %s", from, failureReason.name()));
+            this.from = from;
+            this.failureReason = failureReason;
+        }
+
+        public InetAddressAndPort from()
+        {
+            return from;
+        }
+
+        public RequestFailureReason failureReason()
+        {
+            return failureReason;
+        }
+    }
+
     /**
      * Send a non-mutation message to a given endpoint. This method specifies a callback
      * which is invoked with the actual response.
diff --git a/src/java/org/apache/cassandra/schema/SchemaConstants.java b/src/java/org/apache/cassandra/schema/SchemaConstants.java
index 5cae3b9..9cddedf 100644
--- a/src/java/org/apache/cassandra/schema/SchemaConstants.java
+++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import java.util.regex.Pattern;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 
 import org.apache.cassandra.db.Digest;
 
@@ -50,6 +51,10 @@ public final class SchemaConstants
     public static final Set<String> LOCAL_SYSTEM_KEYSPACE_NAMES =
         ImmutableSet.of(SYSTEM_KEYSPACE_NAME, SCHEMA_KEYSPACE_NAME);
 
+    /* virtual table system keyspace names */
+    public static final Set<String> VIRTUAL_SYSTEM_KEYSPACE_NAMES =
+        ImmutableSet.of(VIRTUAL_VIEWS, VIRTUAL_SCHEMA);
+
     /* replicate system keyspace names (the ones with a "true" replication strategy) */
     public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES =
         ImmutableSet.of(TRACE_KEYSPACE_NAME, AUTH_KEYSPACE_NAME, DISTRIBUTED_KEYSPACE_NAME);
@@ -98,7 +103,7 @@ public final class SchemaConstants
      */
     public static boolean isVirtualSystemKeyspace(String keyspaceName)
     {
-        return VIRTUAL_SCHEMA.equals(keyspaceName.toLowerCase()) || VIRTUAL_VIEWS.equals(keyspaceName.toLowerCase());
+        return VIRTUAL_SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase());
     }
 
     /**
@@ -111,4 +116,13 @@ public final class SchemaConstants
                 || isReplicatedSystemKeyspace(keyspaceName)
                 || isVirtualSystemKeyspace(keyspaceName);
     }
+
+    /**
+     * Returns the set of all system keyspaces
+     * @return all system keyspaces
+     */
+    public static Set<String> getSystemKeyspaces()
+    {
+        return Sets.union(Sets.union(LOCAL_SYSTEM_KEYSPACE_NAMES, REPLICATED_SYSTEM_KEYSPACE_NAMES), VIRTUAL_SYSTEM_KEYSPACE_NAMES);
+    }
 }
diff --git a/src/java/org/apache/cassandra/schema/TableId.java b/src/java/org/apache/cassandra/schema/TableId.java
index 0633105..8645dc2 100644
--- a/src/java/org/apache/cassandra/schema/TableId.java
+++ b/src/java/org/apache/cassandra/schema/TableId.java
@@ -67,7 +67,7 @@ public class TableId
      */
     public static TableId forSystemTable(String keyspace, String table)
     {
-        assert SchemaConstants.isLocalSystemKeyspace(keyspace) || SchemaConstants.isReplicatedSystemKeyspace(keyspace);
+        assert SchemaConstants.isSystemKeyspace(keyspace) : String.format("Table %s.%s is not a system table; only keyspaces allowed are %s", keyspace, table, SchemaConstants.getSystemKeyspaces());
         return unsafeDeterministic(keyspace, table);
     }
 
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java b/src/java/org/apache/cassandra/schema/TableMetadata.java
index fcd1d58..ef43c5d 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -740,7 +740,9 @@ public class TableMetadata implements SchemaElement
 
             if (id == null)
             {
-                if (DatabaseDescriptor.useDeterministicTableID()) id = TableId.unsafeDeterministic(keyspace, name);
+                // make sure vtables use determiniestic ids so they can be referenced in calls cross-nodes
+                // see CASSANDRA-17295
+                if (DatabaseDescriptor.useDeterministicTableID() || kind == Kind.VIRTUAL) id = TableId.unsafeDeterministic(keyspace, name);
                 else id = TableId.generate();
             }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/VirtualTableFromInternode.java b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableFromInternode.java
new file mode 100644
index 0000000..e322585
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableFromInternode.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class VirtualTableFromInternode extends TestBaseImpl
+{
+    private static Cluster CLUSTER;
+
+    @BeforeClass
+    public static void setup() throws IOException
+    {
+        CLUSTER = Cluster.build(2)
+                         .withConfig(c -> c.with(Feature.values()))
+                         .start();
+    }
+
+    @AfterClass
+    public static void cleanup()
+    {
+        if (CLUSTER != null)
+            CLUSTER.close();
+    }
+
+    @Test
+    public void normal()
+    {
+        assertThat(CLUSTER.coordinator(1).executeWithResult("SELECT * FROM system_views.settings", ConsistencyLevel.ONE))
+        .hasSizeGreaterThan(2)
+        .contains("rpc_address", "127.0.0.1")
+        .contains("broadcast_address", "127.0.0.1");
+
+        assertThat(CLUSTER.coordinator(1).executeWithResult("SELECT * FROM system_views.settings WHERE name=?", ConsistencyLevel.ONE, "rpc_address"))
+        .isEqualTo("rpc_address", "127.0.0.1");
+
+        assertThat(CLUSTER.coordinator(1).executeWithResult("SELECT * FROM system_views.settings WHERE name IN (?, ?)", ConsistencyLevel.ONE, "rpc_address", "broadcast_address"))
+        .contains("rpc_address", "127.0.0.1")
+        .contains("broadcast_address", "127.0.0.1")
+        .hasSize(2);
+    }
+
+    @Test
+    public void readCommandAccessVirtualTable()
+    {
+        CLUSTER.get(1).runOnInstance(() -> {
+            boolean didWork = false;
+            for (InetAddressAndPort address : Gossiper.instance.getLiveMembers())
+            {
+                didWork = true;
+                UntypedResultSet rs = QueryProcessor.executeAsync(address, "SELECT * FROM system_views.settings")
+                                                    .syncUninterruptibly().getNow();
+                assertThat(rs.isEmpty()).isFalse();
+                for (UntypedResultSet.Row row : rs)
+                {
+                    String name = row.getString("name");
+                    switch (name)
+                    {
+                        case "broadcast_address":
+                        case "rpc_address":
+                            assertThat(row.getString("value")).isEqualTo(address.getAddress().getHostAddress());
+                            break;
+                    }
+                }
+            }
+            assertThat(didWork).isTrue();
+        });
+    }
+
+    @Test
+    public void readCommandAccessVirtualTableSinglePartition()
+    {
+        CLUSTER.get(1).runOnInstance(() -> {
+            boolean didWork = false;
+            for (InetAddressAndPort address : Gossiper.instance.getLiveMembers())
+            {
+                didWork = true;
+                UntypedResultSet rs = QueryProcessor.executeAsync(address, "SELECT * FROM system_views.settings WHERE name=?", "rpc_address")
+                                                    .syncUninterruptibly().getNow();
+                assertThat(rs.isEmpty()).isFalse();
+                assertThat(rs.one().getString("value")).isEqualTo(address.getAddress().getHostAddress());
+            }
+            assertThat(didWork).isTrue();
+        });
+    }
+
+    @Test
+    public void readCommandAccessVirtualTableMultiplePartition()
+    {
+        CLUSTER.get(1).runOnInstance(() -> {
+            boolean didWork = false;
+            for (InetAddressAndPort address : Gossiper.instance.getLiveMembers())
+            {
+                didWork = true;
+                UntypedResultSet rs = QueryProcessor.executeAsync(address, "SELECT * FROM system_views.settings WHERE name IN (?, ?)", "rpc_address", "broadcast_address")
+                                                    .syncUninterruptibly().getNow();
+                assertThat(rs.isEmpty()).isFalse();
+                Set<String> columns = new HashSet<>();
+                for (UntypedResultSet.Row row : rs)
+                {
+                    columns.add(row.getString("name"));
+                    assertThat(row.getString("value")).isEqualTo(address.getAddress().getHostAddress());
+                }
+                assertThat(columns).isEqualTo(ImmutableSet.of("rpc_address", "broadcast_address"));
+            }
+            assertThat(didWork).isTrue();
+        });
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
new file mode 100644
index 0000000..f9c1d90
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.util;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.assertj.core.api.Assertions;
+
+public class QueryResultUtil
+{
+    private QueryResultUtil()
+    {
+    }
+
+    public static boolean contains(SimpleQueryResult qr, Object... values)
+    {
+        return contains(qr, a -> equals(a, values));
+    }
+
+    public static boolean contains(SimpleQueryResult qr, Row row)
+    {
+        return contains(qr, a -> equals(a, row));
+    }
+
+    public static boolean contains(SimpleQueryResult qr, Predicate<Row> fn)
+    {
+        while (qr.hasNext())
+        {
+            if (fn.test(qr.next()))
+                return true;
+        }
+        return false;
+    }
+
+    private static boolean equals(Row a, Row b)
+    {
+        return equals(a, b.toObjectArray());
+    }
+
+    private static boolean equals(Row a, Object[] bs)
+    {
+        Object[] as = a.toObjectArray();
+        if (as.length != bs.length)
+            return false;
+        for (int i = 0; i < as.length; i++)
+        {
+            if (!Objects.equals(as[i], bs[i]))
+                return false;
+        }
+        return true;
+    }
+
+    public static AssertHelper assertThat(SimpleQueryResult qr)
+    {
+        return new AssertHelper(qr);
+    }
+
+    public static class AssertHelper
+    {
+        private final SimpleQueryResult qr;
+
+        private AssertHelper(SimpleQueryResult qr)
+        {
+            this.qr = qr;
+        }
+
+        public AssertHelper contains(Object... values)
+        {
+            qr.reset();
+            if (!QueryResultUtil.contains(qr, a -> QueryResultUtil.equals(a, values)))
+                throw new AssertionError("Row " + Arrays.asList(values) + " is not present");
+            return this;
+        }
+
+        public AssertHelper contains(Row row)
+        {
+            qr.reset();
+            if (!QueryResultUtil.contains(qr, a -> QueryResultUtil.equals(a, row)))
+                throw new AssertionError("Row " + row + " is not present");
+            return this;
+        }
+
+        public AssertHelper contains(Predicate<Row> fn)
+        {
+            qr.reset();
+            if (!QueryResultUtil.contains(qr, fn))
+                throw new AssertionError("Row  is not present");
+            return this;
+        }
+
+        public AssertHelper isEqualTo(Object... values)
+        {
+            Assertions.assertThat(qr.toObjectArrays())
+                      .hasSize(1)
+                      .contains(values);
+            return this;
+        }
+
+        public AssertHelper hasSize(int size)
+        {
+            Assertions.assertThat(qr.toObjectArrays()).hasSize(size);
+            return this;
+        }
+
+        public AssertHelper hasSizeGreaterThan(int size)
+        {
+            Assertions.assertThat(qr.toObjectArrays()).hasSizeGreaterThan(size);
+            return this;
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/AbstractReadQueryToCQLStringTest.java b/test/unit/org/apache/cassandra/db/AbstractReadQueryToCQLStringTest.java
index 4d0adea..11582b9 100644
--- a/test/unit/org/apache/cassandra/db/AbstractReadQueryToCQLStringTest.java
+++ b/test/unit/org/apache/cassandra/db/AbstractReadQueryToCQLStringTest.java
@@ -780,11 +780,6 @@ public class AbstractReadQueryToCQLStringTest extends CQLTester
             SinglePartitionReadCommand.Group group = (SinglePartitionReadCommand.Group) readQuery;
             return group.queries.stream().map(AbstractReadQuery::toCQLString).collect(Collectors.toList());
         }
-        else if (readQuery instanceof VirtualTableSinglePartitionReadQuery.Group)
-        {
-            VirtualTableSinglePartitionReadQuery.Group group = (VirtualTableSinglePartitionReadQuery.Group) readQuery;
-            return group.queries.stream().map(AbstractReadQuery::toCQLString).collect(Collectors.toList());
-        }
         else
         {
             assertTrue(readQuery instanceof AbstractReadQuery);
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 69b8c37..289014f 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -358,7 +358,7 @@ public class ReadCommandTest
 
             cfs.forceBlockingFlush();
 
-            ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
+            ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE);
 
             try (ReadExecutionController executionController = query.executionController();
                  UnfilteredPartitionIterator iter = query.executeLocally(executionController);
@@ -528,7 +528,7 @@ public class ReadCommandTest
 
             cfs.forceBlockingFlush();
 
-            ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
+            ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE);
 
             try (ReadExecutionController executionController = query.executionController();
                     UnfilteredPartitionIterator iter = query.executeLocally(executionController);
@@ -604,7 +604,7 @@ public class ReadCommandTest
 
             cfs.forceBlockingFlush();
 
-            ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
+            ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE);
 
             try (ReadExecutionController executionController = query.executionController();
                     UnfilteredPartitionIterator iter = query.executeLocally(executionController);
diff --git a/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java b/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java
index ad28c13..cdb14ab 100644
--- a/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java
@@ -68,6 +68,7 @@ public class CredentialsCacheKeysTableTest extends CQLTester
 
         // ensure nothing keeps cached between tests
         passwordAuthenticator.getCredentialsCache().invalidate();
+        disablePreparedReuseForTest();
     }
 
     @AfterClass
diff --git a/test/unit/org/apache/cassandra/db/virtual/JmxPermissionsCacheKeysTableTest.java b/test/unit/org/apache/cassandra/db/virtual/JmxPermissionsCacheKeysTableTest.java
index f670fbd..363dfd7 100644
--- a/test/unit/org/apache/cassandra/db/virtual/JmxPermissionsCacheKeysTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/JmxPermissionsCacheKeysTableTest.java
@@ -89,6 +89,7 @@ public class JmxPermissionsCacheKeysTableTest extends CQLTester
 
         // ensure nothing keeps cached between tests
         AuthorizationProxy.jmxPermissionsCache.invalidate();
+        disablePreparedReuseForTest();
     }
 
     @AfterClass
diff --git a/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java b/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java
index 8f7f82b..e329d88 100644
--- a/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java
@@ -69,6 +69,7 @@ public class NetworkPermissionsCacheKeysTableTest extends CQLTester
 
         // ensure nothing keeps cached between tests
         AuthenticatedUser.networkPermissionsCache.invalidate();
+        disablePreparedReuseForTest();
     }
 
     @AfterClass
diff --git a/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java b/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java
index 1bfbb0b..97406e4 100644
--- a/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java
@@ -84,6 +84,7 @@ public class PermissionsCacheKeysTableTest extends CQLTester
 
         // ensure nothing keeps cached between tests
         AuthenticatedUser.permissionsCache.invalidate();
+        disablePreparedReuseForTest();
     }
 
     @AfterClass
diff --git a/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java b/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java
index 6e5131d..ce8beea 100644
--- a/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java
@@ -69,6 +69,7 @@ public class RolesCacheKeysTableTest extends CQLTester
 
         // ensure nothing keeps cached between tests
         Roles.cache.invalidate();
+        disablePreparedReuseForTest();
     }
 
     @AfterClass
diff --git a/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java b/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java
index b21b3c4..45da7e2 100644
--- a/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java
@@ -56,6 +56,7 @@ public class SSTableTasksTableTest extends CQLTester
     {
         table = new SSTableTasksTable(KS_NAME);
         VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table)));
+        disablePreparedReuseForTest();
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/db/virtual/SettingsTableTest.java b/test/unit/org/apache/cassandra/db/virtual/SettingsTableTest.java
index a2fda49..8239cef 100644
--- a/test/unit/org/apache/cassandra/db/virtual/SettingsTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/SettingsTableTest.java
@@ -59,6 +59,7 @@ public class SettingsTableTest extends CQLTester
         config.server_encryption_options.applyConfig();
         table = new SettingsTable(KS_NAME, config);
         VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table)));
+        disablePreparedReuseForTest();
     }
 
     private String getValue(Field f)
diff --git a/test/unit/org/apache/cassandra/db/virtual/SystemPropertiesTableTest.java b/test/unit/org/apache/cassandra/db/virtual/SystemPropertiesTableTest.java
index 2ec0683..5242d55 100644
--- a/test/unit/org/apache/cassandra/db/virtual/SystemPropertiesTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/SystemPropertiesTableTest.java
@@ -56,6 +56,7 @@ public class SystemPropertiesTableTest extends CQLTester
     {
         table = new SystemPropertiesTable(KS_NAME);
         VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table)));
+        disablePreparedReuseForTest();
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/service/QueryPagerTest.java b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
index 50bb3e2..323de1a 100644
--- a/test/unit/org/apache/cassandra/service/QueryPagerTest.java
+++ b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
@@ -326,7 +326,7 @@ public class QueryPagerTest
 
     public void multiQueryTest(boolean testPagingState, ProtocolVersion protocolVersion)
     {
-        ReadQuery command = new SinglePartitionReadCommand.Group(new ArrayList<SinglePartitionReadCommand>()
+        ReadQuery command = SinglePartitionReadCommand.Group.create(new ArrayList<SinglePartitionReadCommand>()
         {{
             add(sliceQuery("k1", "c2", "c6", 10));
             add(sliceQuery("k4", "c3", "c5", 10));

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org