You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/08/30 16:38:02 UTC
[02/10] cassandra git commit: Fix race condition in read command serialization
Fix race condition in read command serialization
patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-13363
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f297bcf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f297bcf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f297bcf
Branch: refs/heads/cassandra-3.11
Commit: 7f297bcf8aced983cbc9c4103d0ebefc1789f0dd
Parents: d03c046
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Mon Aug 14 16:43:06 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Wed Aug 30 16:16:46 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../cql3/statements/SelectStatement.java | 16 +-
.../db/AbstractReadCommandBuilder.java | 2 +-
.../cassandra/db/PartitionRangeReadCommand.java | 133 +++++++++++---
.../org/apache/cassandra/db/ReadCommand.java | 149 ++++++++-------
.../db/SinglePartitionReadCommand.java | 180 ++++++++++++++++---
.../cassandra/index/SecondaryIndexManager.java | 9 +-
.../internal/composites/CompositesSearcher.java | 6 +-
.../index/internal/keys/KeysSearcher.java | 3 +-
.../cassandra/service/AbstractReadExecutor.java | 4 +-
.../service/pager/PartitionRangeQueryPager.java | 8 +-
.../cassandra/thrift/CassandraServer.java | 69 ++++---
test/unit/org/apache/cassandra/Util.java | 26 +--
.../apache/cassandra/db/SecondaryIndexTest.java | 10 +-
.../db/SinglePartitionSliceCommandTest.java | 45 ++---
.../cassandra/io/sstable/SSTableReaderTest.java | 2 +-
16 files changed, 427 insertions(+), 238 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 452dc9b..aca9e1f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
3.0.15
- * enable segement creation before recovering commitlogs (CASSANDRA-13587)
+ * Fix race condition in read command serialization (CASSANDRA-13363)
+ * Enable segment creation before recovering commitlogs (CASSANDRA-13587)
* Fix AssertionError in short read protection (CASSANDRA-13747)
* Don't skip corrupted sstables on startup (CASSANDRA-13620)
* Fix the merging of cells with different user type versions (CASSANDRA-13776)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index bd377f4..3882a23 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -540,18 +540,10 @@ public class SelectStatement implements CQLStatement
if (keyBounds == null)
return ReadQuery.EMPTY;
- PartitionRangeReadCommand command = new PartitionRangeReadCommand(cfm,
- nowInSec,
- queriedColumns,
- rowFilter,
- limit,
- new DataRange(keyBounds, clusteringIndexFilter),
- Optional.empty());
- // If there's a secondary index that the command can use, have it validate
- // the request parameters. Note that as a side effect, if a viable Index is
- // identified by the CFS's index manager, it will be cached in the command
- // and serialized during distribution to replicas in order to avoid performing
- // further lookups.
+ PartitionRangeReadCommand command =
+ PartitionRangeReadCommand.create(false, cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
+
+ // If there's a secondary index that the command can use, have it validate the request parameters.
command.maybeValidateIndex();
return command;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
index afbab74..d219816 100644
--- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
+++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
@@ -336,7 +336,7 @@ public abstract class AbstractReadCommandBuilder
else
bounds = new ExcludingBounds<>(start, end);
- return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()), Optional.empty());
+ return PartitionRangeReadCommand.create(false, cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
}
static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index fb2dd0d..9e557e0 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import org.apache.cassandra.config.CFMetaData;
@@ -59,31 +59,39 @@ public class PartitionRangeReadCommand extends ReadCommand
private final DataRange dataRange;
private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
- public PartitionRangeReadCommand(boolean isDigest,
- int digestVersion,
- boolean isForThrift,
- CFMetaData metadata,
- int nowInSec,
- ColumnFilter columnFilter,
- RowFilter rowFilter,
- DataLimits limits,
- DataRange dataRange,
- Optional<IndexMetadata> index)
+ private PartitionRangeReadCommand(boolean isDigest,
+ int digestVersion,
+ boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DataRange dataRange,
+ IndexMetadata index)
{
- super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+ super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
this.dataRange = dataRange;
- this.index = index;
}
- public PartitionRangeReadCommand(CFMetaData metadata,
- int nowInSec,
- ColumnFilter columnFilter,
- RowFilter rowFilter,
- DataLimits limits,
- DataRange dataRange,
- Optional<IndexMetadata> index)
+ public static PartitionRangeReadCommand create(boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DataRange dataRange)
{
- this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index);
+ return new PartitionRangeReadCommand(false,
+ 0,
+ isForThrift,
+ metadata,
+ nowInSec,
+ columnFilter,
+ rowFilter,
+ limits,
+ dataRange,
+ findIndex(metadata, rowFilter));
}
/**
@@ -96,13 +104,14 @@ public class PartitionRangeReadCommand extends ReadCommand
*/
public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec)
{
- return new PartitionRangeReadCommand(metadata,
+ return new PartitionRangeReadCommand(false, 0, false,
+ metadata,
nowInSec,
ColumnFilter.all(metadata),
RowFilter.NONE,
DataLimits.NONE,
DataRange.allData(metadata.partitioner),
- Optional.empty());
+ null);
}
public DataRange dataRange()
@@ -122,17 +131,72 @@ public class PartitionRangeReadCommand extends ReadCommand
public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
{
- return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index);
+ return new PartitionRangeReadCommand(isDigestQuery(),
+ digestVersion(),
+ isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ dataRange().forSubRange(range),
+ indexMetadata());
}
public PartitionRangeReadCommand copy()
{
- return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index);
+ return new PartitionRangeReadCommand(isDigestQuery(),
+ digestVersion(),
+ isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ dataRange(),
+ indexMetadata());
+ }
+
+ public PartitionRangeReadCommand copyAsDigestQuery()
+ {
+ return new PartitionRangeReadCommand(true,
+ digestVersion(),
+ isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ dataRange(),
+ indexMetadata());
+ }
+
+ public PartitionRangeReadCommand withUpdatedDataRange(DataRange newDataRange)
+ {
+ return new PartitionRangeReadCommand(isDigestQuery(),
+ digestVersion(),
+ isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ newDataRange,
+ indexMetadata());
}
- public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
+ public PartitionRangeReadCommand withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange)
{
- return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index);
+ return new PartitionRangeReadCommand(isDigestQuery(),
+ digestVersion(),
+ isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ newLimits,
+ newDataRange,
+ indexMetadata());
}
public long getTimeout()
@@ -173,7 +237,8 @@ public class PartitionRangeReadCommand extends ReadCommand
metric.rangeLatency.addNano(latencyNanos);
}
- protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
+ @VisibleForTesting
+ public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
{
ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator()));
@@ -337,7 +402,17 @@ public class PartitionRangeReadCommand extends ReadCommand
private static class Deserializer extends SelectionDeserializer
{
- public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
+ public ReadCommand deserialize(DataInputPlus in,
+ int version,
+ boolean isDigest,
+ int digestVersion,
+ boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ IndexMetadata index)
throws IOException
{
DataRange range = DataRange.serializer.deserialize(in, version, metadata);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 76180cc..66985b6 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.function.Predicate;
+import javax.annotation.Nullable;
+
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,24 +108,27 @@ public abstract class ReadCommand implements ReadQuery
private final RowFilter rowFilter;
private final DataLimits limits;
- // SecondaryIndexManager will attempt to provide the most selective of any available indexes
- // during execution. Here we also store an the results of that lookup to repeating it over
- // the lifetime of the command.
- protected Optional<IndexMetadata> index = Optional.empty();
-
- // Flag to indicate whether the index manager has been queried to select an index for this
- // command. This is necessary as the result of that lookup may be null, in which case we
- // still don't want to repeat it.
- private boolean indexManagerQueried = false;
-
- private boolean isDigestQuery;
+ private final boolean isDigestQuery;
// if a digest query, the version for which the digest is expected. Ignored if not a digest.
private int digestVersion;
private final boolean isForThrift;
+ @Nullable
+ private final IndexMetadata index;
+
protected static abstract class SelectionDeserializer
{
- public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException;
+ public abstract ReadCommand deserialize(DataInputPlus in,
+ int version,
+ boolean isDigest,
+ int digestVersion,
+ boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ IndexMetadata index) throws IOException;
}
protected enum Kind
@@ -147,7 +152,8 @@ public abstract class ReadCommand implements ReadQuery
int nowInSec,
ColumnFilter columnFilter,
RowFilter rowFilter,
- DataLimits limits)
+ DataLimits limits,
+ IndexMetadata index)
{
this.kind = kind;
this.isDigestQuery = isDigestQuery;
@@ -158,6 +164,7 @@ public abstract class ReadCommand implements ReadQuery
this.columnFilter = columnFilter;
this.rowFilter = rowFilter;
this.limits = limits;
+ this.index = index;
}
protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException;
@@ -253,18 +260,6 @@ public abstract class ReadCommand implements ReadQuery
}
/**
- * Sets whether this command should be a digest one or not.
- *
- * @param isDigestQuery whether the command should be set as a digest one or not.
- * @return this read command.
- */
- public ReadCommand setIsDigestQuery(boolean isDigestQuery)
- {
- this.isDigestQuery = isDigestQuery;
- return this;
- }
-
- /**
* Sets the digest version, for when digest for that command is requested.
* <p>
* Note that we allow setting this independently of setting the command as a digest query as
@@ -291,6 +286,30 @@ public abstract class ReadCommand implements ReadQuery
}
/**
+ * Index (metadata) chosen for this query. Can be null.
+ *
+ * @return index (metadata) chosen for this query
+ */
+ @Nullable
+ public IndexMetadata indexMetadata()
+ {
+ return index;
+ }
+
+ /**
+ * Index instance chosen for this query. Can be null.
+ *
+ * @return Index instance chosen for this query. Can be null.
+ */
+ @Nullable
+ public Index index()
+ {
+ return null == index
+ ? null
+ : Keyspace.openAndGetStore(metadata).indexManager.getIndex(index);
+ }
+
+ /**
* The clustering index filter this command to use for the provided key.
* <p>
* Note that that method should only be called on a key actually queried by this command
@@ -310,6 +329,11 @@ public abstract class ReadCommand implements ReadQuery
*/
public abstract ReadCommand copy();
+ /**
+ * Returns a copy of this command with isDigestQuery set to true.
+ */
+ public abstract ReadCommand copyAsDigestQuery();
+
protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
protected abstract int oldestUnrepairedTombstone();
@@ -321,35 +345,32 @@ public abstract class ReadCommand implements ReadQuery
: ReadResponse.createDataResponse(iterator, this);
}
- public long indexSerializedSize(int version)
+ long indexSerializedSize(int version)
{
- if (index.isPresent())
- return IndexMetadata.serializer.serializedSize(index.get(), version);
- else
- return 0;
+ return null != index
+ ? IndexMetadata.serializer.serializedSize(index, version)
+ : 0;
}
public Index getIndex(ColumnFamilyStore cfs)
{
- // if we've already consulted the index manager, and it returned a valid index
- // the result should be cached here.
- if(index.isPresent())
- return cfs.indexManager.getIndex(index.get());
-
- // if no cached index is present, but we've already consulted the index manager
- // then no registered index is suitable for this command, so just return null.
- if (indexManagerQueried)
+ return null != index
+ ? cfs.indexManager.getIndex(index)
+ : null;
+ }
+
+ static IndexMetadata findIndex(CFMetaData table, RowFilter rowFilter)
+ {
+ if (table.getIndexes().isEmpty() || rowFilter.isEmpty())
return null;
- // do the lookup, set the flag to indicate so and cache the result if not null
- Index selected = cfs.indexManager.getBestIndexFor(this);
- indexManagerQueried = true;
+ ColumnFamilyStore cfs = Keyspace.openAndGetStore(table);
- if (selected == null)
- return null;
+ Index index = cfs.indexManager.getBestIndexFor(rowFilter);
- index = Optional.of(selected.getIndexMetadata());
- return selected;
+ return null != index
+ ? index.getIndexMetadata()
+ : null;
}
/**
@@ -602,7 +623,7 @@ public abstract class ReadCommand implements ReadQuery
assert version >= MessagingService.VERSION_30;
out.writeByte(command.kind.ordinal());
- out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
+ out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(null != command.index));
if (command.isDigestQuery())
out.writeUnsignedVInt(command.digestVersion());
CFMetaData.serializer.serialize(command.metadata(), out, version);
@@ -610,8 +631,8 @@ public abstract class ReadCommand implements ReadQuery
ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
RowFilter.serializer.serialize(command.rowFilter(), out, version);
DataLimits.serializer.serialize(command.limits(), out, version);
- if (command.index.isPresent())
- IndexMetadata.serializer.serialize(command.index.get(), out, version);
+ if (null != command.index)
+ IndexMetadata.serializer.serialize(command.index, out, version);
command.serializeSelection(out, version);
}
@@ -631,18 +652,16 @@ public abstract class ReadCommand implements ReadQuery
ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
DataLimits limits = DataLimits.serializer.deserialize(in, version);
- Optional<IndexMetadata> index = hasIndex
- ? deserializeIndexMetadata(in, version, metadata)
- : Optional.empty();
+ IndexMetadata index = hasIndex ? deserializeIndexMetadata(in, version, metadata) : null;
return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
}
- private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+ private IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
{
try
{
- return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
+ return IndexMetadata.serializer.deserialize(in, version, cfm);
}
catch (UnknownIndexException e)
{
@@ -652,7 +671,7 @@ public abstract class ReadCommand implements ReadQuery
"index. Please wait for schema agreement after index creation.",
cfm.ksName, cfm.cfName, e.indexId.toString());
logger.info(message);
- return Optional.empty();
+ return null;
}
}
@@ -830,7 +849,7 @@ public abstract class ReadCommand implements ReadQuery
else
limits = DataLimits.cqlLimits(maxResults);
- return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
+ return PartitionRangeReadCommand.create(true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter));
}
static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
@@ -939,9 +958,8 @@ public abstract class ReadCommand implements ReadQuery
ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter;
ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
- return new PartitionRangeReadCommand(
- command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
- command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
+
+ return command.withUpdatedDataRange(newRange);
}
static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata)
@@ -1096,7 +1114,7 @@ public abstract class ReadCommand implements ReadQuery
// missing without any problems, so we can safely always set "inclusive" to false in the data range
dataRange = dataRange.forPaging(keyRange, metadata.comparator, startBound.getAsClustering(metadata), false);
}
- return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange, Optional.empty());
+ return PartitionRangeReadCommand.create(true, metadata, nowInSec, selection, rowFilter, limits, dataRange);
}
public long serializedSize(ReadCommand command, int version)
@@ -1290,10 +1308,7 @@ public abstract class ReadCommand implements ReadQuery
{
Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = deserializeNamesSelectionAndFilter(in, metadata);
- // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
- return new SinglePartitionReadCommand(
- isDigest, version, true, metadata, nowInSeconds, selectionAndFilter.left, RowFilter.NONE, DataLimits.NONE,
- key, selectionAndFilter.right);
+ return SinglePartitionReadCommand.legacyNamesCommand(isDigest, version, metadata, nowInSeconds, selectionAndFilter.left, key, selectionAndFilter.right);
}
static Pair<ColumnFilter, ClusteringIndexNamesFilter> deserializeNamesSelectionAndFilter(DataInputPlus in, CFMetaData metadata) throws IOException
@@ -1422,8 +1437,7 @@ public abstract class ReadCommand implements ReadQuery
else
limits = DataLimits.cqlLimits(count);
- // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
- return new SinglePartitionReadCommand(isDigest, version, true, metadata, nowInSeconds, columnFilter, RowFilter.NONE, limits, key, filter);
+ return SinglePartitionReadCommand.legacySliceCommand(isDigest, version, metadata, nowInSeconds, columnFilter, limits, key, filter);
}
private long serializedSliceCommandSize(SinglePartitionReadCommand command)
@@ -1605,9 +1619,8 @@ public abstract class ReadCommand implements ReadQuery
ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter)command.clusteringIndexFilter();
ClusteringIndexSliceFilter sliceFilter = convertNamesFilterToSliceFilter(filter, metadata);
- return new SinglePartitionReadCommand(
- command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
- command.columnFilter(), command.rowFilter(), command.limits(), command.partitionKey(), sliceFilter);
+
+ return command.withUpdatedClusteringIndexFilter(sliceFilter);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 686ec35..00464ca 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -70,18 +70,19 @@ public class SinglePartitionReadCommand extends ReadCommand
private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
- public SinglePartitionReadCommand(boolean isDigest,
- int digestVersion,
- boolean isForThrift,
- CFMetaData metadata,
- int nowInSec,
- ColumnFilter columnFilter,
- RowFilter rowFilter,
- DataLimits limits,
- DecoratedKey partitionKey,
- ClusteringIndexFilter clusteringIndexFilter)
+ private SinglePartitionReadCommand(boolean isDigest,
+ int digestVersion,
+ boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter,
+ IndexMetadata index)
{
- super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+ super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
assert partitionKey.getPartitioner() == metadata.partitioner;
this.partitionKey = partitionKey;
this.clusteringIndexFilter = clusteringIndexFilter;
@@ -90,6 +91,44 @@ public class SinglePartitionReadCommand extends ReadCommand
/**
* Creates a new read command on a single partition.
*
+ * @param isForThrift whether the query is for thrift or not.
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param columnFilter the column filter to use for the query.
+ * @param rowFilter the row filter to use for the query.
+ * @param limits the limits to use for the query.
+ * @param partitionKey the partition key for the partition to query.
+ * @param clusteringIndexFilter the clustering index filter to use for the query.
+ * @param indexMetadata explicitly specified index to use for the query
+ *
+ * @return a newly created read command.
+ */
+ public static SinglePartitionReadCommand create(boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter,
+ IndexMetadata indexMetadata)
+ {
+ return new SinglePartitionReadCommand(false,
+ 0,
+ isForThrift,
+ metadata,
+ nowInSec,
+ columnFilter,
+ rowFilter,
+ limits,
+ partitionKey,
+ clusteringIndexFilter,
+ indexMetadata);
+ }
+
+ /**
+ * Creates a new read command on a single partition.
+ *
* @param metadata the table to query.
* @param nowInSec the time in seconds to use are "now" for this query.
* @param columnFilter the column filter to use for the query.
@@ -112,7 +151,7 @@ public class SinglePartitionReadCommand extends ReadCommand
}
/**
- * Creates a new read command on a single partition for thrift.
+ * Creates a new read command on a single partition.
*
* @param isForThrift whether the query is for thrift or not.
* @param metadata the table to query.
@@ -134,7 +173,15 @@ public class SinglePartitionReadCommand extends ReadCommand
DecoratedKey partitionKey,
ClusteringIndexFilter clusteringIndexFilter)
{
- return new SinglePartitionReadCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ return create(isForThrift,
+ metadata,
+ nowInSec,
+ columnFilter,
+ rowFilter,
+ limits,
+ partitionKey,
+ clusteringIndexFilter,
+ findIndex(metadata, rowFilter));
}
/**
@@ -148,7 +195,11 @@ public class SinglePartitionReadCommand extends ReadCommand
*
* @return a newly created read command. The returned command will use no row filter and have no limits.
*/
- public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter)
+ public static SinglePartitionReadCommand create(CFMetaData metadata,
+ int nowInSec,
+ DecoratedKey key,
+ ColumnFilter columnFilter,
+ ClusteringIndexFilter filter)
{
return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
}
@@ -164,7 +215,7 @@ public class SinglePartitionReadCommand extends ReadCommand
*/
public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key)
{
- return SinglePartitionReadCommand.create(metadata, nowInSec, key, Slices.ALL);
+ return create(metadata, nowInSec, key, Slices.ALL);
}
/**
@@ -178,7 +229,7 @@ public class SinglePartitionReadCommand extends ReadCommand
*/
public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key)
{
- return SinglePartitionReadCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
+ return create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
}
/**
@@ -211,7 +262,7 @@ public class SinglePartitionReadCommand extends ReadCommand
public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices)
{
ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false);
- return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+ return create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
}
/**
@@ -244,7 +295,7 @@ public class SinglePartitionReadCommand extends ReadCommand
public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, NavigableSet<Clustering> names)
{
ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(names, false);
- return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+ return create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
}
/**
@@ -265,7 +316,82 @@ public class SinglePartitionReadCommand extends ReadCommand
public SinglePartitionReadCommand copy()
{
- return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
+ return new SinglePartitionReadCommand(isDigestQuery(),
+ digestVersion(),
+ isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ partitionKey(),
+ clusteringIndexFilter(),
+ indexMetadata());
+ }
+
+ public SinglePartitionReadCommand copyAsDigestQuery()
+ {
+ return new SinglePartitionReadCommand(true,
+ digestVersion(),
+ isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ partitionKey(),
+ clusteringIndexFilter(),
+ indexMetadata());
+ }
+
+ public SinglePartitionReadCommand withUpdatedClusteringIndexFilter(ClusteringIndexFilter filter)
+ {
+ return new SinglePartitionReadCommand(isDigestQuery(),
+ digestVersion(),
+ isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ partitionKey(),
+ filter,
+ indexMetadata());
+ }
+
+ static SinglePartitionReadCommand legacySliceCommand(boolean isDigest,
+ int digestVersion,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexSliceFilter filter)
+ {
+ // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+ return new SinglePartitionReadCommand(isDigest,
+ digestVersion,
+ true,
+ metadata,
+ nowInSec,
+ columnFilter,
+ RowFilter.NONE,
+ limits,
+ partitionKey,
+ filter,
+ null);
+ }
+
+ static SinglePartitionReadCommand legacyNamesCommand(boolean isDigest,
+ int digestVersion,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ DecoratedKey partitionKey,
+ ClusteringIndexNamesFilter filter)
+ {
+ // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+ return new SinglePartitionReadCommand(isDigest, digestVersion, true, metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, partitionKey, filter,null);
}
public DecoratedKey partitionKey()
@@ -432,7 +558,7 @@ public class SinglePartitionReadCommand extends ReadCommand
final int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
@SuppressWarnings("resource") // we close on exception or upon closing the result of this method
- UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
+ UnfilteredRowIterator iter = fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
try
{
// Use a custom iterator instead of DataLimits to avoid stopping the original iterator
@@ -1068,12 +1194,22 @@ public class SinglePartitionReadCommand extends ReadCommand
private static class Deserializer extends SelectionDeserializer
{
- public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
+ public ReadCommand deserialize(DataInputPlus in,
+ int version,
+ boolean isDigest,
+ int digestVersion,
+ boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ IndexMetadata index)
throws IOException
{
DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in, DatabaseDescriptor.getMaxValueSize()));
ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
- return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter);
+ return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index dd6dde4..5976ddf 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.statements.IndexTarget;
@@ -697,17 +698,17 @@ public class SecondaryIndexManager implements IndexRegistry
* cached for future use when obtaining a Searcher, getting the index's underlying CFS for
* ReadOrderGroup, or an estimate of the result size from an average index query.
*
- * @param command ReadCommand to be executed
+ * @param rowFilter RowFilter of the command to be executed
* @return an Index instance, ready to use during execution of the command, or null if none
* of the registered indexes can support the command.
*/
- public Index getBestIndexFor(ReadCommand command)
+ public Index getBestIndexFor(RowFilter rowFilter)
{
- if (indexes.isEmpty() || command.rowFilter().isEmpty())
+ if (indexes.isEmpty() || rowFilter.isEmpty())
return null;
Set<Index> searchableIndexes = new HashSet<>();
- for (RowFilter.Expression expression : command.rowFilter())
+ for (RowFilter.Expression expression : rowFilter)
{
if (expression.isCustom())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
index 135839b..f8a7c66 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@ -136,13 +136,15 @@ public class CompositesSearcher extends CassandraIndexSearcher
// Query the gathered index hits. We still need to filter stale hits from the resulting query.
ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
- SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata,
+ SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
+ index.baseCfs.metadata,
command.nowInSec(),
command.columnFilter(),
command.rowFilter(),
DataLimits.NONE,
partitionKey,
- filter);
+ filter,
+ null);
@SuppressWarnings("resource") // We close right away if empty, and if it's assign to next it will be called either
// by the next caller of next, or through closing this iterator is this come before.
UnfilteredRowIterator dataIter =
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
index 189b652..c14c5a7 100644
--- a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
@@ -99,7 +99,8 @@ public class KeysSearcher extends CassandraIndexSearcher
command.rowFilter(),
DataLimits.NONE,
key,
- command.clusteringIndexFilter(key));
+ command.clusteringIndexFilter(key),
+ null);
@SuppressWarnings("resource") // filterIfStale closes it's iterator if either it materialize it or if it returns null.
// Otherwise, we close right away if empty, and if it's assigned to next it will be called either
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index cae1f1a..177fdb2 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -88,7 +88,7 @@ public abstract class AbstractReadExecutor
protected void makeDigestRequests(Iterable<InetAddress> endpoints)
{
- makeRequests(command.copy().setIsDigestQuery(true), endpoints);
+ makeRequests(command.copyAsDigestQuery(), endpoints);
}
private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoints)
@@ -284,7 +284,7 @@ public abstract class AbstractReadExecutor
// Could be waiting on the data, or on enough digests.
ReadCommand retryCommand = command;
if (handler.resolver.isDataPresent())
- retryCommand = command.copy().setIsDigestQuery(true);
+ retryCommand = command.copyAsDigestQuery();
InetAddress extraReplica = Iterables.getLast(targetReplicas);
if (traceState != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
index 9c216e3..ea79017 100644
--- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
@@ -17,8 +17,6 @@
*/
package org.apache.cassandra.service.pager;
-import java.util.Optional;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,8 +25,6 @@ import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.index.Index;
-import org.apache.cassandra.schema.IndexMetadata;
/**
* Pages a PartitionRangeReadCommand.
@@ -90,9 +86,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
}
}
- Index index = command.getIndex(Keyspace.openAndGetStore(command.metadata()));
- Optional<IndexMetadata> indexMetadata = index != null ? Optional.of(index.getIndexMetadata()) : Optional.empty();
- return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange, indexMetadata);
+ return ((PartitionRangeReadCommand) command).withUpdatedLimitsAndDataRange(limits, pageRange);
}
protected void recordLast(DecoratedKey key, Row last)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 86caac3..cb74b15 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.db.view.View;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.apache.cassandra.metrics.ClientMetrics;
@@ -1520,16 +1519,16 @@ public class CassandraServer implements Cassandra.Iface
ColumnFilter columns = makeColumnFilter(metadata, column_parent, predicate);
ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, predicate);
DataLimits limits = getLimits(range.count, metadata.isSuper() && !column_parent.isSetSuper_column(), predicate);
- PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
- 0,
- true,
- metadata,
- nowInSec,
- columns,
- ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
- limits,
- new DataRange(bounds, filter),
- Optional.empty());
+
+ PartitionRangeReadCommand cmd =
+ PartitionRangeReadCommand.create(true,
+ metadata,
+ nowInSec,
+ columns,
+ ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
+ limits,
+ new DataRange(bounds, filter));
+
try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
{
assert results != null;
@@ -1614,16 +1613,16 @@ public class CassandraServer implements Cassandra.Iface
Clustering pageFrom = metadata.isSuper()
? new Clustering(start_column)
: LegacyLayout.decodeCellName(metadata, start_column).clustering;
- PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
- 0,
- true,
- metadata,
- nowInSec,
- ColumnFilter.all(metadata),
- RowFilter.NONE,
- limits,
- new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true),
- Optional.empty());
+
+ PartitionRangeReadCommand cmd =
+ PartitionRangeReadCommand.create(true,
+ metadata,
+ nowInSec,
+ ColumnFilter.all(metadata),
+ RowFilter.NONE,
+ limits,
+ new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true));
+
try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
{
return thriftifyKeySlices(results, new ColumnParent(column_family), limits.perPartitionCount());
@@ -1706,21 +1705,17 @@ public class CassandraServer implements Cassandra.Iface
ColumnFilter columns = makeColumnFilter(metadata, column_parent, column_predicate);
ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, column_predicate);
DataLimits limits = getLimits(index_clause.count, metadata.isSuper() && !column_parent.isSetSuper_column(), column_predicate);
- PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
- 0,
- true,
- metadata,
- nowInSec,
- columns,
- ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
- limits,
- new DataRange(bounds, filter),
- Optional.empty());
- // If there's a secondary index that the command can use, have it validate
- // the request parameters. Note that as a side effect, if a viable Index is
- // identified by the CFS's index manager, it will be cached in the command
- // and serialized during distribution to replicas in order to avoid performing
- // further lookups.
+
+ PartitionRangeReadCommand cmd =
+ PartitionRangeReadCommand.create(true,
+ metadata,
+ nowInSec,
+ columns,
+ ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
+ limits,
+ new DataRange(bounds, filter));
+
+ // If there's a secondary index that the command can use, have it validate the request parameters.
cmd.maybeValidateIndex();
try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
@@ -2533,7 +2528,7 @@ public class CassandraServer implements Cassandra.Iface
// We want to know if the partition exists, so just fetch a single cell.
ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(Slices.ALL, false);
DataLimits limits = DataLimits.thriftLimits(1, 1);
- return new SinglePartitionReadCommand(false, 0, true, metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, limits, key, filter);
+ return SinglePartitionReadCommand.create(true, metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, limits, key, filter);
}
// Gather the clustering for the expected values and query those.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index e8b42bc..d758efe 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -629,31 +629,7 @@ public class Util
ColumnFamilyStore cfs,
ReadOrderGroup orderGroup)
{
- return new InternalPartitionRangeReadCommand(command).queryStorageInternal(cfs, orderGroup);
- }
-
- private static final class InternalPartitionRangeReadCommand extends PartitionRangeReadCommand
- {
-
- private InternalPartitionRangeReadCommand(PartitionRangeReadCommand original)
- {
- super(original.isDigestQuery(),
- original.digestVersion(),
- original.isForThrift(),
- original.metadata(),
- original.nowInSec(),
- original.columnFilter(),
- original.rowFilter(),
- original.limits(),
- original.dataRange(),
- Optional.empty());
- }
-
- private UnfilteredPartitionIterator queryStorageInternal(ColumnFamilyStore cfs,
- ReadOrderGroup orderGroup)
- {
- return queryStorage(cfs, orderGroup);
- }
+ return command.queryStorage(cfs, orderGroup);
}
public static Closeable markDirectoriesUnwriteable(ColumnFamilyStore cfs)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
index bbccc48..2457c4a 100644
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
@@ -118,7 +118,7 @@ public class SecondaryIndexTest
.filterOn("birthdate", Operator.EQ, 1L)
.build();
- Index.Searcher searcher = cfs.indexManager.getBestIndexFor(rc).searcherFor(rc);
+ Index.Searcher searcher = rc.index().searcherFor(rc);
try (ReadOrderGroup orderGroup = rc.startOrderGroup(); UnfilteredPartitionIterator pi = searcher.search(orderGroup))
{
assertTrue(pi.hasNext());
@@ -204,7 +204,7 @@ public class SecondaryIndexTest
// verify that it's not being indexed under any other value either
ReadCommand rc = Util.cmd(cfs).build();
- assertNull(cfs.indexManager.getBestIndexFor(rc));
+ assertNull(rc.index());
// resurrect w/ a newer timestamp
new RowUpdateBuilder(cfs.metadata, 2, "k1").clustering("c").add("birthdate", 1L).build().apply();;
@@ -222,13 +222,13 @@ public class SecondaryIndexTest
// todo - checking the # of index searchers for the command is probably not the best thing to test here
RowUpdateBuilder.deleteRow(cfs.metadata, 3, "k1", "c").applyUnsafe();
rc = Util.cmd(cfs).build();
- assertNull(cfs.indexManager.getBestIndexFor(rc));
+ assertNull(rc.index());
// make sure obsolete mutations don't generate an index entry
// todo - checking the # of index searchers for the command is probably not the best thing to test here
new RowUpdateBuilder(cfs.metadata, 3, "k1").clustering("c").add("birthdate", 1L).build().apply();;
rc = Util.cmd(cfs).build();
- assertNull(cfs.indexManager.getBestIndexFor(rc));
+ assertNull(rc.index());
}
@Test
@@ -504,7 +504,7 @@ public class SecondaryIndexTest
ColumnDefinition cdef = cfs.metadata.getColumnDefinition(col);
ReadCommand rc = Util.cmd(cfs).filterOn(cdef.name.toString(), Operator.EQ, ((AbstractType) cdef.cellValueType()).decompose(val)).build();
- Index.Searcher searcher = cfs.indexManager.getBestIndexFor(rc).searcherFor(rc);
+ Index.Searcher searcher = rc.index().searcherFor(rc);
if (count != 0)
assertNotNull(searcher);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 7f59e2f..02b642e 100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@ -116,13 +116,14 @@ public class SinglePartitionSliceCommandTest
ByteBuffer zero = ByteBufferUtil.bytes(0);
Slices slices = Slices.with(cfm.comparator, Slice.make(Slice.Bound.inclusiveStartOf(zero), Slice.Bound.inclusiveEndOf(zero)));
ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(slices, false);
- ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
- FBUtilities.nowInSeconds(),
- columnFilter,
- RowFilter.NONE,
- DataLimits.NONE,
- key,
- sliceFilter);
+ ReadCommand cmd = SinglePartitionReadCommand.create(true,
+ cfm,
+ FBUtilities.nowInSeconds(),
+ columnFilter,
+ RowFilter.NONE,
+ DataLimits.NONE,
+ key,
+ sliceFilter);
DataOutputBuffer out = new DataOutputBuffer((int) ReadCommand.legacyReadCommandSerializer.serializedSize(cmd, MessagingService.VERSION_21));
ReadCommand.legacyReadCommandSerializer.serialize(cmd, out, MessagingService.VERSION_21);
@@ -166,13 +167,14 @@ public class SinglePartitionSliceCommandTest
ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.NONE, false);
- ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
- FBUtilities.nowInSeconds(),
- columnFilter,
- RowFilter.NONE,
- DataLimits.NONE,
- key,
- sliceFilter);
+ ReadCommand cmd = SinglePartitionReadCommand.create(true,
+ cfm,
+ FBUtilities.nowInSeconds(),
+ columnFilter,
+ RowFilter.NONE,
+ DataLimits.NONE,
+ key,
+ sliceFilter);
// check raw iterator for static cell
try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
@@ -224,13 +226,14 @@ public class SinglePartitionSliceCommandTest
ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
Slice slice = Slice.make(Slice.Bound.BOTTOM, Slice.Bound.inclusiveEndOf(ByteBufferUtil.bytes("i1")));
ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfm.comparator, slice), false);
- ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
- FBUtilities.nowInSeconds(),
- columnFilter,
- RowFilter.NONE,
- DataLimits.NONE,
- key,
- sliceFilter);
+ ReadCommand cmd = SinglePartitionReadCommand.create(true,
+ cfm,
+ FBUtilities.nowInSeconds(),
+ columnFilter,
+ RowFilter.NONE,
+ DataLimits.NONE,
+ key,
+ sliceFilter);
String ret = cmd.toCQLString();
Assert.assertNotNull(ret);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 640b68b..c2598ec 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -604,7 +604,7 @@ public class SSTableReaderTest
.columns("birthdate")
.filterOn("birthdate", Operator.EQ, 1L)
.build();
- Index.Searcher searcher = indexedCFS.indexManager.getBestIndexFor(rc).searcherFor(rc);
+ Index.Searcher searcher = rc.index().searcherFor(rc);
assertNotNull(searcher);
try (ReadOrderGroup orderGroup = ReadOrderGroup.forCommand(rc))
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org