You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/01/27 15:50:19 UTC
[2/6] cassandra git commit: Sends the proper amount of cells to old
nodes on DISTINCT
Sends the proper amount of cells to old nodes on DISTINCT
patch by slebresne; reviewed by blerer for CASSANDRA-10762
On a DISTINCT query, 3.0 nodes were sending the whole 1st row back, but pre-3.0
nodes actually expect only the 1st cell and limits get thrown off if
they get more. This could also be a problem for thrift queries
(on CQL tables only) when the limit ended up in the middle of a row.
The patch fixes this by enforcing the cell limit while serializing the
response to old nodes.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e37b4a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e37b4a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e37b4a9
Branch: refs/heads/cassandra-3.3
Commit: 3e37b4a90d4e5a036f24ac3d9a3aa804df6e6969
Parents: eb12770
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 20 17:31:10 2016 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 27 15:45:27 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/LegacyLayout.java | 53 ++++++++-
.../org/apache/cassandra/db/ReadCommand.java | 54 +++++----
.../cassandra/db/ReadCommandVerbHandler.java | 2 +-
.../org/apache/cassandra/db/ReadResponse.java | 110 ++++++++-----------
.../apache/cassandra/db/filter/DataLimits.java | 16 ++-
.../db/partitions/PartitionUpdate.java | 4 +-
.../UnfilteredPartitionIterators.java | 6 +-
.../db/rows/UnfilteredRowIterators.java | 6 +-
.../org/apache/cassandra/repair/Validator.java | 2 +-
.../apache/cassandra/service/DataResolver.java | 6 +-
.../cassandra/service/DigestResolver.java | 6 +-
.../apache/cassandra/service/StorageProxy.java | 2 +-
.../cassandra/cache/CacheProviderTest.java | 4 +-
.../org/apache/cassandra/db/PartitionTest.java | 18 +--
.../apache/cassandra/db/ReadResponseTest.java | 10 +-
.../db/SinglePartitionSliceCommandTest.java | 14 +--
.../rows/DigestBackwardCompatibilityTest.java | 5 +-
.../cassandra/service/DataResolverTest.java | 2 +-
19 files changed, 191 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c99438f..8daeb2d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.3
+ * Fix DISTINCT queries in mixed version clusters (CASSANDRA-10762)
* Migrate build status for indexes along with legacy schema (CASSANDRA-11046)
* Ensure SSTables for legacy KEYS indexes can be read (CASSANDRA-11045)
* Added support for IBM zSystems architecture (CASSANDRA-11054)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 6121227..b90151e 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -32,6 +32,7 @@ import com.google.common.collect.PeekingIterator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.context.CounterContext;
@@ -318,8 +319,46 @@ public abstract class LegacyLayout
return CompositeType.build(values);
}
+ /**
+ * The maximum number of cells to include per partition when converting to the old format.
+ * <p>
+ * We already apply the limit during the actual query, but for queries that count cells and not rows (thrift queries
+ * and distinct queries as far as old nodes are concerned), we may still include a little bit more than requested
+ * because {@link DataLimits} always includes full rows. So if the limit ends in the middle of a queried row, the
+ * full row will be part of our result. This would confuse old nodes however, so we make sure to truncate it to
+ * what's expected before writing it on the wire.
+ *
+ * @param command the read command for which to determine the maximum cells per partition. This can be {@code null}
+ * in which case {@code Integer.MAX_VALUE} is returned.
+ * @return the maximum number of cells per partition that should be enforced according to the read command if
+ * a post-query limitation is in order (see above). This will be {@code Integer.MAX_VALUE} if no such limits are
+ * necessary.
+ */
+ private static int maxCellsPerPartition(ReadCommand command)
+ {
+ if (command == null)
+ return Integer.MAX_VALUE;
+
+ DataLimits limits = command.limits();
+
+ // There are 2 types of DISTINCT queries: those that include only the partition key, and those that include static columns.
+ // On old nodes, the latter expects the first row in terms of CQL count, which is what we already have and there is no additional
+ // limit to apply. The former however expects only one cell per partition and relies on it (See CASSANDRA-10762).
+ if (limits.isDistinct())
+ return command.columnFilter().fetchedColumns().statics.isEmpty() ? 1 : Integer.MAX_VALUE;
+
+ switch (limits.kind())
+ {
+ case THRIFT_LIMIT:
+ case SUPER_COLUMN_COUNTING_LIMIT:
+ return limits.perPartitionCount();
+ default:
+ return Integer.MAX_VALUE;
+ }
+ }
+
// For serializing to old wire format
- public static LegacyUnfilteredPartition fromUnfilteredRowIterator(UnfilteredRowIterator iterator)
+ public static LegacyUnfilteredPartition fromUnfilteredRowIterator(ReadCommand command, UnfilteredRowIterator iterator)
{
// we need to extract the range tombstone so materialize the partition. Since this is
// used for the on-wire format, this is not worse than it used to be.
@@ -333,6 +372,10 @@ public abstract class LegacyLayout
// before we use the LegacyRangeTombstoneList at all
List<LegacyLayout.LegacyCell> cells = Lists.newArrayList(pair.right);
+ int maxCellsPerPartition = maxCellsPerPartition(command);
+ if (cells.size() > maxCellsPerPartition)
+ cells = cells.subList(0, maxCellsPerPartition);
+
// The LegacyRangeTombstoneList already has range tombstones for the single-row deletions and complex
// deletions. Go through our normal range tombstones and add them to the LegacyRTL so that the range
// tombstones all get merged and sorted properly.
@@ -352,13 +395,13 @@ public abstract class LegacyLayout
return new LegacyUnfilteredPartition(info.getPartitionDeletion(), rtl, cells);
}
- public static void serializeAsLegacyPartition(UnfilteredRowIterator partition, DataOutputPlus out, int version) throws IOException
+ public static void serializeAsLegacyPartition(ReadCommand command, UnfilteredRowIterator partition, DataOutputPlus out, int version) throws IOException
{
assert version < MessagingService.VERSION_30;
out.writeBoolean(true);
- LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(partition);
+ LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(command, partition);
UUIDSerializer.serializer.serialize(partition.metadata().cfId, out, version);
DeletionTime.serializer.serialize(legacyPartition.partitionDeletion, out);
@@ -420,7 +463,7 @@ public abstract class LegacyLayout
}
// For the old wire format
- public static long serializedSizeAsLegacyPartition(UnfilteredRowIterator partition, int version)
+ public static long serializedSizeAsLegacyPartition(ReadCommand command, UnfilteredRowIterator partition, int version)
{
assert version < MessagingService.VERSION_30;
@@ -429,7 +472,7 @@ public abstract class LegacyLayout
long size = TypeSizes.sizeof(true);
- LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(partition);
+ LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(command, partition);
size += UUIDSerializer.serializer.serializedSize(partition.metadata().cfId, version);
size += DeletionTime.serializer.serializedSize(legacyPartition.partitionDeletion);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 668a189..f21d100 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -282,11 +282,11 @@ public abstract class ReadCommand implements ReadQuery
protected abstract int oldestUnrepairedTombstone();
- public ReadResponse createResponse(UnfilteredPartitionIterator iterator, ColumnFilter selection)
+ public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
{
return isDigestQuery()
- ? ReadResponse.createDigestResponse(iterator, digestVersion)
- : ReadResponse.createDataResponse(iterator, selection);
+ ? ReadResponse.createDigestResponse(iterator, this)
+ : ReadResponse.createDataResponse(iterator, this);
}
public long indexSerializedSize(int version)
@@ -723,18 +723,17 @@ public abstract class ReadCommand implements ReadQuery
out.writeBoolean(filter.isReversed());
// limit
- DataLimits.Kind kind = rangeCommand.limits().kind();
- boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1;
- if (isDistinct)
+ DataLimits limits = rangeCommand.limits();
+ if (limits.isDistinct())
out.writeInt(1);
else
out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices()));
int compositesToGroup;
boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
- if (kind == DataLimits.Kind.THRIFT_LIMIT)
+ if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT)
compositesToGroup = -1;
- else if (isDistinct && !selectsStatics)
+ else if (limits.isDistinct() && !selectsStatics)
compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490)
else
compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
@@ -799,11 +798,15 @@ public abstract class ReadCommand implements ReadQuery
AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
int maxResults = in.readInt();
- in.readBoolean(); // countCQL3Rows (not needed)
+ boolean countCQL3Rows = in.readBoolean(); // countCQL3Rows (not needed)
in.readBoolean(); // isPaging (not needed)
boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
- boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics);
+ // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former,
+ // we can easily detect the case because compositesToGroup is -2 and that's the only case where it can have that value. The latter one is slightly less
+ // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows set to false so we use
+ // that fact.
+ boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows);
DataLimits limits;
if (isDistinct)
limits = DataLimits.distinctLimits(maxResults);
@@ -1054,9 +1057,13 @@ public abstract class ReadCommand implements ReadQuery
RowFilter rowFilter = LegacyRangeSliceCommandSerializer.deserializeRowFilter(in, metadata);
int maxResults = in.readInt();
- in.readBoolean(); // countCQL3Rows
+ boolean countCQL3Rows = in.readBoolean();
- boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics);
+ // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former,
+ // we can easily detect the case because compositesToGroup is -2 and that's the only case where it can have that value. The latter one is slightly less
+ // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows set to false so we use
+ // that fact.
+ boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows);
DataLimits limits;
if (isDistinct)
limits = DataLimits.distinctLimits(maxResults);
@@ -1340,17 +1347,16 @@ public abstract class ReadCommand implements ReadQuery
out.writeBoolean(filter.isReversed());
boolean selectsStatics = !command.columnFilter().fetchedColumns().statics.isEmpty() || slices.selects(Clustering.STATIC_CLUSTERING);
- DataLimits.Kind kind = command.limits().kind();
- boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && command.limits().perPartitionCount() == 1;
- if (isDistinct)
+ DataLimits limits = command.limits();
+ if (limits.isDistinct())
out.writeInt(1); // the limit is always 1 for DISTINCT queries
else
out.writeInt(updateLimitForQuery(command.limits().count(), filter.requestedSlices()));
int compositesToGroup;
- if (kind == DataLimits.Kind.THRIFT_LIMIT || metadata.isDense())
+ if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT || metadata.isDense())
compositesToGroup = -1;
- else if (isDistinct && !selectsStatics)
+ else if (limits.isDistinct() && !selectsStatics)
compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490)
else
compositesToGroup = metadata.clusteringColumns().size();
@@ -1369,9 +1375,19 @@ public abstract class ReadCommand implements ReadQuery
// if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
ColumnFilter columnFilter = LegacyRangeSliceCommandSerializer.getColumnSelectionForSlice(selectsStatics, compositesToGroup, metadata);
- boolean isDistinct = compositesToGroup == -2 || (count == 1 && selectsStatics);
+ // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former,
+ // we can easily detect the case because compositesToGroup is -2 and that's the only case where it can have that value. The latter is problematic
+ // however as we have no way to distinguish it from a normal select with a limit of 1 (and this, contrary to the range query case
+ // where the countCQL3Rows boolean allows us to decide).
+ // So we consider this case not distinct here. This is ok because even if it is a distinct (with static), the count will be 1 and
+ // we'll still just query one row (a distinct DataLimits currently behaves exactly like a CQL limit with a count of 1). The only
+ // drawback is that we'll send back the first row entirely while a 2.1/2.2 node would return only the first cell in that same
+ // situation. This isn't a problem for 2.1/2.2 code however (it would be for a range query, as it would throw off the count for
+ // reasons similar to CASSANDRA-10762, but it's ok for single partition queries).
+ // We do _not_ want to do the reverse however and consider a 'SELECT * FROM foo LIMIT 1' as a DISTINCT query as that would make
+ // us only return the 1st cell rather than the 1st row.
DataLimits limits;
- if (compositesToGroup == -2 || isDistinct)
+ if (compositesToGroup == -2)
limits = DataLimits.distinctLimits(count); // See CASSANDRA-8490 for the explanation of this value
else if (compositesToGroup == -1)
limits = DataLimits.thriftLimits(1, count);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 72a6fa8..9cde8dc 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -44,7 +44,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
ReadResponse response;
try (ReadOrderGroup opGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(opGroup))
{
- response = command.createResponse(iterator, command.columnFilter());
+ response = command.createResponse(iterator);
}
MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 41f0d5d..a618aa5 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -53,38 +53,38 @@ public abstract class ReadResponse
// This is used only when serializing data responses and we can't get it easily in other cases. So this can be null, which is slightly
// hacky, but as this hack doesn't escape this class, and it's easy enough to validate that it's not null when we need, it's "good enough".
- private final CFMetaData metadata;
+ private final ReadCommand command;
- protected ReadResponse(CFMetaData metadata)
+ protected ReadResponse(ReadCommand command)
{
- this.metadata = metadata;
+ this.command = command;
}
- public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection)
+ public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, ReadCommand command)
{
- return new LocalDataResponse(data, selection);
+ return new LocalDataResponse(data, command);
}
@VisibleForTesting
- public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection)
+ public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ReadCommand command)
{
- return new RemoteDataResponse(LocalDataResponse.build(data, selection));
+ return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter()));
}
- public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, int version)
+ public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, ReadCommand command)
{
- return new DigestResponse(makeDigest(data, version));
+ return new DigestResponse(makeDigest(data, command));
}
- public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command);
- public abstract ByteBuffer digest(CFMetaData metadata, ReadCommand command);
+ public abstract UnfilteredPartitionIterator makeIterator(ReadCommand command);
+ public abstract ByteBuffer digest(ReadCommand command);
public abstract boolean isDigestResponse();
- protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, int version)
+ protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, ReadCommand command)
{
MessageDigest digest = FBUtilities.threadLocalMD5Digest();
- UnfilteredPartitionIterators.digest(iterator, digest, version);
+ UnfilteredPartitionIterators.digest(command, iterator, digest, command.digestVersion());
return ByteBuffer.wrap(digest.digest());
}
@@ -99,12 +99,12 @@ public abstract class ReadResponse
this.digest = digest;
}
- public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command)
+ public UnfilteredPartitionIterator makeIterator(ReadCommand command)
{
throw new UnsupportedOperationException();
}
- public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
+ public ByteBuffer digest(ReadCommand command)
{
// We assume that the digest is in the proper version, which, bugs excluded, should be true since this is called with
// ReadCommand.digestVersion() as argument and that's also what we use to produce the digest in the first place.
@@ -122,11 +122,9 @@ public abstract class ReadResponse
// built on the owning node responding to a query
private static class LocalDataResponse extends DataResponse
{
- private final ColumnFilter received;
- private LocalDataResponse(UnfilteredPartitionIterator iter, ColumnFilter received)
+ private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command)
{
- super(iter.metadata(), build(iter, received), SerializationHelper.Flag.LOCAL);
- this.received = received;
+ super(command, build(iter, command.columnFilter()), SerializationHelper.Flag.LOCAL);
}
private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection)
@@ -142,14 +140,6 @@ public abstract class ReadResponse
throw new RuntimeException(e);
}
}
-
- protected ColumnFilter selection(ReadCommand sent)
- {
- // we didn't send anything, so we don't provide it in the serializer methods, but use the
- // object's reference to the original column filter we received
- assert sent == null || sent.columnFilter() == received;
- return received;
- }
}
// built on the coordinator node receiving a response
@@ -159,13 +149,6 @@ public abstract class ReadResponse
{
super(null, data, SerializationHelper.Flag.FROM_REMOTE);
}
-
- protected ColumnFilter selection(ReadCommand sent)
- {
- // we should always know what we sent, and should provide it in digest() and makeIterator()
- assert sent != null;
- return sent.columnFilter();
- }
}
static abstract class DataResponse extends ReadResponse
@@ -175,23 +158,24 @@ public abstract class ReadResponse
private final ByteBuffer data;
private final SerializationHelper.Flag flag;
- protected DataResponse(CFMetaData metadata, ByteBuffer data, SerializationHelper.Flag flag)
+ protected DataResponse(ReadCommand command, ByteBuffer data, SerializationHelper.Flag flag)
{
- super(metadata);
+ super(command);
this.data = data;
this.flag = flag;
}
- protected abstract ColumnFilter selection(ReadCommand command);
-
- public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command)
+ public UnfilteredPartitionIterator makeIterator(ReadCommand command)
{
try (DataInputBuffer in = new DataInputBuffer(data, true))
{
+ // Note that the command parameter shadows the 'command' field and this is intended because
+ // the latter can be null (for RemoteDataResponse as those are created in the serializers and
+ // those don't have easy access to the command). This is also why we need the command as parameter here.
return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
MessagingService.current_version,
- metadata,
- selection(command),
+ command.metadata(),
+ command.columnFilter(),
flag);
}
catch (IOException e)
@@ -201,11 +185,11 @@ public abstract class ReadResponse
}
}
- public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
+ public ByteBuffer digest(ReadCommand command)
{
- try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command))
+ try (UnfilteredPartitionIterator iterator = makeIterator(command))
{
- return makeDigest(iterator, command.digestVersion());
+ return makeDigest(iterator, command);
}
}
@@ -229,11 +213,11 @@ public abstract class ReadResponse
@VisibleForTesting
LegacyRemoteDataResponse(List<ImmutableBTreePartition> partitions)
{
- super(null); // we never serialize LegacyRemoteDataResponses, so we don't care about the metadata
+ super(null); // we never serialize LegacyRemoteDataResponses, so we don't care about the command
this.partitions = partitions;
}
- public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, final ReadCommand command)
+ public UnfilteredPartitionIterator makeIterator(final ReadCommand command)
{
// Due to a bug in the serialization of AbstractBounds, anything that isn't a Range is understood by pre-3.0 nodes
// as a Bound, which means IncludingExcludingBounds and ExcludingBounds responses may include keys they shouldn't.
@@ -271,7 +255,7 @@ public abstract class ReadResponse
public CFMetaData metadata()
{
- return metadata;
+ return command.metadata();
}
public boolean hasNext()
@@ -296,11 +280,11 @@ public abstract class ReadResponse
};
}
- public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
+ public ByteBuffer digest(ReadCommand command)
{
- try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command))
+ try (UnfilteredPartitionIterator iterator = makeIterator(command))
{
- return makeDigest(iterator, command.digestVersion());
+ return makeDigest(iterator, command);
}
}
@@ -323,14 +307,14 @@ public abstract class ReadResponse
out.writeBoolean(isDigest);
if (!isDigest)
{
- assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
- try (UnfilteredPartitionIterator iter = response.makeIterator(response.metadata, null))
+ assert response.command != null; // we only serialize LocalDataResponse, which always has the command set
+ try (UnfilteredPartitionIterator iter = response.makeIterator(response.command))
{
assert iter.hasNext();
try (UnfilteredRowIterator partition = iter.next())
{
ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
- LegacyLayout.serializeAsLegacyPartition(partition, out, version);
+ LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, version);
}
assert !iter.hasNext();
}
@@ -397,14 +381,14 @@ public abstract class ReadResponse
+ TypeSizes.sizeof(isDigest);
if (!isDigest)
{
- assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
- try (UnfilteredPartitionIterator iter = response.makeIterator(response.metadata, null))
+ assert response.command != null; // we only serialize LocalDataResponse, which always has the command set
+ try (UnfilteredPartitionIterator iter = response.makeIterator(response.command))
{
assert iter.hasNext();
try (UnfilteredRowIterator partition = iter.next())
{
size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
- size += LegacyLayout.serializedSizeAsLegacyPartition(partition, version);
+ size += LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, version);
}
assert !iter.hasNext();
}
@@ -458,8 +442,8 @@ public abstract class ReadResponse
// determine the number of partitions upfront for serialization
int numPartitions = 0;
- assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
- try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null))
+ assert response.command != null; // we only serialize LocalDataResponse, which always has the command set
+ try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command))
{
while (iterator.hasNext())
{
@@ -476,14 +460,14 @@ public abstract class ReadResponse
out.writeInt(numPartitions);
- try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null))
+ try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command))
{
while (iterator.hasNext())
{
try (UnfilteredRowIterator partition = iterator.next())
{
ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
- LegacyLayout.serializeAsLegacyPartition(partition, out, version);
+ LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, version);
}
}
}
@@ -509,15 +493,15 @@ public abstract class ReadResponse
assert version < MessagingService.VERSION_30;
long size = TypeSizes.sizeof(0); // number of partitions
- assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
- try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null))
+ assert response.command != null; // we only serialize LocalDataResponse, which always has the command set
+ try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command))
{
while (iterator.hasNext())
{
try (UnfilteredRowIterator partition = iterator.next())
{
size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
- size += LegacyLayout.serializedSizeAsLegacyPartition(partition, version);
+ size += LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, version);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 19f24ad..f6fdcdd 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -98,6 +98,7 @@ public abstract class DataLimits
public abstract Kind kind();
public abstract boolean isUnlimited();
+ public abstract boolean isDistinct();
public abstract DataLimits forPaging(int pageSize);
public abstract DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining);
@@ -232,8 +233,7 @@ public abstract class DataLimits
protected final int rowLimit;
protected final int perPartitionLimit;
- // Whether the query is a distinct query or not. This is currently not used by the code but prior experience
- // shows that keeping the information around is wise and might be useful in the future.
+ // Whether the query is a distinct query or not.
protected final boolean isDistinct;
private CQLLimits(int rowLimit)
@@ -268,9 +268,14 @@ public abstract class DataLimits
return rowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT;
}
+ public boolean isDistinct()
+ {
+ return isDistinct;
+ }
+
public DataLimits forPaging(int pageSize)
{
- return new CQLLimits(pageSize, perPartitionLimit);
+ return new CQLLimits(pageSize, perPartitionLimit, isDistinct);
}
public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
@@ -513,6 +518,11 @@ public abstract class DataLimits
return partitionLimit == NO_LIMIT && cellPerPartitionLimit == NO_LIMIT;
}
+ public boolean isDistinct()
+ {
+ return false;
+ }
+
public DataLimits forPaging(int pageSize)
{
// We don't support paging on thrift in general but do use paging under the hood for get_count. For
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index f10b3b6..6331440 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -613,7 +613,7 @@ public class PartitionUpdate extends AbstractBTreePartition
if (version < MessagingService.VERSION_30)
{
- LegacyLayout.serializeAsLegacyPartition(iter, out, version);
+ LegacyLayout.serializeAsLegacyPartition(null, iter, out, version);
}
else
{
@@ -699,7 +699,7 @@ public class PartitionUpdate extends AbstractBTreePartition
try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator())
{
if (version < MessagingService.VERSION_30)
- return LegacyLayout.serializedSizeAsLegacyPartition(iter, version);
+ return LegacyLayout.serializedSizeAsLegacyPartition(null, iter, version);
return CFMetaData.serializer.serializedSize(update.metadata(), version)
+ UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, update.rowCount());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index a3f7981..41b1424 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -238,11 +238,13 @@ public abstract class UnfilteredPartitionIterators
/**
* Digests the the provided iterator.
*
+ * @param command the command that has yielded {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30}
+ * as this is only used when producing a digest to be sent to legacy nodes.
* @param iterator the iterator to digest.
* @param digest the {@code MessageDigest} to use for the digest.
* @param version the messaging protocol to use when producing the digest.
*/
- public static void digest(UnfilteredPartitionIterator iterator, MessageDigest digest, int version)
+ public static void digest(ReadCommand command, UnfilteredPartitionIterator iterator, MessageDigest digest, int version)
{
try (UnfilteredPartitionIterator iter = iterator)
{
@@ -250,7 +252,7 @@ public abstract class UnfilteredPartitionIterators
{
try (UnfilteredRowIterator partition = iter.next())
{
- UnfilteredRowIterators.digest(partition, digest, version);
+ UnfilteredRowIterators.digest(command, partition, digest, version);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index ea929d7..9416896 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -102,15 +102,17 @@ public abstract class UnfilteredRowIterators
/**
* Digests the partition represented by the provided iterator.
*
+ * @param command the command that has yielded {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30}
+ * as this is only used when producing a digest to be sent to legacy nodes.
* @param iterator the iterator to digest.
* @param digest the {@code MessageDigest} to use for the digest.
* @param version the messaging protocol to use when producing the digest.
*/
- public static void digest(UnfilteredRowIterator iterator, MessageDigest digest, int version)
+ public static void digest(ReadCommand command, UnfilteredRowIterator iterator, MessageDigest digest, int version)
{
if (version < MessagingService.VERSION_30)
{
- LegacyLayout.fromUnfilteredRowIterator(iterator).digest(iterator.metadata(), digest);
+ LegacyLayout.fromUnfilteredRowIterator(command, iterator).digest(iterator.metadata(), digest);
return;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 3db9761..217c9de 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -211,7 +211,7 @@ public class Validator implements Runnable
validated++;
// MerkleTree uses XOR internally, so we want lots of output bits here
CountingDigest digest = new CountingDigest(FBUtilities.newMessageDigest("SHA-256"));
- UnfilteredRowIterators.digest(partition, digest, MessagingService.current_version);
+ UnfilteredRowIterators.digest(null, partition, digest, MessagingService.current_version);
// only return new hash for merkle tree in case digest was updated - see CASSANDRA-8979
return digest.count > 0
? new MerkleTree.RowHash(partition.partitionKey().getToken(), digest.digest(), digest.count)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index f3858d7..1fe931f 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -50,7 +50,7 @@ public class DataResolver extends ResponseResolver
public PartitionIterator getData()
{
ReadResponse response = responses.iterator().next().payload;
- return UnfilteredPartitionIterators.filter(response.makeIterator(command.metadata(), command), command.nowInSec());
+ return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
}
public PartitionIterator resolve()
@@ -63,7 +63,7 @@ public class DataResolver extends ResponseResolver
for (int i = 0; i < count; i++)
{
MessageIn<ReadResponse> msg = responses.get(i);
- iters.add(msg.payload.makeIterator(command.metadata(), command));
+ iters.add(msg.payload.makeIterator(command));
sources[i] = msg.from;
}
@@ -385,7 +385,7 @@ public class DataResolver extends ResponseResolver
// We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
handler.awaitResults();
assert resolver.responses.size() == 1;
- return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command.metadata(), command), retryCommand);
+ return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), retryCommand);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 62b4538..4a918a3 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -48,7 +48,7 @@ public class DigestResolver extends ResponseResolver
public PartitionIterator getData()
{
assert isDataPresent();
- return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec());
+ return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
}
/*
@@ -77,7 +77,7 @@ public class DigestResolver extends ResponseResolver
{
ReadResponse response = message.payload;
- ByteBuffer newDigest = response.digest(command.metadata(), command);
+ ByteBuffer newDigest = response.digest(command);
if (digest == null)
digest = newDigest;
else if (!digest.equals(newDigest))
@@ -88,7 +88,7 @@ public class DigestResolver extends ResponseResolver
if (logger.isTraceEnabled())
logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
- return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec());
+ return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
}
public boolean isDataPresent()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 89ac0bb..8fa2082 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1779,7 +1779,7 @@ public class StorageProxy implements StorageProxyMBean
{
try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup))
{
- handler.response(command.createResponse(iterator, command.columnFilter()));
+ handler.response(command.createResponse(iterator));
}
MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index cd52d35..a4173d6 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -103,8 +103,8 @@ public class CacheProviderTest
{
MessageDigest d1 = MessageDigest.getInstance("MD5");
MessageDigest d2 = MessageDigest.getInstance("MD5");
- UnfilteredRowIterators.digest(((CachedBTreePartition) one).unfilteredIterator(), d1, MessagingService.current_version);
- UnfilteredRowIterators.digest(((CachedBTreePartition) two).unfilteredIterator(), d2, MessagingService.current_version);
+ UnfilteredRowIterators.digest(null, ((CachedBTreePartition) one).unfilteredIterator(), d1, MessagingService.current_version);
+ UnfilteredRowIterators.digest(null, ((CachedBTreePartition) two).unfilteredIterator(), d2, MessagingService.current_version);
assertTrue(MessageDigest.isEqual(d1.digest(), d2.digest()));
}
catch (NoSuchAlgorithmException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/PartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java
index 623ff0e..7216ab7 100644
--- a/test/unit/org/apache/cassandra/db/PartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionTest.java
@@ -138,21 +138,23 @@ public class PartitionTest
new RowUpdateBuilder(cfs.metadata, 5, "key2").clustering("c").add("val", "val2").build().applyUnsafe();
- ImmutableBTreePartition p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key1").build());
- ImmutableBTreePartition p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
+ ReadCommand cmd1 = Util.cmd(cfs, "key1").build();
+ ReadCommand cmd2 = Util.cmd(cfs, "key2").build();
+ ImmutableBTreePartition p1 = Util.getOnlyPartitionUnfiltered(cmd1);
+ ImmutableBTreePartition p2 = Util.getOnlyPartitionUnfiltered(cmd2);
MessageDigest digest1 = MessageDigest.getInstance("MD5");
MessageDigest digest2 = MessageDigest.getInstance("MD5");
- UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version);
- UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version);
+ UnfilteredRowIterators.digest(cmd1, p1.unfilteredIterator(), digest1, version);
+ UnfilteredRowIterators.digest(cmd2, p2.unfilteredIterator(), digest2, version);
assertFalse(Arrays.equals(digest1.digest(), digest2.digest()));
p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
digest1 = MessageDigest.getInstance("MD5");
digest2 = MessageDigest.getInstance("MD5");
- UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version);
- UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version);
+ UnfilteredRowIterators.digest(cmd1, p1.unfilteredIterator(), digest1, version);
+ UnfilteredRowIterators.digest(cmd2, p2.unfilteredIterator(), digest2, version);
assertTrue(Arrays.equals(digest1.digest(), digest2.digest()));
p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
@@ -160,8 +162,8 @@ public class PartitionTest
p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
digest1 = MessageDigest.getInstance("MD5");
digest2 = MessageDigest.getInstance("MD5");
- UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version);
- UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version);
+ UnfilteredRowIterators.digest(cmd1, p1.unfilteredIterator(), digest1, version);
+ UnfilteredRowIterators.digest(cmd2, p2.unfilteredIterator(), digest2, version);
assertFalse(Arrays.equals(digest1.digest(), digest2.digest()));
}
finally
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/ReadResponseTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
index af0ec60..52ab8bb 100644
--- a/test/unit/org/apache/cassandra/db/ReadResponseTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
@@ -69,12 +69,12 @@ public class ReadResponseTest extends CQLTester
makePartition(cfs.metadata, "k3"));
ReadResponse.LegacyRemoteDataResponse response = new ReadResponse.LegacyRemoteDataResponse(responses);
- assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k3").build()), "k2");
- assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyExcl("k0").toKeyExcl("k3").build()), "k1", "k2");
- assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k4").build()), "k2", "k3");
+ assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k3").build()), "k2");
+ assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyExcl("k0").toKeyExcl("k3").build()), "k1", "k2");
+ assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k4").build()), "k2", "k3");
- assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k3").build()), "k1", "k2");
- assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k4").build()), "k1", "k2", "k3");
+ assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k3").build()), "k1", "k2");
+ assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k4").build()), "k1", "k2", "k3");
}
private void assertPartitions(UnfilteredPartitionIterator actual, String... expectedKeys)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 7cacb5e..9af6028 100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@ -111,14 +111,14 @@ public class SinglePartitionSliceCommandTest
logger.debug("ReadCommand: {}", cmd);
UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(ReadOrderGroup.emptyGroup());
- ReadResponse response = ReadResponse.createDataResponse(partitionIterator, cmd.columnFilter());
+ ReadResponse response = ReadResponse.createDataResponse(partitionIterator, cmd);
logger.debug("creating response: {}", response);
- partitionIterator = response.makeIterator(cfm, null); // <- cmd is null
+ partitionIterator = response.makeIterator(cmd);
assert partitionIterator.hasNext();
UnfilteredRowIterator partition = partitionIterator.next();
- LegacyLayout.LegacyUnfilteredPartition rowIter = LegacyLayout.fromUnfilteredRowIterator(partition);
+ LegacyLayout.LegacyUnfilteredPartition rowIter = LegacyLayout.fromUnfilteredRowIterator(cmd, partition);
Assert.assertEquals(Collections.emptyList(), rowIter.cells);
}
@@ -168,14 +168,14 @@ public class SinglePartitionSliceCommandTest
// check (de)serialized iterator for memtable static cell
try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
{
- response = ReadResponse.createDataResponse(pi, cmd.columnFilter());
+ response = ReadResponse.createDataResponse(pi, cmd);
}
out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30);
in = new DataInputBuffer(out.buffer(), true);
dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30);
- try (UnfilteredPartitionIterator pi = dst.makeIterator(cfm, cmd))
+ try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd))
{
checkForS(pi);
}
@@ -184,13 +184,13 @@ public class SinglePartitionSliceCommandTest
Schema.instance.getColumnFamilyStoreInstance(cfm.cfId).forceBlockingFlush();
try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
{
- response = ReadResponse.createDataResponse(pi, cmd.columnFilter());
+ response = ReadResponse.createDataResponse(pi, cmd);
}
out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30);
in = new DataInputBuffer(out.buffer(), true);
dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30);
- try (UnfilteredPartitionIterator pi = dst.makeIterator(cfm, cmd))
+ try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd))
{
checkForS(pi);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java b/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
index 5503cfb..c8f5cb1 100644
--- a/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
@@ -59,9 +59,10 @@ public class DigestBackwardCompatibilityTest extends CQLTester
* return ColumnFamily.digest(partition);
*/
- ImmutableBTreePartition partition = Util.getOnlyPartitionUnfiltered(Util.cmd(getCurrentColumnFamilyStore(), partitionKey).build());
+ ReadCommand cmd = Util.cmd(getCurrentColumnFamilyStore(), partitionKey).build();
+ ImmutableBTreePartition partition = Util.getOnlyPartitionUnfiltered(cmd);
MessageDigest digest = FBUtilities.threadLocalMD5Digest();
- UnfilteredRowIterators.digest(partition.unfilteredIterator(), digest, MessagingService.VERSION_22);
+ UnfilteredRowIterators.digest(cmd, partition.unfilteredIterator(), digest, MessagingService.VERSION_22);
return ByteBuffer.wrap(digest.digest());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index ecffbbd..997f4e4 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -712,7 +712,7 @@ public class DataResolverTest
public MessageIn<ReadResponse> readResponseMessage(InetAddress from, UnfilteredPartitionIterator partitionIterator, ReadCommand cmd)
{
return MessageIn.create(from,
- ReadResponse.createRemoteDataResponse(partitionIterator, cmd.columnFilter()),
+ ReadResponse.createRemoteDataResponse(partitionIterator, cmd),
Collections.EMPTY_MAP,
MessagingService.Verb.REQUEST_RESPONSE,
MessagingService.current_version);