You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2022/02/09 01:09:15 UTC
[cassandra] branch trunk updated: Make vtables accessible via internode messaging
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new f0c9713 Make vtables accessible via internode messaging
f0c9713 is described below
commit f0c97132c969cc8dd028d00c5e68ada8b0b9b9c6
Author: David Capwell <dc...@apache.org>
AuthorDate: Tue Feb 8 13:37:12 2022 -0800
Make vtables accessible via internode messaging
patch by David Capwell; reviewed by Aleksey Yeschenko for CASSANDRA-17295
---
CHANGES.txt | 1 +
.../org/apache/cassandra/cql3/QueryProcessor.java | 63 +++++
.../cql3/statements/ModificationStatement.java | 2 +-
.../cassandra/cql3/statements/SelectStatement.java | 2 +-
.../cassandra/db/PartitionRangeReadCommand.java | 266 ++++++++++++++-------
.../cassandra/db/PartitionRangeReadQuery.java | 3 -
.../cassandra/db/ReadCommandVerbHandler.java | 2 +
.../cassandra/db/SinglePartitionReadCommand.java | 249 ++++++++++++++-----
.../cassandra/db/SinglePartitionReadQuery.java | 8 +-
.../db/VirtualTablePartitionRangeReadQuery.java | 103 --------
.../apache/cassandra/db/VirtualTableReadQuery.java | 66 -----
.../db/VirtualTableSinglePartitionReadQuery.java | 190 ---------------
.../db/virtual/VirtualKeyspaceRegistry.java | 5 +-
.../org/apache/cassandra/dht/LocalPartitioner.java | 2 +-
.../org/apache/cassandra/net/MessagingService.java | 51 ++++
.../apache/cassandra/schema/SchemaConstants.java | 16 +-
src/java/org/apache/cassandra/schema/TableId.java | 2 +-
.../org/apache/cassandra/schema/TableMetadata.java | 4 +-
.../test/VirtualTableFromInternode.java | 142 +++++++++++
.../distributed/util/QueryResultUtil.java | 130 ++++++++++
.../db/AbstractReadQueryToCQLStringTest.java | 5 -
.../org/apache/cassandra/db/ReadCommandTest.java | 6 +-
.../db/virtual/CredentialsCacheKeysTableTest.java | 1 +
.../virtual/JmxPermissionsCacheKeysTableTest.java | 1 +
.../NetworkPermissionsCacheKeysTableTest.java | 1 +
.../db/virtual/PermissionsCacheKeysTableTest.java | 1 +
.../db/virtual/RolesCacheKeysTableTest.java | 1 +
.../db/virtual/SSTableTasksTableTest.java | 1 +
.../cassandra/db/virtual/SettingsTableTest.java | 1 +
.../db/virtual/SystemPropertiesTableTest.java | 1 +
.../apache/cassandra/service/QueryPagerTest.java | 2 +-
31 files changed, 788 insertions(+), 540 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index bf0de22..f8653d7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Make vtables accessible via internode messaging (CASSANDRA-17295)
* Add support for PEM based key material for SSL (CASSANDRA-17031)
* Standardize storage configuration parameters' names. Support unit suffixes. (CASSANDRA-15234)
* Remove support for Windows (CASSANDRA-16956)
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index c996d0d..1b8e94d 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -37,8 +38,13 @@ import org.antlr.runtime.*;
import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.selection.ResultSetBuilder;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.ClientRequestMetrics;
import org.apache.cassandra.metrics.ClientRequestsMetricsHolder;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaChangeListener;
import org.apache.cassandra.schema.SchemaConstants;
@@ -59,6 +65,8 @@ import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
import static org.apache.cassandra.config.CassandraRelevantProperties.ENABLE_NODELOCAL_QUERIES;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
@@ -428,6 +436,61 @@ public class QueryProcessor implements QueryHandler
return null;
}
+ public static Future<UntypedResultSet> executeAsync(InetAddressAndPort address, String query, Object... values)
+ {
+ Prepared prepared = prepareInternal(query);
+ QueryOptions options = makeInternalOptions(prepared.statement, values);
+ if (prepared.statement instanceof SelectStatement)
+ {
+ SelectStatement select = (SelectStatement) prepared.statement;
+ int nowInSec = FBUtilities.nowInSeconds();
+ ReadQuery readQuery = select.getQuery(options, nowInSec);
+ List<ReadCommand> commands;
+ if (readQuery instanceof ReadCommand)
+ {
+ commands = Collections.singletonList((ReadCommand) readQuery);
+ }
+ else if (readQuery instanceof SinglePartitionReadQuery.Group)
+ {
+ List<? extends SinglePartitionReadQuery> queries = ((SinglePartitionReadQuery.Group<? extends SinglePartitionReadQuery>) readQuery).queries;
+ queries.forEach(a -> {
+ if (!(a instanceof ReadCommand))
+ throw new IllegalArgumentException("Queries found which are not ReadCommand: " + a.getClass());
+ });
+ commands = (List<ReadCommand>) (List<?>) queries;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unable to handle; only expected ReadCommands but given " + readQuery.getClass());
+ }
+ Future<List<Message<ReadResponse>>> future = FutureCombiner.allOf(commands.stream()
+ .map(rc -> Message.out(rc.verb(), rc))
+ .map(m -> MessagingService.instance().<ReadResponse>sendWithResult(m, address))
+ .collect(Collectors.toList()));
+
+ ResultSetBuilder result = new ResultSetBuilder(select.getResultMetadata(), select.getSelection().newSelectors(options), null);
+ return future.map(list -> {
+ int i = 0;
+ for (Message<ReadResponse> m : list)
+ {
+ ReadResponse rsp = m.payload;
+ try (PartitionIterator it = UnfilteredPartitionIterators.filter(rsp.makeIterator(commands.get(i++)), nowInSec))
+ {
+ while (it.hasNext())
+ {
+ try (RowIterator partition = it.next())
+ {
+ select.processPartition(partition, options, result, nowInSec);
+ }
+ }
+ }
+ }
+ return result.build();
+ }).map(UntypedResultSet::create);
+ }
+ throw new IllegalArgumentException("Unable to execute query; only SELECT supported but given: " + query);
+ }
+
public static UntypedResultSet execute(String query, ConsistencyLevel cl, Object... values)
throws RequestExecutionException
{
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index eed528f..4233d23 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -410,7 +410,7 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa
metadata().partitioner.decorateKey(key),
filter));
- SinglePartitionReadCommand.Group group = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
+ SinglePartitionReadCommand.Group group = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE);
if (local)
{
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 7a710cc..cd0a76b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -869,7 +869,7 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement
}
// Used by ModificationStatement for CAS operations
- void processPartition(RowIterator partition, QueryOptions options, ResultSetBuilder result, int nowInSec)
+ public void processPartition(RowIterator partition, QueryOptions options, ResultSetBuilder result, int nowInSec)
throws InvalidRequestException
{
maybeFail(result, options);
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index e69c288..ef12853 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -52,7 +54,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
{
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
- private final DataRange dataRange;
+ protected final DataRange dataRange;
private PartitionRangeReadCommand(boolean isDigest,
int digestVersion,
@@ -70,24 +72,63 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
this.dataRange = dataRange;
}
- public static PartitionRangeReadCommand create(TableMetadata metadata,
- int nowInSec,
- ColumnFilter columnFilter,
- RowFilter rowFilter,
- DataLimits limits,
- DataRange dataRange)
+ private static PartitionRangeReadCommand create(boolean isDigest,
+ int digestVersion,
+ boolean acceptsTransient,
+ TableMetadata metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DataRange dataRange,
+ IndexMetadata index,
+ boolean trackWarnings)
{
- return new PartitionRangeReadCommand(false,
- 0,
- false,
+ if (metadata.isVirtual())
+ {
+ return new VirtualTablePartitionRangeReadCommand(isDigest,
+ digestVersion,
+ acceptsTransient,
+ metadata,
+ nowInSec,
+ columnFilter,
+ rowFilter,
+ limits,
+ dataRange,
+ index,
+ trackWarnings);
+ }
+ return new PartitionRangeReadCommand(isDigest,
+ digestVersion,
+ acceptsTransient,
metadata,
nowInSec,
columnFilter,
rowFilter,
limits,
dataRange,
- findIndex(metadata, rowFilter),
- false);
+ index,
+ trackWarnings);
+ }
+
+ public static PartitionRangeReadCommand create(TableMetadata metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DataRange dataRange)
+ {
+ return create(false,
+ 0,
+ false,
+ metadata,
+ nowInSec,
+ columnFilter,
+ rowFilter,
+ limits,
+ dataRange,
+ findIndex(metadata, rowFilter),
+ false);
}
/**
@@ -100,17 +141,17 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
*/
public static PartitionRangeReadCommand allDataRead(TableMetadata metadata, int nowInSec)
{
- return new PartitionRangeReadCommand(false,
- 0,
- false,
- metadata,
- nowInSec,
- ColumnFilter.all(metadata),
- RowFilter.NONE,
- DataLimits.NONE,
- DataRange.allData(metadata.partitioner),
- null,
- false);
+ return create(false,
+ 0,
+ false,
+ metadata,
+ nowInSec,
+ ColumnFilter.all(metadata),
+ RowFilter.NONE,
+ DataLimits.NONE,
+ DataRange.allData(metadata.partitioner),
+ null,
+ false);
}
public DataRange dataRange()
@@ -150,96 +191,96 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
// DataLimits.CQLGroupByLimits.GroupByAwareCounter assumes that if GroupingState.hasClustering(), then we're in
// the middle of a group, but we can't make that assumption if we query a range "in advance" of where we are
// on the ring.
- return new PartitionRangeReadCommand(isDigestQuery(),
- digestVersion(),
- acceptsTransient(),
- metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- isRangeContinuation ? limits() : limits().withoutState(),
- dataRange().forSubRange(range),
- indexMetadata(),
- isTrackingWarnings());
+ return create(isDigestQuery(),
+ digestVersion(),
+ acceptsTransient(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ isRangeContinuation ? limits() : limits().withoutState(),
+ dataRange().forSubRange(range),
+ indexMetadata(),
+ isTrackingWarnings());
}
public PartitionRangeReadCommand copy()
{
- return new PartitionRangeReadCommand(isDigestQuery(),
- digestVersion(),
- acceptsTransient(),
- metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- limits(),
- dataRange(),
- indexMetadata(),
- isTrackingWarnings());
+ return create(isDigestQuery(),
+ digestVersion(),
+ acceptsTransient(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ dataRange(),
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
protected PartitionRangeReadCommand copyAsDigestQuery()
{
- return new PartitionRangeReadCommand(true,
- digestVersion(),
- false,
- metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- limits(),
- dataRange(),
- indexMetadata(),
- isTrackingWarnings());
+ return create(true,
+ digestVersion(),
+ false,
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ dataRange(),
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
protected PartitionRangeReadCommand copyAsTransientQuery()
{
- return new PartitionRangeReadCommand(false,
- 0,
- true,
- metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- limits(),
- dataRange(),
- indexMetadata(),
- isTrackingWarnings());
+ return create(false,
+ 0,
+ true,
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ dataRange(),
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
{
- return new PartitionRangeReadCommand(isDigestQuery(),
- digestVersion(),
- acceptsTransient(),
- metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- newLimits,
- dataRange(),
- indexMetadata(),
- isTrackingWarnings());
+ return create(isDigestQuery(),
+ digestVersion(),
+ acceptsTransient(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ newLimits,
+ dataRange(),
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
public PartitionRangeReadCommand withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange)
{
- return new PartitionRangeReadCommand(isDigestQuery(),
- digestVersion(),
- acceptsTransient(),
- metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- newLimits,
- newDataRange,
- indexMetadata(),
- isTrackingWarnings());
+ return create(isDigestQuery(),
+ digestVersion(),
+ acceptsTransient(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ newLimits,
+ newDataRange,
+ indexMetadata(),
+ isTrackingWarnings());
}
public long getTimeout(TimeUnit unit)
@@ -449,7 +490,52 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
throws IOException
{
DataRange range = DataRange.serializer.deserialize(in, version, metadata);
- return new PartitionRangeReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, range, index, false);
+ return PartitionRangeReadCommand.create(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, range, index, false);
+ }
+ }
+
+ public static class VirtualTablePartitionRangeReadCommand extends PartitionRangeReadCommand
+ {
+ private VirtualTablePartitionRangeReadCommand(boolean isDigest,
+ int digestVersion,
+ boolean acceptsTransient,
+ TableMetadata metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DataRange dataRange,
+ IndexMetadata index,
+ boolean trackWarnings)
+ {
+ super(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index, trackWarnings);
+ }
+
+ @Override
+ public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
+ {
+ return executeInternal(executionController());
+ }
+
+ @Override
+ @SuppressWarnings("resource")
+ public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
+ {
+ VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
+ UnfilteredPartitionIterator resultIterator = view.select(dataRange, columnFilter());
+ return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
+ }
+
+ @Override
+ public ReadExecutionController executionController()
+ {
+ return ReadExecutionController.empty();
+ }
+
+ @Override
+ public ReadExecutionController executionController(boolean trackRepairedStatus)
+ {
+ return executionController();
}
}
}
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java b/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
index 12624e7..5054f32 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
@@ -38,9 +38,6 @@ public interface PartitionRangeReadQuery extends ReadQuery
DataLimits limits,
DataRange dataRange)
{
- if (table.isVirtual())
- return VirtualTablePartitionRangeReadQuery.create(table, nowInSec, columnFilter, rowFilter, limits, dataRange);
-
return PartitionRangeReadCommand.create(table, nowInSec, columnFilter, rowFilter, limits, dataRange);
}
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 7ed092a..9226568 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -94,6 +94,8 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
private void validateTransientStatus(Message<ReadCommand> message)
{
ReadCommand command = message.payload;
+ if (command.metadata().isVirtual())
+ return;
Token token;
if (command instanceof SinglePartitionReadCommand)
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 475b287..54a2524 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
@@ -35,6 +36,8 @@ import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.RTBoundValidator;
import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
@@ -57,8 +60,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
{
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
- private final DecoratedKey partitionKey;
- private final ClusteringIndexFilter clusteringIndexFilter;
+ protected final DecoratedKey partitionKey;
+ protected final ClusteringIndexFilter clusteringIndexFilter;
@VisibleForTesting
protected SinglePartitionReadCommand(boolean isDigest,
@@ -80,6 +83,48 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
this.clusteringIndexFilter = clusteringIndexFilter;
}
+ private static SinglePartitionReadCommand create(boolean isDigest,
+ int digestVersion,
+ boolean acceptsTransient,
+ TableMetadata metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter,
+ IndexMetadata index,
+ boolean trackWarnings)
+ {
+ if (metadata.isVirtual())
+ {
+ return new VirtualTableSinglePartitionReadCommand(isDigest,
+ digestVersion,
+ acceptsTransient,
+ metadata,
+ nowInSec,
+ columnFilter,
+ rowFilter,
+ limits,
+ partitionKey,
+ clusteringIndexFilter,
+ index,
+ trackWarnings);
+ }
+ return new SinglePartitionReadCommand(isDigest,
+ digestVersion,
+ acceptsTransient,
+ metadata,
+ nowInSec,
+ columnFilter,
+ rowFilter,
+ limits,
+ partitionKey,
+ clusteringIndexFilter,
+ index,
+ trackWarnings);
+ }
+
/**
* Creates a new read command on a single partition.
*
@@ -103,18 +148,18 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
ClusteringIndexFilter clusteringIndexFilter,
IndexMetadata indexMetadata)
{
- return new SinglePartitionReadCommand(false,
- 0,
- false,
- metadata,
- nowInSec,
- columnFilter,
- rowFilter,
- limits,
- partitionKey,
- clusteringIndexFilter,
- indexMetadata,
- false);
+ return create(false,
+ 0,
+ false,
+ metadata,
+ nowInSec,
+ columnFilter,
+ rowFilter,
+ limits,
+ partitionKey,
+ clusteringIndexFilter,
+ indexMetadata,
+ false);
}
/**
@@ -280,69 +325,69 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
public SinglePartitionReadCommand copy()
{
- return new SinglePartitionReadCommand(isDigestQuery(),
- digestVersion(),
- acceptsTransient(),
- metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- limits(),
- partitionKey(),
- clusteringIndexFilter(),
- indexMetadata(),
- isTrackingWarnings());
+ return create(isDigestQuery(),
+ digestVersion(),
+ acceptsTransient(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ partitionKey(),
+ clusteringIndexFilter(),
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
protected SinglePartitionReadCommand copyAsDigestQuery()
{
- return new SinglePartitionReadCommand(true,
- digestVersion(),
- acceptsTransient(),
- metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- limits(),
- partitionKey(),
- clusteringIndexFilter(),
- indexMetadata(),
- isTrackingWarnings());
+ return create(true,
+ digestVersion(),
+ acceptsTransient(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ partitionKey(),
+ clusteringIndexFilter(),
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
protected SinglePartitionReadCommand copyAsTransientQuery()
{
- return new SinglePartitionReadCommand(false,
- 0,
- true,
- metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- limits(),
- partitionKey(),
- clusteringIndexFilter(),
- indexMetadata(),
- isTrackingWarnings());
+ return create(false,
+ 0,
+ true,
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ partitionKey(),
+ clusteringIndexFilter(),
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
public SinglePartitionReadCommand withUpdatedLimit(DataLimits newLimits)
{
- return new SinglePartitionReadCommand(isDigestQuery(),
- digestVersion(),
- acceptsTransient(),
- metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- newLimits,
- partitionKey(),
- clusteringIndexFilter(),
- indexMetadata(),
- isTrackingWarnings());
+ return create(isDigestQuery(),
+ digestVersion(),
+ acceptsTransient(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ newLimits,
+ partitionKey(),
+ clusteringIndexFilter(),
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
@@ -1156,17 +1201,24 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
clusteringIndexFilter));
}
- return new Group(commands, limits);
+ return create(commands, limits);
}
- public Group(List<SinglePartitionReadCommand> commands, DataLimits limits)
+ private Group(List<SinglePartitionReadCommand> commands, DataLimits limits)
{
super(commands, limits);
}
public static Group one(SinglePartitionReadCommand command)
{
- return new Group(Collections.singletonList(command), command.limits());
+ return create(Collections.singletonList(command), command.limits());
+ }
+
+ public static Group create(List<SinglePartitionReadCommand> commands, DataLimits limits)
+ {
+ return commands.get(0).metadata().isVirtual() ?
+ new VirtualTableGroup(commands, limits) :
+ new Group(commands, limits);
}
public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
@@ -1175,6 +1227,25 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
}
}
+ public static class VirtualTableGroup extends Group
+ {
+ public VirtualTableGroup(List<SinglePartitionReadCommand> commands, DataLimits limits)
+ {
+ super(commands, limits);
+ }
+
+ @Override
+ public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
+ {
+ if (queries.size() == 1)
+ return queries.get(0).execute(consistency, clientState, queryStartNanoTime);
+
+ return PartitionIterators.concat(queries.stream()
+ .map(q -> q.execute(consistency, clientState, queryStartNanoTime))
+ .collect(Collectors.toList()));
+ }
+ }
+
private static class Deserializer extends SelectionDeserializer
{
public ReadCommand deserialize(DataInputPlus in,
@@ -1192,7 +1263,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
{
DecoratedKey key = metadata.partitioner.decorateKey(metadata.partitionKeyType.readBuffer(in, DatabaseDescriptor.getMaxValueSize()));
ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
- return new SinglePartitionReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index, false);
+ return SinglePartitionReadCommand.create(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index, false);
}
}
@@ -1223,4 +1294,50 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
return mergedSSTables;
}
}
+
+ public static class VirtualTableSinglePartitionReadCommand extends SinglePartitionReadCommand
+ {
+ protected VirtualTableSinglePartitionReadCommand(boolean isDigest,
+ int digestVersion,
+ boolean acceptsTransient,
+ TableMetadata metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter,
+ IndexMetadata index,
+ boolean trackWarnings)
+ {
+ super(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter, index, trackWarnings);
+ }
+
+ @Override
+ public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
+ {
+ return executeInternal(executionController());
+ }
+
+ @Override
+ @SuppressWarnings("resource")
+ public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
+ {
+ VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
+ UnfilteredPartitionIterator resultIterator = view.select(partitionKey, clusteringIndexFilter, columnFilter());
+ return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
+ }
+
+ @Override
+ public ReadExecutionController executionController()
+ {
+ return ReadExecutionController.empty();
+ }
+
+ @Override
+ public ReadExecutionController executionController(boolean trackRepairedStatus)
+ {
+ return executionController();
+ }
+ }
}
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
index 5d344de..e595fcb 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
@@ -53,9 +53,7 @@ public interface SinglePartitionReadQuery extends ReadQuery
List<DecoratedKey> partitionKeys,
ClusteringIndexFilter clusteringIndexFilter)
{
- return metadata.isVirtual()
- ? VirtualTableSinglePartitionReadQuery.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter)
- : SinglePartitionReadCommand.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter);
+ return SinglePartitionReadCommand.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter);
}
@@ -100,9 +98,7 @@ public interface SinglePartitionReadQuery extends ReadQuery
DecoratedKey partitionKey,
ClusteringIndexFilter clusteringIndexFilter)
{
- return metadata.isVirtual()
- ? VirtualTableSinglePartitionReadQuery.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter)
- : SinglePartitionReadCommand.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ return SinglePartitionReadCommand.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
}
/**
diff --git a/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java
deleted file mode 100644
index b24a670..0000000
--- a/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.filter.DataLimits;
-import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
-import org.apache.cassandra.db.virtual.VirtualTable;
-import org.apache.cassandra.schema.TableMetadata;
-
-/**
- * A read query that selects a (part of a) range of partitions of a virtual table.
- */
-public class VirtualTablePartitionRangeReadQuery extends VirtualTableReadQuery implements PartitionRangeReadQuery
-{
- private final DataRange dataRange;
-
- public static VirtualTablePartitionRangeReadQuery create(TableMetadata metadata,
- int nowInSec,
- ColumnFilter columnFilter,
- RowFilter rowFilter,
- DataLimits limits,
- DataRange dataRange)
- {
- return new VirtualTablePartitionRangeReadQuery(metadata,
- nowInSec,
- columnFilter,
- rowFilter,
- limits,
- dataRange);
- }
-
- private VirtualTablePartitionRangeReadQuery(TableMetadata metadata,
- int nowInSec,
- ColumnFilter columnFilter,
- RowFilter rowFilter,
- DataLimits limits,
- DataRange dataRange)
- {
- super(metadata, nowInSec, columnFilter, rowFilter, limits);
- this.dataRange = dataRange;
- }
-
- @Override
- public DataRange dataRange()
- {
- return dataRange;
- }
-
- @Override
- public PartitionRangeReadQuery withUpdatedLimit(DataLimits newLimits)
- {
- return new VirtualTablePartitionRangeReadQuery(metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- newLimits,
- dataRange());
- }
-
- @Override
- public PartitionRangeReadQuery withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange)
- {
- return new VirtualTablePartitionRangeReadQuery(metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- newLimits,
- newDataRange);
- }
-
- @Override
- protected UnfilteredPartitionIterator queryVirtualTable()
- {
- VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
- return view.select(dataRange, columnFilter());
- }
-
- @Override
- protected void appendCQLWhereClause(StringBuilder sb)
- {
- String filterString = dataRange.toCQLString(metadata(), rowFilter());
- if (!filterString.isEmpty())
- sb.append(" WHERE ").append(filterString);
- }
-}
diff --git a/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java
deleted file mode 100644
index ad22a58..0000000
--- a/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.filter.DataLimits;
-import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.ClientState;
-
-/**
- * Base class for the {@code ReadQuery} implementations use to query virtual tables.
- */
-public abstract class VirtualTableReadQuery extends AbstractReadQuery
-{
- protected VirtualTableReadQuery(TableMetadata metadata,
- int nowInSec,
- ColumnFilter columnFilter,
- RowFilter rowFilter,
- DataLimits limits)
- {
- super(metadata, nowInSec, columnFilter, rowFilter, limits);
- }
-
- @Override
- public ReadExecutionController executionController()
- {
- return ReadExecutionController.empty();
- }
-
- @Override
- public PartitionIterator execute(ConsistencyLevel consistency,
- ClientState clientState,
- long queryStartNanoTime) throws RequestExecutionException
- {
- return executeInternal(executionController());
- }
-
- @Override
- @SuppressWarnings("resource")
- public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
- {
- UnfilteredPartitionIterator resultIterator = queryVirtualTable();
- return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
- }
-
- protected abstract UnfilteredPartitionIterator queryVirtualTable();
-}
diff --git a/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java
deleted file mode 100644
index 32346e8..0000000
--- a/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import org.apache.cassandra.db.filter.ClusteringIndexFilter;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.filter.DataLimits;
-import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionIterators;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
-import org.apache.cassandra.db.virtual.VirtualTable;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.ClientState;
-
-/**
- * A read query that selects a (part of a) single partition of a virtual table.
- */
-public class VirtualTableSinglePartitionReadQuery extends VirtualTableReadQuery implements SinglePartitionReadQuery
-{
- private final DecoratedKey partitionKey;
- private final ClusteringIndexFilter clusteringIndexFilter;
-
- public static VirtualTableSinglePartitionReadQuery create(TableMetadata metadata,
- int nowInSec,
- ColumnFilter columnFilter,
- RowFilter rowFilter,
- DataLimits limits,
- DecoratedKey partitionKey,
- ClusteringIndexFilter clusteringIndexFilter)
- {
- return new VirtualTableSinglePartitionReadQuery(metadata,
- nowInSec,
- columnFilter,
- rowFilter,
- limits,
- partitionKey,
- clusteringIndexFilter);
- }
-
- private VirtualTableSinglePartitionReadQuery(TableMetadata metadata,
- int nowInSec,
- ColumnFilter columnFilter,
- RowFilter rowFilter,
- DataLimits limits,
- DecoratedKey partitionKey,
- ClusteringIndexFilter clusteringIndexFilter)
- {
- super(metadata, nowInSec, columnFilter, rowFilter, limits);
- this.partitionKey = partitionKey;
- this.clusteringIndexFilter = clusteringIndexFilter;
- }
-
- @Override
- protected void appendCQLWhereClause(StringBuilder sb)
- {
- sb.append(" WHERE ").append(partitionKey().toCQLString(metadata()));
-
- String filterString = clusteringIndexFilter().toCQLString(metadata(), rowFilter());
- if (!filterString.isEmpty())
- {
- if (!clusteringIndexFilter().selectsAllPartition() || !rowFilter().isEmpty())
- sb.append(" AND ");
- sb.append(filterString);
- }
- }
-
- @Override
- public ClusteringIndexFilter clusteringIndexFilter()
- {
- return clusteringIndexFilter;
- }
-
- @Override
- public boolean selectsFullPartition()
- {
- return clusteringIndexFilter.selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns();
- }
-
- @Override
- public DecoratedKey partitionKey()
- {
- return partitionKey;
- }
-
- @Override
- public SinglePartitionReadQuery withUpdatedLimit(DataLimits newLimits)
- {
- return new VirtualTableSinglePartitionReadQuery(metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- newLimits,
- partitionKey(),
- clusteringIndexFilter);
- }
-
- @Override
- public SinglePartitionReadQuery forPaging(Clustering<?> lastReturned, DataLimits limits)
- {
- return new VirtualTableSinglePartitionReadQuery(metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- limits,
- partitionKey(),
- lastReturned == null ? clusteringIndexFilter
- : clusteringIndexFilter.forPaging(metadata().comparator,
- lastReturned,
- false));
- }
-
- @Override
- protected UnfilteredPartitionIterator queryVirtualTable()
- {
- VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
- return view.select(partitionKey, clusteringIndexFilter, columnFilter());
- }
-
- /**
- * Groups multiple single partition read queries.
- */
- public static class Group extends SinglePartitionReadQuery.Group<VirtualTableSinglePartitionReadQuery>
- {
- public static Group create(TableMetadata metadata,
- int nowInSec,
- ColumnFilter columnFilter,
- RowFilter rowFilter,
- DataLimits limits,
- List<DecoratedKey> partitionKeys,
- ClusteringIndexFilter clusteringIndexFilter)
- {
- List<VirtualTableSinglePartitionReadQuery> queries = new ArrayList<>(partitionKeys.size());
- for (DecoratedKey partitionKey : partitionKeys)
- {
- queries.add(VirtualTableSinglePartitionReadQuery.create(metadata,
- nowInSec,
- columnFilter,
- rowFilter,
- limits,
- partitionKey,
- clusteringIndexFilter));
- }
-
- return new Group(queries, limits);
- }
-
- public Group(List<VirtualTableSinglePartitionReadQuery> queries, DataLimits limits)
- {
- super(queries, limits);
- }
-
- public static Group one(VirtualTableSinglePartitionReadQuery query)
- {
- return new Group(Collections.singletonList(query), query.limits());
- }
-
- public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
- {
- if (queries.size() == 1)
- return queries.get(0).execute(consistency, clientState, queryStartNanoTime);
-
- return PartitionIterators.concat(queries.stream()
- .map(q -> q.execute(consistency, clientState, queryStartNanoTime))
- .collect(Collectors.toList()));
- }
- }
-}
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
index 5e0f90c..23814cd 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
@@ -40,7 +40,10 @@ public final class VirtualKeyspaceRegistry
public void register(VirtualKeyspace keyspace)
{
- virtualKeyspaces.put(keyspace.name(), keyspace);
+ VirtualKeyspace previous = virtualKeyspaces.put(keyspace.name(), keyspace);
+ // some tests choose to replace the keyspace; if so, drop the replaced keyspace's tables too (virtualTables is keyed by table id, not by table)
+ if (previous != null)
+ previous.tables().forEach(t -> virtualTables.remove(t.metadata().id));
keyspace.tables().forEach(t -> virtualTables.put(t.metadata().id, t));
}
diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
index 168601c..c7c6df0 100644
--- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
@@ -152,7 +152,7 @@ public class LocalPartitioner implements IPartitioner
@Override
public int compareTo(Token o)
{
- assert getPartitioner() == o.getPartitioner();
+ assert getPartitioner() == o.getPartitioner() : String.format("partitioners do not match; %s != %s", getPartitioner(), o.getPartitioner());
return comparator.compare(token, ((LocalToken) o).token);
}
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 09e1817..e2e3104 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.net;
+import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
@@ -26,6 +27,7 @@ import java.util.concurrent.TimeoutException;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -264,6 +266,55 @@ public final class MessagingService extends MessagingServiceMBeanImpl
OutboundConnections.scheduleUnusedConnectionMonitoring(this, ScheduledExecutors.scheduledTasks, 1L, TimeUnit.HOURS);
}
+ public <T> org.apache.cassandra.utils.concurrent.Future<Message<T>> sendWithResult(Message message, InetAddressAndPort to)
+ {
+ AsyncPromise<Message<T>> promise = new AsyncPromise<>();
+ MessagingService.instance().sendWithCallback(message, to, new RequestCallback<T>()
+ {
+ @Override
+ public void onResponse(Message<T> msg)
+ {
+ promise.trySuccess(msg);
+ }
+
+ @Override
+ public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
+ {
+ promise.tryFailure(new FailureResponseException(from, failureReason));
+ }
+
+ @Override
+ public boolean invokeOnFailure()
+ {
+ return true;
+ }
+ });
+ return promise;
+ }
+
+ public static class FailureResponseException extends IOException
+ {
+ private final InetAddressAndPort from;
+ private final RequestFailureReason failureReason;
+
+ public FailureResponseException(InetAddressAndPort from, RequestFailureReason failureReason)
+ {
+ super(String.format("Failure from %s: %s", from, failureReason.name()));
+ this.from = from;
+ this.failureReason = failureReason;
+ }
+
+ public InetAddressAndPort from()
+ {
+ return from;
+ }
+
+ public RequestFailureReason failureReason()
+ {
+ return failureReason;
+ }
+ }
+
/**
* Send a non-mutation message to a given endpoint. This method specifies a callback
* which is invoked with the actual response.
diff --git a/src/java/org/apache/cassandra/schema/SchemaConstants.java b/src/java/org/apache/cassandra/schema/SchemaConstants.java
index 5cae3b9..9cddedf 100644
--- a/src/java/org/apache/cassandra/schema/SchemaConstants.java
+++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import java.util.regex.Pattern;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.apache.cassandra.db.Digest;
@@ -50,6 +51,10 @@ public final class SchemaConstants
public static final Set<String> LOCAL_SYSTEM_KEYSPACE_NAMES =
ImmutableSet.of(SYSTEM_KEYSPACE_NAME, SCHEMA_KEYSPACE_NAME);
+ /* virtual table system keyspace names */
+ public static final Set<String> VIRTUAL_SYSTEM_KEYSPACE_NAMES =
+ ImmutableSet.of(VIRTUAL_VIEWS, VIRTUAL_SCHEMA);
+
/* replicate system keyspace names (the ones with a "true" replication strategy) */
public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES =
ImmutableSet.of(TRACE_KEYSPACE_NAME, AUTH_KEYSPACE_NAME, DISTRIBUTED_KEYSPACE_NAME);
@@ -98,7 +103,7 @@ public final class SchemaConstants
*/
public static boolean isVirtualSystemKeyspace(String keyspaceName)
{
- return VIRTUAL_SCHEMA.equals(keyspaceName.toLowerCase()) || VIRTUAL_VIEWS.equals(keyspaceName.toLowerCase());
+ return VIRTUAL_SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase());
}
/**
@@ -111,4 +116,13 @@ public final class SchemaConstants
|| isReplicatedSystemKeyspace(keyspaceName)
|| isVirtualSystemKeyspace(keyspaceName);
}
+
+ /**
+ * Returns the set of all system keyspaces
+ * @return all system keyspaces
+ */
+ public static Set<String> getSystemKeyspaces()
+ {
+ return Sets.union(Sets.union(LOCAL_SYSTEM_KEYSPACE_NAMES, REPLICATED_SYSTEM_KEYSPACE_NAMES), VIRTUAL_SYSTEM_KEYSPACE_NAMES);
+ }
}
diff --git a/src/java/org/apache/cassandra/schema/TableId.java b/src/java/org/apache/cassandra/schema/TableId.java
index 0633105..8645dc2 100644
--- a/src/java/org/apache/cassandra/schema/TableId.java
+++ b/src/java/org/apache/cassandra/schema/TableId.java
@@ -67,7 +67,7 @@ public class TableId
*/
public static TableId forSystemTable(String keyspace, String table)
{
- assert SchemaConstants.isLocalSystemKeyspace(keyspace) || SchemaConstants.isReplicatedSystemKeyspace(keyspace);
+ assert SchemaConstants.isSystemKeyspace(keyspace) : String.format("Table %s.%s is not a system table; only keyspaces allowed are %s", keyspace, table, SchemaConstants.getSystemKeyspaces());
return unsafeDeterministic(keyspace, table);
}
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java b/src/java/org/apache/cassandra/schema/TableMetadata.java
index fcd1d58..ef43c5d 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -740,7 +740,9 @@ public class TableMetadata implements SchemaElement
if (id == null)
{
- if (DatabaseDescriptor.useDeterministicTableID()) id = TableId.unsafeDeterministic(keyspace, name);
+ // make sure vtables use deterministic ids so they can be referenced in cross-node calls
+ // see CASSANDRA-17295
+ if (DatabaseDescriptor.useDeterministicTableID() || kind == Kind.VIRTUAL) id = TableId.unsafeDeterministic(keyspace, name);
else id = TableId.generate();
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/VirtualTableFromInternode.java b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableFromInternode.java
new file mode 100644
index 0000000..e322585
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableFromInternode.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class VirtualTableFromInternode extends TestBaseImpl
+{
+ private static Cluster CLUSTER;
+
+ @BeforeClass
+ public static void setup() throws IOException
+ {
+ CLUSTER = Cluster.build(2)
+ .withConfig(c -> c.with(Feature.values()))
+ .start();
+ }
+
+ @AfterClass
+ public static void cleanup()
+ {
+ if (CLUSTER != null)
+ CLUSTER.close();
+ }
+
+ @Test
+ public void normal()
+ {
+ assertThat(CLUSTER.coordinator(1).executeWithResult("SELECT * FROM system_views.settings", ConsistencyLevel.ONE))
+ .hasSizeGreaterThan(2)
+ .contains("rpc_address", "127.0.0.1")
+ .contains("broadcast_address", "127.0.0.1");
+
+ assertThat(CLUSTER.coordinator(1).executeWithResult("SELECT * FROM system_views.settings WHERE name=?", ConsistencyLevel.ONE, "rpc_address"))
+ .isEqualTo("rpc_address", "127.0.0.1");
+
+ assertThat(CLUSTER.coordinator(1).executeWithResult("SELECT * FROM system_views.settings WHERE name IN (?, ?)", ConsistencyLevel.ONE, "rpc_address", "broadcast_address"))
+ .contains("rpc_address", "127.0.0.1")
+ .contains("broadcast_address", "127.0.0.1")
+ .hasSize(2);
+ }
+
+ @Test
+ public void readCommandAccessVirtualTable()
+ {
+ CLUSTER.get(1).runOnInstance(() -> {
+ boolean didWork = false;
+ for (InetAddressAndPort address : Gossiper.instance.getLiveMembers())
+ {
+ didWork = true;
+ UntypedResultSet rs = QueryProcessor.executeAsync(address, "SELECT * FROM system_views.settings")
+ .syncUninterruptibly().getNow();
+ assertThat(rs.isEmpty()).isFalse();
+ for (UntypedResultSet.Row row : rs)
+ {
+ String name = row.getString("name");
+ switch (name)
+ {
+ case "broadcast_address":
+ case "rpc_address":
+ assertThat(row.getString("value")).isEqualTo(address.getAddress().getHostAddress());
+ break;
+ }
+ }
+ }
+ assertThat(didWork).isTrue();
+ });
+ }
+
+ @Test
+ public void readCommandAccessVirtualTableSinglePartition()
+ {
+ CLUSTER.get(1).runOnInstance(() -> {
+ boolean didWork = false;
+ for (InetAddressAndPort address : Gossiper.instance.getLiveMembers())
+ {
+ didWork = true;
+ UntypedResultSet rs = QueryProcessor.executeAsync(address, "SELECT * FROM system_views.settings WHERE name=?", "rpc_address")
+ .syncUninterruptibly().getNow();
+ assertThat(rs.isEmpty()).isFalse();
+ assertThat(rs.one().getString("value")).isEqualTo(address.getAddress().getHostAddress());
+ }
+ assertThat(didWork).isTrue();
+ });
+ }
+
+ @Test
+ public void readCommandAccessVirtualTableMultiplePartition()
+ {
+ CLUSTER.get(1).runOnInstance(() -> {
+ boolean didWork = false;
+ for (InetAddressAndPort address : Gossiper.instance.getLiveMembers())
+ {
+ didWork = true;
+ UntypedResultSet rs = QueryProcessor.executeAsync(address, "SELECT * FROM system_views.settings WHERE name IN (?, ?)", "rpc_address", "broadcast_address")
+ .syncUninterruptibly().getNow();
+ assertThat(rs.isEmpty()).isFalse();
+ Set<String> columns = new HashSet<>();
+ for (UntypedResultSet.Row row : rs)
+ {
+ columns.add(row.getString("name"));
+ assertThat(row.getString("value")).isEqualTo(address.getAddress().getHostAddress());
+ }
+ assertThat(columns).isEqualTo(ImmutableSet.of("rpc_address", "broadcast_address"));
+ }
+ assertThat(didWork).isTrue();
+ });
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
new file mode 100644
index 0000000..f9c1d90
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.util;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.assertj.core.api.Assertions;
+
+public class QueryResultUtil
+{
+ private QueryResultUtil()
+ {
+ }
+
+ public static boolean contains(SimpleQueryResult qr, Object... values)
+ {
+ return contains(qr, a -> equals(a, values));
+ }
+
+ public static boolean contains(SimpleQueryResult qr, Row row)
+ {
+ return contains(qr, a -> equals(a, row));
+ }
+
+ public static boolean contains(SimpleQueryResult qr, Predicate<Row> fn)
+ {
+ while (qr.hasNext())
+ {
+ if (fn.test(qr.next()))
+ return true;
+ }
+ return false;
+ }
+
+ private static boolean equals(Row a, Row b)
+ {
+ return equals(a, b.toObjectArray());
+ }
+
+ private static boolean equals(Row a, Object[] bs)
+ {
+ Object[] as = a.toObjectArray();
+ if (as.length != bs.length)
+ return false;
+ for (int i = 0; i < as.length; i++)
+ {
+ if (!Objects.equals(as[i], bs[i]))
+ return false;
+ }
+ return true;
+ }
+
+ public static AssertHelper assertThat(SimpleQueryResult qr)
+ {
+ return new AssertHelper(qr);
+ }
+
+ public static class AssertHelper
+ {
+ private final SimpleQueryResult qr;
+
+ private AssertHelper(SimpleQueryResult qr)
+ {
+ this.qr = qr;
+ }
+
+ public AssertHelper contains(Object... values)
+ {
+ qr.reset();
+ if (!QueryResultUtil.contains(qr, a -> QueryResultUtil.equals(a, values)))
+ throw new AssertionError("Row " + Arrays.asList(values) + " is not present");
+ return this;
+ }
+
+ public AssertHelper contains(Row row)
+ {
+ qr.reset();
+ if (!QueryResultUtil.contains(qr, a -> QueryResultUtil.equals(a, row)))
+ throw new AssertionError("Row " + row + " is not present");
+ return this;
+ }
+
+ public AssertHelper contains(Predicate<Row> fn)
+ {
+ qr.reset();
+ if (!QueryResultUtil.contains(qr, fn))
+ throw new AssertionError("Row is not present");
+ return this;
+ }
+
+ public AssertHelper isEqualTo(Object... values)
+ {
+ Assertions.assertThat(qr.toObjectArrays())
+ .hasSize(1)
+ .contains(values);
+ return this;
+ }
+
+ public AssertHelper hasSize(int size)
+ {
+ Assertions.assertThat(qr.toObjectArrays()).hasSize(size);
+ return this;
+ }
+
+ public AssertHelper hasSizeGreaterThan(int size)
+ {
+ Assertions.assertThat(qr.toObjectArrays()).hasSizeGreaterThan(size);
+ return this;
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/db/AbstractReadQueryToCQLStringTest.java b/test/unit/org/apache/cassandra/db/AbstractReadQueryToCQLStringTest.java
index 4d0adea..11582b9 100644
--- a/test/unit/org/apache/cassandra/db/AbstractReadQueryToCQLStringTest.java
+++ b/test/unit/org/apache/cassandra/db/AbstractReadQueryToCQLStringTest.java
@@ -780,11 +780,6 @@ public class AbstractReadQueryToCQLStringTest extends CQLTester
SinglePartitionReadCommand.Group group = (SinglePartitionReadCommand.Group) readQuery;
return group.queries.stream().map(AbstractReadQuery::toCQLString).collect(Collectors.toList());
}
- else if (readQuery instanceof VirtualTableSinglePartitionReadQuery.Group)
- {
- VirtualTableSinglePartitionReadQuery.Group group = (VirtualTableSinglePartitionReadQuery.Group) readQuery;
- return group.queries.stream().map(AbstractReadQuery::toCQLString).collect(Collectors.toList());
- }
else
{
assertTrue(readQuery instanceof AbstractReadQuery);
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 69b8c37..289014f 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -358,7 +358,7 @@ public class ReadCommandTest
cfs.forceBlockingFlush();
- ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
+ ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE);
try (ReadExecutionController executionController = query.executionController();
UnfilteredPartitionIterator iter = query.executeLocally(executionController);
@@ -528,7 +528,7 @@ public class ReadCommandTest
cfs.forceBlockingFlush();
- ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
+ ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE);
try (ReadExecutionController executionController = query.executionController();
UnfilteredPartitionIterator iter = query.executeLocally(executionController);
@@ -604,7 +604,7 @@ public class ReadCommandTest
cfs.forceBlockingFlush();
- ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
+ ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE);
try (ReadExecutionController executionController = query.executionController();
UnfilteredPartitionIterator iter = query.executeLocally(executionController);
diff --git a/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java b/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java
index ad28c13..cdb14ab 100644
--- a/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java
@@ -68,6 +68,7 @@ public class CredentialsCacheKeysTableTest extends CQLTester
// ensure nothing keeps cached between tests
passwordAuthenticator.getCredentialsCache().invalidate();
+ disablePreparedReuseForTest();
}
@AfterClass
diff --git a/test/unit/org/apache/cassandra/db/virtual/JmxPermissionsCacheKeysTableTest.java b/test/unit/org/apache/cassandra/db/virtual/JmxPermissionsCacheKeysTableTest.java
index f670fbd..363dfd7 100644
--- a/test/unit/org/apache/cassandra/db/virtual/JmxPermissionsCacheKeysTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/JmxPermissionsCacheKeysTableTest.java
@@ -89,6 +89,7 @@ public class JmxPermissionsCacheKeysTableTest extends CQLTester
// ensure nothing keeps cached between tests
AuthorizationProxy.jmxPermissionsCache.invalidate();
+ disablePreparedReuseForTest();
}
@AfterClass
diff --git a/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java b/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java
index 8f7f82b..e329d88 100644
--- a/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java
@@ -69,6 +69,7 @@ public class NetworkPermissionsCacheKeysTableTest extends CQLTester
// ensure nothing keeps cached between tests
AuthenticatedUser.networkPermissionsCache.invalidate();
+ disablePreparedReuseForTest();
}
@AfterClass
diff --git a/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java b/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java
index 1bfbb0b..97406e4 100644
--- a/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java
@@ -84,6 +84,7 @@ public class PermissionsCacheKeysTableTest extends CQLTester
// ensure nothing keeps cached between tests
AuthenticatedUser.permissionsCache.invalidate();
+ disablePreparedReuseForTest();
}
@AfterClass
diff --git a/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java b/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java
index 6e5131d..ce8beea 100644
--- a/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java
@@ -69,6 +69,7 @@ public class RolesCacheKeysTableTest extends CQLTester
// ensure nothing keeps cached between tests
Roles.cache.invalidate();
+ disablePreparedReuseForTest();
}
@AfterClass
diff --git a/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java b/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java
index b21b3c4..45da7e2 100644
--- a/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java
@@ -56,6 +56,7 @@ public class SSTableTasksTableTest extends CQLTester
{
table = new SSTableTasksTable(KS_NAME);
VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table)));
+ disablePreparedReuseForTest();
}
@Test
diff --git a/test/unit/org/apache/cassandra/db/virtual/SettingsTableTest.java b/test/unit/org/apache/cassandra/db/virtual/SettingsTableTest.java
index a2fda49..8239cef 100644
--- a/test/unit/org/apache/cassandra/db/virtual/SettingsTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/SettingsTableTest.java
@@ -59,6 +59,7 @@ public class SettingsTableTest extends CQLTester
config.server_encryption_options.applyConfig();
table = new SettingsTable(KS_NAME, config);
VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table)));
+ disablePreparedReuseForTest();
}
private String getValue(Field f)
diff --git a/test/unit/org/apache/cassandra/db/virtual/SystemPropertiesTableTest.java b/test/unit/org/apache/cassandra/db/virtual/SystemPropertiesTableTest.java
index 2ec0683..5242d55 100644
--- a/test/unit/org/apache/cassandra/db/virtual/SystemPropertiesTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/SystemPropertiesTableTest.java
@@ -56,6 +56,7 @@ public class SystemPropertiesTableTest extends CQLTester
{
table = new SystemPropertiesTable(KS_NAME);
VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table)));
+ disablePreparedReuseForTest();
}
@Test
diff --git a/test/unit/org/apache/cassandra/service/QueryPagerTest.java b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
index 50bb3e2..323de1a 100644
--- a/test/unit/org/apache/cassandra/service/QueryPagerTest.java
+++ b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
@@ -326,7 +326,7 @@ public class QueryPagerTest
public void multiQueryTest(boolean testPagingState, ProtocolVersion protocolVersion)
{
- ReadQuery command = new SinglePartitionReadCommand.Group(new ArrayList<SinglePartitionReadCommand>()
+ ReadQuery command = SinglePartitionReadCommand.Group.create(new ArrayList<SinglePartitionReadCommand>()
{{
add(sliceQuery("k1", "c2", "c6", 10));
add(sliceQuery("k4", "c3", "c5", 10));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org