You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/08/08 00:44:41 UTC
[2/4] cassandra git commit: On-wire backward compatibility for 3.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index c3f036a..913a1de 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -18,18 +18,24 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.util.Iterator;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -37,7 +43,10 @@ import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
/**
* General interface for storage-engine read commands (common to both range and
@@ -51,6 +60,10 @@ public abstract class ReadCommand implements ReadQuery
public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
+ public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer();
+ public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer();
+ public static final IVersionedSerializer<ReadCommand> legacyReadCommandSerializer = new LegacyReadCommandSerializer();
+
private final Kind kind;
private final CFMetaData metadata;
private final int nowInSec;
@@ -72,9 +85,9 @@ public abstract class ReadCommand implements ReadQuery
SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer),
PARTITION_RANGE (PartitionRangeReadCommand.selectionDeserializer);
- private SelectionDeserializer selectionDeserializer;
+ private final SelectionDeserializer selectionDeserializer;
- private Kind(SelectionDeserializer selectionDeserializer)
+ Kind(SelectionDeserializer selectionDeserializer)
{
this.selectionDeserializer = selectionDeserializer;
}
@@ -251,8 +264,6 @@ public abstract class ReadCommand implements ReadQuery
/**
* Executes this command on the local host.
*
- * @param cfs the store for the table queried by this command.
- *
* @return an iterator over the result of executing this command locally.
*/
@SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
@@ -281,7 +292,7 @@ public abstract class ReadCommand implements ReadQuery
// we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
// would be more efficient (the sooner we discard stuff we know we don't care, the less useless
// processing we do on it).
- return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec());
+ return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec());
}
catch (RuntimeException | Error e)
{
@@ -389,7 +400,7 @@ public abstract class ReadCommand implements ReadQuery
logger.warn(msg);
}
- Tracing.trace("Read {} live and {} tombstone cells{}", new Object[]{ liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : "") });
+ Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
}
}
};
@@ -398,12 +409,16 @@ public abstract class ReadCommand implements ReadQuery
/**
* Creates a message for this command.
*/
- public MessageOut<ReadCommand> createMessage()
+ public MessageOut<ReadCommand> createMessage(int version)
{
- // TODO: we should use different verbs for old message (RANGE_SLICE, PAGED_RANGE)
- return new MessageOut<>(MessagingService.Verb.READ, this, serializer);
+ if (version >= MessagingService.VERSION_30)
+ return new MessageOut<>(MessagingService.Verb.READ, this, serializer);
+
+ return createLegacyMessage();
}
+ protected abstract MessageOut<ReadCommand> createLegacyMessage();
+
protected abstract void appendCQLWhereClause(StringBuilder sb);
// Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
@@ -433,11 +448,11 @@ public abstract class ReadCommand implements ReadQuery
{
StringBuilder sb = new StringBuilder();
sb.append("SELECT ").append(columnFilter());
- sb.append(" FROM ").append(metadata().ksName).append(".").append(metadata.cfName);
+ sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName);
appendCQLWhereClause(sb);
if (limits() != DataLimits.NONE)
- sb.append(" ").append(limits());
+ sb.append(' ').append(limits());
return sb.toString();
}
@@ -465,8 +480,8 @@ public abstract class ReadCommand implements ReadQuery
public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
{
- if (version < MessagingService.VERSION_30)
- throw new UnsupportedOperationException();
+ // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
+ assert version >= MessagingService.VERSION_30;
out.writeByte(command.kind.ordinal());
out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()));
@@ -482,7 +497,7 @@ public abstract class ReadCommand implements ReadQuery
public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
{
if (version < MessagingService.VERSION_30)
- throw new UnsupportedOperationException();
+ return legacyReadCommandSerializer.deserialize(in, version);
Kind kind = Kind.values()[in.readByte()];
int flags = in.readByte();
@@ -499,8 +514,8 @@ public abstract class ReadCommand implements ReadQuery
public long serializedSize(ReadCommand command, int version)
{
- if (version < MessagingService.VERSION_30)
- throw new UnsupportedOperationException();
+ // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
+ assert version >= MessagingService.VERSION_30;
return 2 // kind + flags
+ CFMetaData.serializer.serializedSize(command.metadata(), version)
@@ -511,4 +526,950 @@ public abstract class ReadCommand implements ReadQuery
+ command.selectionSerializedSize(version);
}
}
+
+ /**
+  * Type tag of a pre-3.0 single-partition read command. The tag is the first byte that
+  * LegacyReadCommandSerializer writes and reads.
+  */
+ private enum LegacyType
+ {
+     GET_BY_NAMES((byte) 1),
+     GET_SLICES((byte) 2);
+
+     /** The byte written on the wire for this type. */
+     public final byte serializedValue;
+
+     LegacyType(byte b)
+     {
+         this.serializedValue = b;
+     }
+
+     /**
+      * @param kind the kind of clustering index filter used by the command
+      * @return GET_SLICES for slice filters, GET_BY_NAMES for every other kind
+      */
+     public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind)
+     {
+         return kind == ClusteringIndexFilter.Kind.SLICE
+              ? GET_SLICES
+              : GET_BY_NAMES;
+     }
+
+     /**
+      * @param b the type tag read from the wire
+      * @return the type whose {@link #serializedValue} equals {@code b}
+      * @throws IllegalArgumentException if {@code b} is not a known tag; previously any unknown byte was
+      *         silently treated as GET_SLICES, so a corrupt message would be parsed as a slice command
+      */
+     public static LegacyType fromSerializedValue(byte b)
+     {
+         for (LegacyType type : values())
+         {
+             if (type.serializedValue == b)
+                 return type;
+         }
+         throw new IllegalArgumentException("Unknown legacy read command type: " + b);
+     }
+ }
+
+ /**
+ * Serializer for pre-3.0 RangeSliceCommands.
+ */
+ private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand>
+ {
+ /**
+ * Writes a non-paging range command in the pre-3.0 RangeSliceCommand layout.
+ * <p>
+ * Field order: keyspace, table, timestamp in milliseconds, a one-byte filter tag followed by the names or
+ * slice filter, the row filter expressions, the key range, maxResults, countCQL3Rows and isPaging.
+ *
+ * @param command the command to write; it is cast to PartitionRangeReadCommand and asserted not to be paging
+ * @param out the output to write to
+ * @param version the messaging version of the recipient; asserted to be older than VERSION_30
+ * @throws IOException if writing to {@code out} fails
+ */
+ public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+ {
+ assert version < MessagingService.VERSION_30;
+
+ PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+ assert !rangeCommand.dataRange().isPaging();
+
+ // convert pre-3.0 incompatible names filters to slice filters
+ rangeCommand = maybeConvertNamesToSlice(rangeCommand);
+
+ CFMetaData metadata = rangeCommand.metadata();
+
+ out.writeUTF(metadata.ksName);
+ out.writeUTF(metadata.cfName);
+ out.writeLong(rangeCommand.nowInSec() * 1000L); // convert from seconds to millis
+
+ // begin DiskAtomFilterSerializer.serialize()
+ if (rangeCommand.isNamesQuery())
+ {
+ out.writeByte(1); // 0 for slices, 1 for names
+ ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
+ LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out);
+ }
+ else
+ {
+ out.writeByte(0); // 0 for slices, 1 for names
+
+ // slice filter serialization
+ ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+
+ // request a static slice when static columns are fetched but the requested slices don't already select them
+ boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+ LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
+
+ out.writeBoolean(filter.isReversed());
+
+ // limit
+ // a CQL (or CQL paging) limit with a per-partition count of 1 is treated as DISTINCT and sent with a slice count of 1
+ DataLimits.Kind kind = rangeCommand.limits().kind();
+ boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1;
+ if (isDistinct)
+ out.writeInt(1);
+ else
+ out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices()));
+
+ // compositesToGroup: -1 for Thrift limits and dense tables, -2 for DISTINCT without statics,
+ // otherwise the number of clustering columns
+ int compositesToGroup;
+ boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+ if (kind == DataLimits.Kind.THRIFT_LIMIT)
+ compositesToGroup = -1;
+ else if (isDistinct && !selectsStatics)
+ compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490)
+ else
+ compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
+
+ out.writeInt(compositesToGroup);
+ }
+
+ serializeRowFilter(out, rangeCommand.rowFilter());
+ AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
+
+ // maxResults
+ out.writeInt(rangeCommand.limits().count());
+
+ // countCQL3Rows
+ if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1) // if for Thrift or DISTINCT
+ out.writeBoolean(false);
+ else
+ out.writeBoolean(true);
+
+ // isPaging
+ out.writeBoolean(false);
+ }
+
+ /**
+ * Reads a pre-3.0 RangeSliceCommand and builds the equivalent PartitionRangeReadCommand.
+ *
+ * @param in the input to read from
+ * @param version the messaging version of the sender; asserted to be older than VERSION_30
+ * @return a PartitionRangeReadCommand over the key range and filter that were read
+ * @throws IOException if reading fails; an UnknownColumnFamilyException is thrown when the named table
+ *         is not found in the schema
+ */
+ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+ {
+ assert version < MessagingService.VERSION_30;
+
+ String keyspace = in.readUTF();
+ String columnFamily = in.readUTF();
+
+ CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
+ if (metadata == null)
+ {
+ String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily);
+ throw new UnknownColumnFamilyException(message, null);
+ }
+
+ int nowInSec = (int) (in.readLong() / 1000); // convert from millis to seconds
+
+ ClusteringIndexFilter filter;
+ ColumnFilter selection;
+ // these two are only present on the wire for slice filters; for names filters the defaults below are kept
+ int compositesToGroup = 0;
+ int perPartitionLimit = -1;
+ byte readType = in.readByte(); // 0 for slices, 1 for names
+ if (readType == 1)
+ {
+ Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata);
+ selection = selectionAndFilter.left;
+ filter = selectionAndFilter.right;
+ }
+ else
+ {
+ filter = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
+ perPartitionLimit = in.readInt();
+ compositesToGroup = in.readInt();
+ selection = getColumnSelectionForSlice((ClusteringIndexSliceFilter) filter, compositesToGroup, metadata);
+ }
+
+ RowFilter rowFilter = deserializeRowFilter(in, metadata);
+
+ AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
+ int maxResults = in.readInt();
+
+ in.readBoolean(); // countCQL3Rows (not needed)
+ in.readBoolean(); // isPaging (not needed)
+
+ // DISTINCT is recognised either by the -2 marker or by a per-partition limit of 1 on a query that selects statics
+ boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
+ boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics);
+ DataLimits limits;
+ if (isDistinct)
+ limits = DataLimits.distinctLimits(maxResults);
+ else if (compositesToGroup == -1)
+ limits = DataLimits.thriftLimits(maxResults, perPartitionLimit);
+ else
+ limits = DataLimits.cqlLimits(maxResults);
+
+ // built with isDigestQuery = false and isForThrift = true
+ return new PartitionRangeReadCommand(false, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter));
+ }
+
+ /**
+  * Writes the expressions of a row filter in the pre-3.0 layout: an int count, then for each expression
+  * the column name (short-length prefixed), the operator, and the value (short-length prefixed).
+  *
+  * @param out the output to write to
+  * @param rowFilter the filter whose expressions are written
+  * @throws IOException if writing to {@code out} fails
+  */
+ static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
+ {
+     // the count is written first, so the iterator has to be materialised to learn it
+     List<RowFilter.Expression> expressions = Lists.newArrayList(rowFilter.iterator());
+     int count = expressions.size();
+     out.writeInt(count);
+
+     for (int i = 0; i < count; i++)
+     {
+         RowFilter.Expression current = expressions.get(i);
+         ByteBufferUtil.writeWithShortLength(current.column().name.bytes, out);
+         current.operator().writeTo(out);
+         ByteBufferUtil.writeWithShortLength(current.getIndexValue(), out);
+     }
+ }
+
+ /**
+  * Reads row filter expressions written in the pre-3.0 layout (see serializeRowFilter).
+  *
+  * @param in the input to read from
+  * @param metadata the table whose column definitions are used to resolve the column names
+  * @return RowFilter.NONE when the count is zero, otherwise a filter holding every expression read
+  * @throws IOException if reading from {@code in} fails
+  */
+ static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException
+ {
+     int expressionCount = in.readInt();
+     if (expressionCount == 0)
+         return RowFilter.NONE;
+
+     RowFilter result = RowFilter.create(expressionCount);
+     int remaining = expressionCount;
+     while (remaining-- > 0)
+     {
+         // wire order per expression: column name, operator, value
+         ColumnDefinition column = metadata.getColumnDefinition(ByteBufferUtil.readWithShortLength(in));
+         Operator operator = Operator.readFrom(in);
+         result.add(column, operator, ByteBufferUtil.readWithShortLength(in));
+     }
+     return result;
+ }
+
+ /**
+  * Computes the number of bytes serializeRowFilter writes for the given filter.
+  *
+  * @param rowFilter the filter to size
+  * @return the size of the int count plus, per expression, the short-length prefixed name, an int for the
+  *         operator and the short-length prefixed value
+  */
+ static long serializedRowFilterSize(RowFilter rowFilter)
+ {
+     long total = TypeSizes.sizeof(0); // rowFilterCount
+     for (RowFilter.Expression current : rowFilter)
+     {
+         long nameSize = ByteBufferUtil.serializedSizeWithShortLength(current.column().name.bytes);
+         long operatorSize = TypeSizes.sizeof(0); // operator int value
+         long valueSize = ByteBufferUtil.serializedSizeWithShortLength(current.getIndexValue());
+         total += nameSize + operatorSize + valueSize;
+     }
+     return total;
+ }
+
+ /**
+  * Computes the number of bytes serialize() writes for the given command.
+  * <p>
+  * The row filter is sized through serializedRowFilterSize rather than a second inline copy of the same
+  * arithmetic, so this serializer and the paged range serializer cannot drift apart.
+  *
+  * @param command the command to size; asserted to be a PARTITION_RANGE command
+  * @param version the messaging version of the recipient; asserted to be older than VERSION_30
+  * @return the serialized size in bytes
+  */
+ public long serializedSize(ReadCommand command, int version)
+ {
+     assert version < MessagingService.VERSION_30;
+     assert command.kind == Kind.PARTITION_RANGE;
+
+     // serialize() applies the same names-to-slice conversion, so size the converted command
+     PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+     rangeCommand = maybeConvertNamesToSlice(rangeCommand);
+     CFMetaData metadata = rangeCommand.metadata();
+
+     long size = TypeSizes.sizeof(metadata.ksName);
+     size += TypeSizes.sizeof(metadata.cfName);
+     size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
+
+     size += 1; // single byte flag: 0 for slices, 1 for names
+     if (rangeCommand.isNamesQuery())
+     {
+         PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns();
+         ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
+         size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns);
+     }
+     else
+     {
+         ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+         boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+         size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
+         size += TypeSizes.sizeof(filter.isReversed());
+         size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount()); // slice count (an int)
+         size += TypeSizes.sizeof(0); // compositesToGroup
+     }
+
+     // int count followed by (name, operator, value) per expression; an empty filter is just the count
+     size += serializedRowFilterSize(rangeCommand.rowFilter());
+
+     size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
+     size += TypeSizes.sizeof(rangeCommand.limits().count()); // maxResults
+     size += TypeSizes.sizeof(!rangeCommand.isForThrift()); // countCQL3Rows
+     return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging()); // isPaging
+ }
+
+ /**
+  * Returns a slice-based equivalent of a names-based range command when
+  * LegacyReadCommandSerializer.shouldConvertNamesToSlice says the names filter must be converted.
+  *
+  * @param command the range command about to be sent to a pre-3.0 node
+  * @return {@code command} itself when it is not a names query or needs no conversion; otherwise a new
+  *         command with the same settings whose data range uses the converted slice filter
+  */
+ static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command)
+ {
+     DataRange originalRange = command.dataRange();
+     CFMetaData metadata = command.metadata();
+
+     boolean needsConversion = originalRange.isNamesQuery()
+                            && LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns());
+     if (!needsConversion)
+         return command;
+
+     ClusteringIndexNamesFilter namesFilter = (ClusteringIndexNamesFilter) originalRange.clusteringIndexFilter;
+     ClusteringIndexSliceFilter converted = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(namesFilter, metadata);
+     DataRange convertedRange = new DataRange(originalRange.keyRange(), converted);
+
+     return new PartitionRangeReadCommand(command.isDigestQuery(),
+                                          command.isForThrift(),
+                                          metadata,
+                                          command.nowInSec(),
+                                          command.columnFilter(),
+                                          command.rowFilter(),
+                                          command.limits(),
+                                          convertedRange);
+ }
+
+ /**
+  * Derives the column selection for a slice query received from a pre-3.0 node.
+  *
+  * @param filter the slice filter that was read
+  * @param compositesToGroup the compositesToGroup value that was read; -2 marks a DISTINCT query that
+  *        selects no static columns
+  * @param metadata the queried table
+  * @return an empty selection for -2; otherwise all partition columns, without the static ones when the
+  *         filter does not select the static clustering
+  */
+ static ColumnFilter getColumnSelectionForSlice(ClusteringIndexSliceFilter filter, int compositesToGroup, CFMetaData metadata)
+ {
+     // DISTINCT without statics: only partition keys are wanted
+     if (compositesToGroup == -2)
+         return ColumnFilter.selection(PartitionColumns.NONE);
+
+     // a slice from a pre-3.0 node that doesn't cover statics must not select them at all
+     PartitionColumns selected;
+     if (filter.selects(Clustering.STATIC_CLUSTERING))
+         selected = metadata.partitionColumns();
+     else
+         selected = metadata.partitionColumns().withoutStatics();
+
+     return new ColumnFilter.Builder(metadata).addAll(selected).build();
+ }
+ }
+
+ /**
+ * Serializer for pre-3.0 PagedRangeCommands.
+ */
+ private static class LegacyPagedRangeCommandSerializer implements IVersionedSerializer<ReadCommand>
+ {
+ /**
+ * Writes a paging range command in the pre-3.0 PagedRangeCommand layout.
+ * <p>
+ * Field order: keyspace, table, timestamp in milliseconds, the key range, the slice filter (slices,
+ * reversed flag, count, compositesToGroup), the command-level start and stop, the row filter expressions,
+ * the command-level limit and countCQL3Rows.
+ *
+ * @param command the command to write; it is cast to PartitionRangeReadCommand and asserted to be paging
+ * @param out the output to write to
+ * @param version the messaging version of the recipient; asserted to be older than VERSION_30
+ * @throws IOException if writing to {@code out} fails
+ */
+ public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+ {
+ assert version < MessagingService.VERSION_30;
+
+ PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+ assert rangeCommand.dataRange().isPaging();
+
+ CFMetaData metadata = rangeCommand.metadata();
+
+ out.writeUTF(metadata.ksName);
+ out.writeUTF(metadata.cfName);
+ out.writeLong(rangeCommand.nowInSec() * 1000L); // convert from seconds to millis
+
+ AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
+
+ // pre-3.0 nodes don't accept names filters for paged range commands
+ ClusteringIndexSliceFilter filter;
+ if (rangeCommand.dataRange().clusteringIndexFilter.kind() == ClusteringIndexFilter.Kind.NAMES)
+ filter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter((ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter, metadata);
+ else
+ filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+
+ // slice filter
+ // request a static slice when static columns are fetched but the requested slices don't already select them
+ boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+ LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
+ out.writeBoolean(filter.isReversed());
+
+ // slice filter's count
+ // a CQL (or CQL paging) limit with a per-partition count of 1 is treated as DISTINCT
+ DataLimits.Kind kind = rangeCommand.limits().kind();
+ boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1;
+ if (isDistinct)
+ out.writeInt(1);
+ else
+ out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().perPartitionCount(), filter.requestedSlices()));
+
+ // compositesToGroup
+ // -1 for Thrift limits and dense tables, -2 for DISTINCT without statics, otherwise the number of clustering columns
+ boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+ int compositesToGroup;
+ if (kind == DataLimits.Kind.THRIFT_LIMIT)
+ compositesToGroup = -1;
+ else if (isDistinct && !selectsStatics)
+ compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490)
+ else
+ compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
+
+ out.writeInt(compositesToGroup);
+
+ // command-level "start" and "stop" composites. The start is the last-returned cell name if there is one,
+ // otherwise it's the same as the slice filter's start. The stop appears to always be the same as the
+ // slice filter's stop.
+ DataRange.Paging pagingRange = (DataRange.Paging) rangeCommand.dataRange();
+ Clustering lastReturned = pagingRange.getLastReturned();
+ // the start is written as an exclusive start bound built from the last returned clustering
+ Slice.Bound newStart = Slice.Bound.exclusiveStartOf(lastReturned);
+ Slice lastSlice = filter.requestedSlices().get(filter.requestedSlices().size() - 1);
+ ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeBound(metadata, newStart, true), out);
+ ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeClustering(metadata, lastSlice.end().clustering()), out);
+
+ LegacyRangeSliceCommandSerializer.serializeRowFilter(out, rangeCommand.rowFilter());
+
+ // command-level limit
+ // Pre-3.0 we would always request one more row than we actually needed and the command-level "start" would
+ // be the last-returned cell name, so the response would always include it. When dealing with compound comparators,
+ // we can pass an exclusive start and use the normal limit. However, when dealing with non-compound comparators,
+ // pre-3.0 nodes cannot perform exclusive slices, so we need to request one extra row.
+ int maxResults = rangeCommand.limits().count() + (metadata.isCompound() ? 0 : 1);
+ out.writeInt(maxResults);
+
+ // countCQL3Rows
+ if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1) // for Thrift or DISTINCT
+ out.writeBoolean(false);
+ else
+ out.writeBoolean(true);
+ }
+
+ /**
+ * Reads a pre-3.0 PagedRangeCommand and builds the equivalent paging PartitionRangeReadCommand.
+ *
+ * @param in the input to read from
+ * @param version the messaging version of the sender; asserted to be older than VERSION_30
+ * @return a PartitionRangeReadCommand whose data range and limits are set up for paging
+ * @throws IOException if reading fails; an UnknownColumnFamilyException is thrown when the named table
+ *         is not found in the schema
+ */
+ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+ {
+ assert version < MessagingService.VERSION_30;
+
+ String keyspace = in.readUTF();
+ String columnFamily = in.readUTF();
+
+ CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
+ if (metadata == null)
+ {
+ String message = String.format("Got legacy paged range command for nonexistent table %s.%s.", keyspace, columnFamily);
+ throw new UnknownColumnFamilyException(message, null);
+ }
+
+ int nowInSec = (int) (in.readLong() / 1000); // convert from millis to seconds
+ AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
+
+ ClusteringIndexSliceFilter filter = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
+ int perPartitionLimit = in.readInt();
+ int compositesToGroup = in.readInt();
+
+ // command-level Composite "start" and "stop"
+ LegacyLayout.LegacyBound startBound = LegacyLayout.decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), true);
+ ByteBufferUtil.readWithShortLength(in); // the composite "stop", which isn't actually needed
+
+ // pre-3.0 nodes will sometimes use a clustering prefix for the Command-level start and stop, but in all
+ // cases this should also be represented by the ClusteringIndexFilter, so we can ignore them
+ Clustering startClustering;
+ if (startBound == LegacyLayout.LegacyBound.BOTTOM || startBound.bound.size() < metadata.comparator.size())
+ startClustering = Clustering.EMPTY;
+ else
+ startClustering = startBound.getAsClustering(metadata);
+
+ ColumnFilter selection = LegacyRangeSliceCommandSerializer.getColumnSelectionForSlice(filter, compositesToGroup, metadata);
+
+ RowFilter rowFilter = LegacyRangeSliceCommandSerializer.deserializeRowFilter(in, metadata);
+ int maxResults = in.readInt();
+ in.readBoolean(); // countCQL3Rows
+
+
+ // DISTINCT is recognised either by the -2 marker or by a per-partition limit of 1 on a query that selects statics
+ boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
+ boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics);
+ DataLimits limits;
+ if (isDistinct)
+ limits = DataLimits.distinctLimits(maxResults);
+ else if (compositesToGroup == -1)
+ limits = DataLimits.thriftLimits(1, perPartitionLimit); // we only use paging w/ thrift for get_count(), so partition limit must be 1
+ else
+ limits = DataLimits.cqlLimits(maxResults);
+
+ limits = limits.forPaging(maxResults);
+
+ // pre-3.0 nodes normally expect pages to include the last cell from the previous page, but they handle it
+ // missing without any problems, so we can safely always set "inclusive" to false in the data range
+ DataRange dataRange = new DataRange(keyRange, filter).forPaging(keyRange, metadata.comparator, startClustering, false);
+ // built with isDigestQuery = false and isForThrift = true
+ return new PartitionRangeReadCommand(false, true, metadata, nowInSec, selection, rowFilter, limits, dataRange);
+ }
+
+ /**
+  * Computes the number of bytes serialize() writes for the given paging command.
+  * <p>
+  * The command-level "start" is sized from the same encoded buffer that serialize() writes: the exclusive
+  * start bound built from the last returned clustering. It was previously sized from
+  * {@code encodeClustering(lastReturned)}, a different encoding from the one written, so any length
+  * difference between the two would make the reported size disagree with the bytes on the wire.
+  *
+  * @param command the command to size; asserted to be a paging PARTITION_RANGE command
+  * @param version the messaging version of the recipient; asserted to be older than VERSION_30
+  * @return the serialized size in bytes
+  */
+ public long serializedSize(ReadCommand command, int version)
+ {
+     assert version < MessagingService.VERSION_30;
+     assert command.kind == Kind.PARTITION_RANGE;
+
+     PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+     CFMetaData metadata = rangeCommand.metadata();
+     assert rangeCommand.dataRange().isPaging();
+
+     long size = TypeSizes.sizeof(metadata.ksName);
+     size += TypeSizes.sizeof(metadata.cfName);
+     size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
+
+     size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
+
+     // pre-3.0 nodes only accept slice filters for paged range commands
+     ClusteringIndexSliceFilter filter;
+     if (rangeCommand.dataRange().clusteringIndexFilter.kind() == ClusteringIndexFilter.Kind.NAMES)
+         filter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter((ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter, metadata);
+     else
+         filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+
+     // slice filter
+     boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+     size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
+     size += TypeSizes.sizeof(filter.isReversed());
+
+     // slice filter's count
+     size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
+
+     // compositesToGroup
+     size += TypeSizes.sizeof(0);
+
+     // command-level Composite "start" and "stop", encoded exactly as serialize() encodes them
+     DataRange.Paging pagingRange = (DataRange.Paging) rangeCommand.dataRange();
+     Clustering lastReturned = pagingRange.getLastReturned();
+     Slice.Bound newStart = Slice.Bound.exclusiveStartOf(lastReturned);
+     Slice lastSlice = filter.requestedSlices().get(filter.requestedSlices().size() - 1);
+     size += ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeBound(metadata, newStart, true));
+     size += ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeClustering(metadata, lastSlice.end().clustering()));
+
+     size += LegacyRangeSliceCommandSerializer.serializedRowFilterSize(rangeCommand.rowFilter());
+
+     // command-level limit
+     size += TypeSizes.sizeof(rangeCommand.limits().count());
+
+     // countCQL3Rows
+     return size + TypeSizes.sizeof(true);
+ }
+ }
+
+ /**
+ * Serializer for pre-3.0 ReadCommands.
+ */
+ static class LegacyReadCommandSerializer implements IVersionedSerializer<ReadCommand>
+ {
+ public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+ {
+ assert version < MessagingService.VERSION_30;
+ assert command.kind == Kind.SINGLE_PARTITION;
+
+ SinglePartitionReadCommand singleReadCommand = (SinglePartitionReadCommand) command;
+ singleReadCommand = maybeConvertNamesToSlice(singleReadCommand);
+
+ CFMetaData metadata = singleReadCommand.metadata();
+
+ out.writeByte(LegacyType.fromPartitionFilterKind(singleReadCommand.clusteringIndexFilter().kind()).serializedValue);
+
+ out.writeBoolean(singleReadCommand.isDigestQuery());
+ out.writeUTF(metadata.ksName);
+ ByteBufferUtil.writeWithShortLength(singleReadCommand.partitionKey().getKey(), out);
+ out.writeUTF(metadata.cfName);
+ out.writeLong(singleReadCommand.nowInSec() * 1000L); // convert from seconds to millis
+
+ if (singleReadCommand.clusteringIndexFilter().kind() == ClusteringIndexFilter.Kind.SLICE)
+ serializeSliceCommand((SinglePartitionSliceCommand) singleReadCommand, out);
+ else
+ serializeNamesCommand((SinglePartitionNamesCommand) singleReadCommand, out);
+ }
+
+ /**
+  * Reads a pre-3.0 single-partition ReadCommand: type tag, digest flag, keyspace, partition key, table and
+  * timestamp in milliseconds, followed by the names- or slice-specific part.
+  *
+  * @param in the input to read from
+  * @param version the messaging version of the sender; asserted to be older than VERSION_30
+  * @return the names or slice command described by the message
+  * @throws IOException if reading fails; an UnknownColumnFamilyException is thrown when the named table
+  *         is not found in the schema (previously this case failed with a NullPointerException)
+  */
+ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+ {
+     assert version < MessagingService.VERSION_30;
+     LegacyType msgType = LegacyType.fromSerializedValue(in.readByte());
+
+     boolean isDigest = in.readBoolean();
+     String keyspaceName = in.readUTF();
+     ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+     String cfName = in.readUTF();
+     long nowInMillis = in.readLong();
+     int nowInSeconds = (int) (nowInMillis / 1000); // convert from millis to seconds
+
+     // same handling as the range and paged range deserializers: an unknown table is reported explicitly
+     CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
+     if (metadata == null)
+     {
+         String message = String.format("Got legacy read command for nonexistent table %s.%s.", keyspaceName, cfName);
+         throw new UnknownColumnFamilyException(message, null);
+     }
+
+     DecoratedKey dk = metadata.partitioner.decorateKey(key);
+
+     switch (msgType)
+     {
+         case GET_BY_NAMES:
+             return deserializeNamesCommand(in, isDigest, metadata, dk, nowInSeconds);
+         case GET_SLICES:
+             return deserializeSliceCommand(in, isDigest, metadata, dk, nowInSeconds);
+         default:
+             throw new AssertionError();
+     }
+ }
+
+ /**
+ * Computes the serialized size of a single-partition command in the pre-3.0 ReadCommand layout.
+ *
+ * @param command the command to size; asserted to be a SINGLE_PARTITION command
+ * @param version the messaging version of the recipient; asserted to be older than VERSION_30
+ * @return the size of the common header counted below plus the slice- or names-specific size
+ */
+ public long serializedSize(ReadCommand command, int version)
+ {
+ assert version < MessagingService.VERSION_30;
+ assert command.kind == Kind.SINGLE_PARTITION;
+ SinglePartitionReadCommand singleReadCommand = (SinglePartitionReadCommand) command;
+ // size the command that serialize() would actually write, i.e. after the names-to-slice conversion
+ singleReadCommand = maybeConvertNamesToSlice(singleReadCommand);
+
+ int keySize = singleReadCommand.partitionKey().getKey().remaining();
+
+ CFMetaData metadata = singleReadCommand.metadata();
+
+ long size = 1; // message type (single byte)
+ size += TypeSizes.sizeof(command.isDigestQuery());
+ size += TypeSizes.sizeof(metadata.ksName);
+ size += TypeSizes.sizeof((short) keySize) + keySize;
+ // NOTE(review): serialize() writes metadata.cfName between the key and the timestamp, but no cfName term
+ // appears in this method -- confirm it is accounted for in the per-kind size helpers called below
+ size += TypeSizes.sizeof((long) command.nowInSec());
+
+ if (singleReadCommand.clusteringIndexFilter().kind() == ClusteringIndexFilter.Kind.SLICE)
+ return size + serializedSliceCommandSize((SinglePartitionSliceCommand) singleReadCommand);
+ else
+ return size + serializedNamesCommandSize((SinglePartitionNamesCommand) singleReadCommand);
+ }
+
+ /**
+  * Writes the names-specific part of a single-partition command by delegating to serializeNamesFilter.
+  *
+  * @param command the names command being written
+  * @param out the output to write to
+  * @throws IOException if writing to {@code out} fails
+  */
+ private void serializeNamesCommand(SinglePartitionNamesCommand command, DataOutputPlus out) throws IOException
+ {
+     ClusteringIndexNamesFilter namesFilter = command.clusteringIndexFilter();
+     serializeNamesFilter(command, namesFilter, out);
+ }
+
+
+ /**
+  * Writes a names filter in the pre-3.0 layout: an int cell-name count, the short-length prefixed cell
+  * names, and the countCql3Rows flag.
+  *
+  * @param command the command the filter belongs to; supplies the fetched columns, metadata and limits
+  * @param filter the names filter to write
+  * @param out the output to write to
+  * @throws IOException if writing to {@code out} fails
+  */
+ private static void serializeNamesFilter(ReadCommand command, ClusteringIndexNamesFilter filter, DataOutputPlus out) throws IOException
+ {
+     PartitionColumns columns = command.columnFilter().fetchedColumns();
+     CFMetaData metadata = command.metadata();
+     SortedSet<Clustering> requestedRows = filter.requestedRows();
+
+     if (!requestedRows.isEmpty())
+     {
+         // one cell name per (requested row, fetched column) pair
+         int cellNameCount = requestedRows.size() * columns.size();
+         out.writeInt(cellNameCount);
+         for (Clustering clustering : requestedRows)
+         {
+             for (ColumnDefinition column : columns)
+                 ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeCellName(metadata, clustering, column.name.bytes, null), out);
+         }
+     }
+     else
+     {
+         // only static columns are requested: the bare column names are written
+         out.writeInt(columns.size());
+         for (ColumnDefinition column : columns)
+             ByteBufferUtil.writeWithShortLength(column.name.bytes, out);
+     }
+
+     // countCql3Rows is false for Thrift commands and for CQL limits with a per-partition count of 1
+     // (DISTINCT), true otherwise
+     boolean countCql3Rows = !command.isForThrift()
+                          && !(command.limits().kind() == DataLimits.Kind.CQL_LIMIT && command.limits().perPartitionCount() == 1);
+     out.writeBoolean(countCql3Rows);
+ }
+
+ /**
+  * Computes the number of bytes {@code serializeNamesFilter} writes for the given filter and columns:
+  * the int cell-name count, every short-length-prefixed cell name, and the countCql3Rows boolean.
+  */
+ static long serializedNamesFilterSize(ClusteringIndexNamesFilter filter, CFMetaData metadata, PartitionColumns fetchedColumns)
+ {
+     SortedSet<Clustering> rows = filter.requestedRows();
+
+     long total = TypeSizes.sizeof(true); // countCql3Rows
+     if (!rows.isEmpty())
+     {
+         total += TypeSizes.sizeof(rows.size() * fetchedColumns.size());
+         for (Clustering row : rows)
+         {
+             for (ColumnDefinition def : fetchedColumns)
+             {
+                 ByteBuffer cellName = LegacyLayout.encodeCellName(metadata, row, def.name.bytes, null);
+                 total += ByteBufferUtil.serializedSizeWithShortLength(cellName);
+             }
+         }
+     }
+     else
+     {
+         // only static columns are requested
+         total += TypeSizes.sizeof(fetchedColumns.size());
+         for (ColumnDefinition def : fetchedColumns)
+             total += ByteBufferUtil.serializedSizeWithShortLength(def.name.bytes);
+     }
+     return total;
+ }
+
+ /**
+  * Reads the names-filter part of a legacy single-partition read and builds the matching names command,
+  * with no row filter and no limits.
+  */
+ private SinglePartitionNamesCommand deserializeNamesCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds) throws IOException
+ {
+     Pair<ColumnFilter, ClusteringIndexNamesFilter> decoded = deserializeNamesSelectionAndFilter(in, metadata);
+     ColumnFilter selection = decoded.left;
+     ClusteringIndexNamesFilter namesFilter = decoded.right;
+
+     // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+     boolean isForThrift = true;
+     return new SinglePartitionNamesCommand(isDigest, isForThrift, metadata, nowInSeconds, selection,
+                                            RowFilter.NONE, DataLimits.NONE, key, namesFilter);
+ }
+
+ /**
+  * Reads a legacy names filter (int cell-name count, short-length-prefixed cell names, countCql3Rows boolean)
+  * and turns it into a column selection plus a clustering names filter.
+  *
+  * @param in the input to read from
+  * @param metadata the table the filter applies to; used to decode the legacy cell names
+  * @return the selected columns (left) and the names filter over the non-static clusterings (right)
+  * @throws UnknownColumnFamilyException if a cell name refers to a column that cannot be decoded
+  */
+ static Pair<ColumnFilter, ClusteringIndexNamesFilter> deserializeNamesSelectionAndFilter(DataInputPlus in, CFMetaData metadata) throws IOException
+ {
+     int numCellNames = in.readInt();
+
+     // The names filter could include either a) static columns or b) normal columns with the clustering columns
+     // fully specified. We need to handle those cases differently in 3.0.
+     NavigableSet<Clustering> clusterings = new TreeSet<>(metadata.comparator);
+
+     ColumnFilter.Builder selectionBuilder = new ColumnFilter.Builder(metadata);
+     for (int i = 0; i < numCellNames; i++)
+     {
+         ByteBuffer buffer = ByteBufferUtil.readWithShortLength(in);
+         LegacyLayout.LegacyCellName cellName;
+         try
+         {
+             cellName = LegacyLayout.decodeCellName(metadata, buffer);
+         }
+         catch (UnknownColumnException exc)
+         {
+             // TODO this probably needs a new exception class that shares a parent with UnknownColumnFamilyException
+             // The message does not say "range": this helper is also used for single-partition names commands.
+             throw new UnknownColumnFamilyException(
+                 "Received legacy read command with names filter for unrecognized column name. " +
+                 "Full name in filter (hex): " + ByteBufferUtil.bytesToHex(buffer), metadata.cfId);
+         }
+
+         if (!cellName.clustering.equals(Clustering.STATIC_CLUSTERING))
+             clusterings.add(cellName.clustering);
+
+         selectionBuilder.add(cellName.column);
+     }
+
+     in.readBoolean(); // countCql3Rows (read to consume it from the stream; the value is not used)
+
+     // clusterings cannot include STATIC_CLUSTERING, so if the names filter is for static columns, clusterings
+     // will be empty. However, by requesting the static columns in our ColumnFilter, this will still work.
+     ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings, false);
+     return Pair.create(selectionBuilder.build(), filter);
+ }
+
+ /**
+  * Size in bytes of the names-filter part of the given command, as computed by
+  * {@code serializedNamesFilterSize} for the command's filter, metadata and fetched columns.
+  */
+ private long serializedNamesCommandSize(SinglePartitionNamesCommand command)
+ {
+     ClusteringIndexNamesFilter namesFilter = command.clusteringIndexFilter();
+     PartitionColumns fetched = command.columnFilter().fetchedColumns();
+     CFMetaData cfm = command.metadata();
+     return serializedNamesFilterSize(namesFilter, cfm, fetched);
+ }
+
+ private void serializeSliceCommand(SinglePartitionSliceCommand command, DataOutputPlus out) throws IOException
+ {
+ CFMetaData metadata = command.metadata();
+ ClusteringIndexSliceFilter filter = command.clusteringIndexFilter();
+
+ Slices slices = filter.requestedSlices();
+ boolean makeStaticSlice = !command.columnFilter().fetchedColumns().statics.isEmpty() && !slices.selects(Clustering.STATIC_CLUSTERING);
+ serializeSlices(out, slices, filter.isReversed(), makeStaticSlice, metadata);
+
+ out.writeBoolean(filter.isReversed());
+
+ boolean selectsStatics = !command.columnFilter().fetchedColumns().statics.isEmpty() || slices.selects(Clustering.STATIC_CLUSTERING);
+ DataLimits.Kind kind = command.limits().kind();
+ boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && command.limits().perPartitionCount() == 1;
+ if (isDistinct)
+ out.writeInt(1); // the limit is always 1 for DISTINCT queries
+ else
+ out.writeInt(updateLimitForQuery(command.limits().count(), filter.requestedSlices()));
+
+ int compositesToGroup;
+ if (kind == DataLimits.Kind.THRIFT_LIMIT || metadata.isDense())
+ compositesToGroup = -1;
+ else if (isDistinct && !selectsStatics)
+ compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490)
+ else
+ compositesToGroup = metadata.clusteringColumns().size();
+
+ out.writeInt(compositesToGroup);
+ }
+
+ /**
+  * Reads the slice-specific part of a legacy single-partition read (slice filter, count, compositesToGroup)
+  * and builds the matching slice command, choosing the limits kind from the values read.
+  */
+ private SinglePartitionSliceCommand deserializeSliceCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds) throws IOException
+ {
+     ClusteringIndexSliceFilter sliceFilter = deserializeSlicePartitionFilter(in, metadata);
+     int count = in.readInt();
+     int compositesToGroup = in.readInt();
+
+     // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
+     boolean selectsStatics = sliceFilter.selects(Clustering.STATIC_CLUSTERING);
+     PartitionColumns selected = metadata.partitionColumns();
+     if (!selectsStatics)
+         selected = selected.withoutStatics();
+     ColumnFilter columnFilter = new ColumnFilter.Builder(metadata).addAll(selected).build();
+
+     // treated as DISTINCT when compositesToGroup is -2, or when one row is asked for and statics are selected
+     boolean isDistinct = compositesToGroup == -2 || (count == 1 && selectsStatics);
+
+     DataLimits limits;
+     if (isDistinct)
+     {
+         limits = DataLimits.distinctLimits(count); // See CASSANDRA-8490 for the explanation of this value
+     }
+     else if (compositesToGroup == -1)
+     {
+         limits = DataLimits.thriftLimits(1, count);
+     }
+     else
+     {
+         limits = DataLimits.cqlLimits(count);
+     }
+
+     // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+     boolean isForThrift = true;
+     return new SinglePartitionSliceCommand(isDigest, isForThrift, metadata, nowInSeconds, columnFilter, RowFilter.NONE, limits, key, sliceFilter);
+ }
+
+ /**
+  * Size in bytes of the slice-specific part of the given command: the slices, the reversed flag,
+  * the int count and the int compositesToGroup.
+  */
+ private long serializedSliceCommandSize(SinglePartitionSliceCommand command)
+ {
+     ClusteringIndexSliceFilter sliceFilter = command.clusteringIndexFilter();
+     Slices slices = sliceFilter.requestedSlices();
+
+     boolean fetchesStatics = !command.columnFilter().fetchedColumns().statics.isEmpty();
+     boolean makeStaticSlice = fetchesStatics && !slices.selects(Clustering.STATIC_CLUSTERING);
+
+     long total = serializedSlicesSize(slices, makeStaticSlice, command.metadata());
+     total += TypeSizes.sizeof(sliceFilter.isReversed());
+     total += TypeSizes.sizeof(command.limits().count());
+     total += TypeSizes.sizeof(0); // compositesToGroup
+     return total;
+ }
+
+ /**
+  * Writes the slice count (plus one when a static slice is added) followed by every slice.
+  *
+  * In 3.0 we always store the slices in normal comparator order. Pre-3.0 nodes expect the slices to be in
+  * reversed order if the query is reversed, so reversed queries are written last-to-first with the static
+  * slice at the end; non-reversed queries get the static slice first.
+  */
+ static void serializeSlices(DataOutputPlus out, Slices slices, boolean isReversed, boolean makeStaticSlice, CFMetaData metadata) throws IOException
+ {
+     int sliceCount = slices.size();
+     out.writeInt(sliceCount + (makeStaticSlice ? 1 : 0));
+
+     if (!isReversed)
+     {
+         if (makeStaticSlice)
+             serializeStaticSlice(out, false, metadata);
+         for (Slice slice : slices)
+             serializeSlice(out, slice, false, metadata);
+         return;
+     }
+
+     for (int idx = sliceCount - 1; idx >= 0; idx--)
+         serializeSlice(out, slices.get(idx), true, metadata);
+     if (makeStaticSlice)
+         serializeStaticSlice(out, true, metadata);
+ }
+
+ /**
+  * Size in bytes of the int slice count plus the short-length-prefixed start and end bound of every slice,
+  * plus the static slice when {@code makeStaticSlice} is set.
+  */
+ static long serializedSlicesSize(Slices slices, boolean makeStaticSlice, CFMetaData metadata)
+ {
+     long total = TypeSizes.sizeof(slices.size());
+
+     if (makeStaticSlice)
+         total += serializedStaticSliceSize(metadata);
+
+     for (Slice slice : slices)
+     {
+         ByteBuffer encodedStart = LegacyLayout.encodeBound(metadata, slice.start(), true);
+         ByteBuffer encodedEnd = LegacyLayout.encodeBound(metadata, slice.end(), false);
+         total += ByteBufferUtil.serializedSizeWithShortLength(encodedStart);
+         total += ByteBufferUtil.serializedSizeWithShortLength(encodedEnd);
+     }
+     return total;
+ }
+
+ /**
+  * Size in bytes of the static slice written by {@code serializeStaticSlice}: the empty bound, the short
+  * composite length, the short static prefix, and for each comparator component an empty short-length-prefixed
+  * value plus one end-of-component byte.
+  */
+ static long serializedStaticSliceSize(CFMetaData metadata)
+ {
+     int components = metadata.comparator.size();
+
+     // serializeStaticSlice() places the empty bound before or after the composite depending on reversal,
+     // but the position makes no difference to the total size
+     ByteBuffer emptyBound = LegacyLayout.encodeBound(metadata, Slice.Bound.BOTTOM, false);
+     long total = ByteBufferUtil.serializedSizeWithShortLength(emptyBound);
+
+     total += TypeSizes.sizeof((short) (components * 3 + 2));
+     total += TypeSizes.sizeof((short) LegacyLayout.STATIC_PREFIX);
+
+     // each component: empty value with its short length, then one EOC byte
+     long perComponent = ByteBufferUtil.serializedSizeWithShortLength(ByteBufferUtil.EMPTY_BYTE_BUFFER) + 1;
+     return total + components * perComponent;
+ }
+
+ private static void serializeSlice(DataOutputPlus out, Slice slice, boolean isReversed, CFMetaData metadata) throws IOException
+ {
+ ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, isReversed ? slice.end() : slice.start(), !isReversed);
+ ByteBufferUtil.writeWithShortLength(sliceStart, out);
+
+ ByteBuffer sliceEnd = LegacyLayout.encodeBound(metadata, isReversed ? slice.start() : slice.end(), isReversed);
+ ByteBufferUtil.writeWithShortLength(sliceEnd, out);
+ }
+
+ /**
+  * Writes the extra slice covering static columns: an empty bound and a composite made of the static prefix
+  * followed by one empty component per comparator component.
+  *
+  * If the query is not reversed, the empty bound is written first (as the slice start) and the static
+  * composite second; if it is reversed, the static composite is written first and the empty bound last.
+  */
+ private static void serializeStaticSlice(DataOutputPlus out, boolean isReversed, CFMetaData metadata) throws IOException
+ {
+     ByteBuffer emptyBound = LegacyLayout.encodeBound(metadata, Slice.Bound.BOTTOM, false);
+     int components = metadata.comparator.size();
+
+     if (!isReversed)
+         ByteBufferUtil.writeWithShortLength(emptyBound, out);
+
+     // write out the length of the composite
+     out.writeShort(2 + components * 3); // two bytes + EOC for each component, plus static prefix
+     out.writeShort(LegacyLayout.STATIC_PREFIX);
+     int lastComponent = components - 1;
+     for (int c = 0; c < components; c++)
+     {
+         ByteBufferUtil.writeWithShortLength(ByteBufferUtil.EMPTY_BYTE_BUFFER, out);
+         // write the EOC, using an inclusive end if we're on the final component
+         out.writeByte(c == lastComponent ? 1 : 0);
+     }
+
+     if (isReversed)
+         ByteBufferUtil.writeWithShortLength(emptyBound, out);
+ }
+
+ /**
+  * Reads a legacy slice filter: an int slice count, that many (start, finish) pairs of short-length-prefixed
+  * bounds, then the reversed flag. The raw bounds are buffered first because the reversed flag, which
+  * determines how they are decoded, comes after them on the wire.
+  */
+ static ClusteringIndexSliceFilter deserializeSlicePartitionFilter(DataInputPlus in, CFMetaData metadata) throws IOException
+ {
+     int sliceCount = in.readInt();
+     ByteBuffer[] wireStarts = new ByteBuffer[sliceCount];
+     ByteBuffer[] wireFinishes = new ByteBuffer[sliceCount];
+     int read = 0;
+     while (read < sliceCount)
+     {
+         wireStarts[read] = ByteBufferUtil.readWithShortLength(in);
+         wireFinishes[read] = ByteBufferUtil.readWithShortLength(in);
+         read++;
+     }
+
+     // we have to know if the query is reversed before we can correctly build the slices
+     boolean reversed = in.readBoolean();
+
+     Slices.Builder builder = new Slices.Builder(metadata.comparator);
+     for (int i = 0; i < sliceCount; i++)
+     {
+         Slice.Bound lower;
+         Slice.Bound upper;
+         if (reversed)
+         {
+             // pre-3.0, reversed query slices put the greater element at the start of the slice
+             upper = LegacyLayout.decodeBound(metadata, wireStarts[i], false).bound;
+             lower = LegacyLayout.decodeBound(metadata, wireFinishes[i], true).bound;
+         }
+         else
+         {
+             lower = LegacyLayout.decodeBound(metadata, wireStarts[i], true).bound;
+             upper = LegacyLayout.decodeBound(metadata, wireFinishes[i], false).bound;
+         }
+         builder.add(Slice.make(lower, upper));
+     }
+
+     return new ClusteringIndexSliceFilter(builder.build(), reversed);
+ }
+
+ /**
+  * Returns the command unchanged unless it uses a names filter that {@code shouldConvertNamesToSlice}
+  * says must be converted; in that case returns an equivalent slice command built from the same fields.
+  */
+ private static SinglePartitionReadCommand maybeConvertNamesToSlice(SinglePartitionReadCommand command)
+ {
+     boolean isNamesQuery = command.clusteringIndexFilter().kind() == ClusteringIndexFilter.Kind.NAMES;
+     if (!isNamesQuery || !shouldConvertNamesToSlice(command.metadata(), command.columnFilter().fetchedColumns()))
+         return command;
+
+     CFMetaData cfm = command.metadata();
+     ClusteringIndexNamesFilter namesFilter = ((SinglePartitionNamesCommand) command).clusteringIndexFilter();
+     ClusteringIndexSliceFilter converted = convertNamesFilterToSliceFilter(namesFilter, cfm);
+
+     return new SinglePartitionSliceCommand(command.isDigestQuery(),
+                                            command.isForThrift(),
+                                            cfm,
+                                            command.nowInSec(),
+                                            command.columnFilter(),
+                                            command.rowFilter(),
+                                            command.limits(),
+                                            command.partitionKey(),
+                                            converted);
+ }
+
+ /**
+  * Decides whether a names filter must be sent to pre-3.0 nodes as a slice filter instead.
+  *
+  * @param metadata the table being queried
+  * @param columns the columns being fetched
+  * @return true if the table is non-dense and compound, or if any fetched column has a multi-cell type;
+  *         false otherwise
+  */
+ static boolean shouldConvertNamesToSlice(CFMetaData metadata, PartitionColumns columns)
+ {
+     // On pre-3.0 nodes, due to CASSANDRA-5762, we always do a slice for CQL3 tables (not dense, composite).
+     boolean isCql3Table = !metadata.isDense() && metadata.isCompound();
+     if (isCql3Table)
+         return true;
+
+     // pre-3.0 nodes don't support names filters for reading collections, so if we're requesting any of those,
+     // we need to convert this to a slice filter
+     Iterator<ColumnDefinition> defs = columns.iterator();
+     while (defs.hasNext())
+     {
+         if (defs.next().type.isMultiCell())
+             return true;
+     }
+     return false;
+ }
+
+ /**
+  * Builds a slice filter equivalent to the given names filter, for use with pre-3.0 nodes.
+  *
+  * When no rows are requested, or the single requested row has an empty clustering, all slices are
+  * selected; otherwise each requested clustering becomes one slice that is inclusive on both ends.
+  * The reversed flag is carried over from the names filter.
+  */
+ private static ClusteringIndexSliceFilter convertNamesFilterToSliceFilter(ClusteringIndexNamesFilter filter, CFMetaData metadata)
+ {
+     SortedSet<Clustering> rows = filter.requestedRows();
+
+     boolean useAllSlices = rows.isEmpty() || (rows.size() == 1 && rows.first().size() == 0);
+     if (useAllSlices)
+         return new ClusteringIndexSliceFilter(Slices.ALL, filter.isReversed());
+
+     Slices.Builder builder = new Slices.Builder(metadata.comparator);
+     for (Clustering row : rows)
+         builder.add(Slice.Bound.inclusiveStartOf(row), Slice.Bound.inclusiveEndOf(row));
+
+     return new ClusteringIndexSliceFilter(builder.build(), filter.isReversed());
+ }
+
+ /**
+  * Potentially increases the existing query limit to account for the lack of exclusive bounds in pre-3.0 nodes.
+  * The limit grows by one for every exclusive slice bound and saturates at {@code Integer.MAX_VALUE}
+  * instead of overflowing.
+  *
+  * @param limit the existing query limit
+  * @param slices the requested slices
+  * @return the updated limit, never more than {@code Integer.MAX_VALUE}
+  */
+ static int updateLimitForQuery(int limit, Slices slices)
+ {
+     // Pre-3.0 nodes don't support exclusive bounds for slices. Instead, we query one more element if necessary
+     // and filter it later (in LegacyRemoteDataResponse)
+     if (!slices.hasLowerBound() && !slices.hasUpperBound())
+         return limit;
+
+     for (Slice slice : slices)
+     {
+         int exclusiveBounds = (slice.start().isInclusive() ? 0 : 1) + (slice.end().isInclusive() ? 0 : 1);
+
+         // A slice can add up to two, so checking only for limit == MAX_VALUE once per slice is not enough:
+         // MAX_VALUE - 1 with two exclusive bounds would wrap around to a negative limit.
+         if (limit > Integer.MAX_VALUE - exclusiveBounds)
+             return Integer.MAX_VALUE;
+
+         limit += exclusiveBounds;
+     }
+     return limit;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index f85d406..9cde8dc 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -17,8 +17,8 @@
*/
package org.apache.cassandra.db;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
@@ -28,6 +28,11 @@ import org.apache.cassandra.tracing.Tracing;
public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
{
+ /**
+  * Returns the serializer used for the reply built in {@code doVerb}; this implementation returns
+  * {@code ReadResponse.serializer}. Being protected, it can be overridden by subclasses.
+  */
+ protected IVersionedSerializer<ReadResponse> serializer()
+ {
+ return ReadResponse.serializer;
+ }
+
public void doVerb(MessageIn<ReadCommand> message, int id)
{
if (StorageService.instance.isBootstrapMode())
@@ -42,7 +47,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
response = command.createResponse(iterator);
}
- MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, ReadResponse.serializer);
+ MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer());
Tracing.trace("Enqueuing response to {}", message.from);
MessagingService.instance().sendReply(reply, id, message.from);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 90bd21d..b3cc725 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -20,8 +20,12 @@ package org.apache.cassandra.db;
import java.io.*;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -30,6 +34,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -57,8 +62,10 @@ public abstract class ReadResponse
return new DigestResponse(makeDigest(data));
}
- public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata);
- public abstract ByteBuffer digest(CFMetaData metadata);
+ public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command);
+
+ public abstract ByteBuffer digest(CFMetaData metadata, ReadCommand command);
+
public abstract boolean isDigestQuery();
protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator)
@@ -79,12 +86,12 @@ public abstract class ReadResponse
this.digest = digest;
}
- public UnfilteredPartitionIterator makeIterator(CFMetaData metadata)
+ public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command)
{
throw new UnsupportedOperationException();
}
- public ByteBuffer digest(CFMetaData metadata)
+ public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
{
return digest;
}
@@ -124,7 +131,7 @@ public abstract class ReadResponse
}
}
- public UnfilteredPartitionIterator makeIterator(CFMetaData metadata)
+ public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command)
{
try
{
@@ -138,9 +145,76 @@ public abstract class ReadResponse
}
}
- public ByteBuffer digest(CFMetaData metadata)
+ public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
{
- try (UnfilteredPartitionIterator iterator = makeIterator(metadata))
+ try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command))
+ {
+ return makeDigest(iterator);
+ }
+ }
+
+ public boolean isDigestQuery()
+ {
+ return false;
+ }
+ }
+
+ /**
+ * A remote response from a pre-3.0 node. This needs a separate class in order to cleanly handle trimming and
+ * reversal of results when the read command calls for it. Pre-3.0 nodes always return results in the normal
+ * sorted order, even if the query asks for reversed results. Additionally, pre-3.0 nodes do not have a notion of
+ * exclusive slices on non-composite tables, so extra rows may need to be trimmed.
+ */
+ private static class LegacyRemoteDataResponse extends ReadResponse
+ {
+ private final List<ArrayBackedPartition> partitions;
+
+ /**
+  * Creates a response holding the given already-deserialized partitions. No metadata is passed to the
+  * superclass constructor.
+  */
+ private LegacyRemoteDataResponse(List<ArrayBackedPartition> partitions)
+ {
+ super(null); // we never serialize LegacyRemoteDataResponses, so we don't care about the metadata
+ this.partitions = partitions;
+ }
+
+ /**
+  * Returns an iterator over the stored partitions, in list order. Each partition is exposed through the
+  * clustering filter and column filter of {@code command}, honouring the filter's reversed flag.
+  *
+  * @param metadata the metadata reported by the returned iterator
+  * @param command the read command whose filters are applied to each partition
+  */
+ public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, final ReadCommand command)
+ {
+ return new AbstractUnfilteredPartitionIterator()
+ {
+ // index of the next partition to return
+ private int idx;
+
+ public boolean isForThrift()
+ {
+ return true;
+ }
+
+ public CFMetaData metadata()
+ {
+ return metadata;
+ }
+
+ public boolean hasNext()
+ {
+ return idx < partitions.size();
+ }
+
+ public UnfilteredRowIterator next()
+ {
+ ArrayBackedPartition partition = partitions.get(idx++);
+
+ // the clustering filter is looked up per partition key
+ ClusteringIndexFilter filter = command.clusteringIndexFilter(partition.partitionKey());
+
+ // Pre-3.0, we didn't have a way to express exclusivity for non-composite comparators, so all slices were
+ // inclusive on both ends. If we have exclusive slice ends, we need to filter the results here.
+ if (!command.metadata().isCompound())
+ return filter.filter(partition.sliceableUnfilteredIterator(command.columnFilter(), filter.isReversed()));
+
+ // compound tables: return everything in the partition for the selected columns, without re-filtering
+ return partition.unfilteredIterator(command.columnFilter(), Slices.ALL, filter.isReversed());
+ }
+ };
+ }
+
+ public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
+ {
+ try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command))
{
return makeDigest(iterator);
}
@@ -156,14 +230,32 @@ public abstract class ReadResponse
{
public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
{
+ boolean isDigest = response instanceof DigestResponse;
+ ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
if (version < MessagingService.VERSION_30)
{
- // TODO
- throw new UnsupportedOperationException();
+ out.writeInt(digest.remaining());
+ out.write(digest);
+ out.writeBoolean(isDigest);
+ if (!isDigest)
+ {
+ assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
+ try (UnfilteredPartitionIterator iter = response.makeIterator(response.metadata, null))
+ {
+ assert iter.hasNext();
+ try (UnfilteredRowIterator partition = iter.next())
+ {
+ ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
+ LegacyLayout.serializeAsLegacyPartition(partition, out, version);
+ }
+ assert !iter.hasNext();
+ }
+ }
+ return;
}
- boolean isDigest = response.isDigestQuery();
- ByteBufferUtil.writeWithVIntLength(isDigest ? response.digest(response.metadata) : ByteBufferUtil.EMPTY_BYTE_BUFFER, out);
+ ByteBufferUtil.writeWithVIntLength(digest, out);
if (!isDigest)
{
// Note that we can only get there if version == 3.0, which is the current_version. When we'll change the
@@ -178,8 +270,35 @@ public abstract class ReadResponse
{
if (version < MessagingService.VERSION_30)
{
- // TODO
- throw new UnsupportedOperationException();
+ byte[] digest = null;
+ int digestSize = in.readInt();
+ if (digestSize > 0)
+ {
+ digest = new byte[digestSize];
+ in.readFully(digest, 0, digestSize);
+ }
+ boolean isDigest = in.readBoolean();
+ assert isDigest == digestSize > 0;
+ if (isDigest)
+ {
+ assert digest != null;
+ return new DigestResponse(ByteBuffer.wrap(digest));
+ }
+
+ // ReadResponses from older versions are always single-partition (ranges are handled by RangeSliceReply)
+ ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+ UnfilteredRowIterator rowIterator = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key);
+ if (rowIterator == null)
+ return new LegacyRemoteDataResponse(Collections.emptyList());
+
+ try
+ {
+ return new LegacyRemoteDataResponse(Collections.singletonList(ArrayBackedPartition.create(rowIterator)));
+ }
+ finally
+ {
+ rowIterator.close();
+ }
}
ByteBuffer digest = ByteBufferUtil.readWithVIntLength(in);
@@ -193,14 +312,32 @@ public abstract class ReadResponse
public long serializedSize(ReadResponse response, int version)
{
+ boolean isDigest = response instanceof DigestResponse;
+ ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
if (version < MessagingService.VERSION_30)
{
- // TODO
- throw new UnsupportedOperationException();
+ long size = TypeSizes.sizeof(digest.remaining())
+ + digest.remaining()
+ + TypeSizes.sizeof(isDigest);
+ if (!isDigest)
+ {
+ assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
+ try (UnfilteredPartitionIterator iter = response.makeIterator(response.metadata, null))
+ {
+ assert iter.hasNext();
+ try (UnfilteredRowIterator partition = iter.next())
+ {
+ size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
+ size += LegacyLayout.serializedSizeAsLegacyPartition(partition, version);
+ }
+ assert !iter.hasNext();
+ }
+ }
+ return size;
}
- boolean isDigest = response.isDigestQuery();
- long size = ByteBufferUtil.serializedSizeWithVIntLength(isDigest ? response.digest(response.metadata) : ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ long size = ByteBufferUtil.serializedSizeWithVIntLength(digest);
if (!isDigest)
{
// Note that we can only get there if version == 3.0, which is the current_version. When we'll change the
@@ -217,32 +354,75 @@ public abstract class ReadResponse
{
public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
{
- // TODO
- throw new UnsupportedOperationException();
- // out.writeInt(rsr.rows.size());
- // for (Row row : rsr.rows)
- // Row.serializer.serialize(row, out, version);
+ assert version < MessagingService.VERSION_30;
+
+ // determine the number of partitions upfront for serialization
+ int numPartitions = 0;
+ assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
+ try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null))
+ {
+ while (iterator.hasNext())
+ {
+ try (UnfilteredRowIterator atomIterator = iterator.next())
+ {
+ numPartitions++;
+
+ // we have to fully exhaust the subiterator
+ while (atomIterator.hasNext())
+ atomIterator.next();
+ }
+ }
+ }
+
+ out.writeInt(numPartitions);
+
+ try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null))
+ {
+ while (iterator.hasNext())
+ {
+ try (UnfilteredRowIterator partition = iterator.next())
+ {
+ ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
+ LegacyLayout.serializeAsLegacyPartition(partition, out, version);
+ }
+ }
+ }
}
public ReadResponse deserialize(DataInputPlus in, int version) throws IOException
{
- // TODO
- throw new UnsupportedOperationException();
- // int rowCount = in.readInt();
- // List<Row> rows = new ArrayList<Row>(rowCount);
- // for (int i = 0; i < rowCount; i++)
- // rows.add(Row.serializer.deserialize(in, version));
- // return new RangeSliceReply(rows);
+ // Contrarily to serialize, we have to read the number of serialized partitions here.
+ int partitionCount = in.readInt();
+ ArrayList<ArrayBackedPartition> partitions = new ArrayList<>(partitionCount);
+ for (int i = 0; i < partitionCount; i++)
+ {
+ ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+ try (UnfilteredRowIterator partition = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key))
+ {
+ partitions.add(ArrayBackedPartition.create(partition));
+ }
+ }
+ return new LegacyRemoteDataResponse(partitions);
}
public long serializedSize(ReadResponse response, int version)
{
- // TODO
- throw new UnsupportedOperationException();
- // int size = TypeSizes.sizeof(rsr.rows.size());
- // for (Row row : rsr.rows)
- // size += Row.serializer.serializedSize(row, version);
- // return size;
+ assert version < MessagingService.VERSION_30;
+ long size = TypeSizes.sizeof(0); // number of partitions
+
+ assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
+ try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null))
+ {
+ while (iterator.hasNext())
+ {
+ try (UnfilteredRowIterator partition = iterator.next())
+ {
+ size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
+ size += LegacyLayout.serializedSizeAsLegacyPartition(partition, version);
+ }
+ }
+ }
+ return size;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index bb184e8..1b688c9 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -32,6 +32,8 @@ import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.*;
import org.apache.cassandra.service.pager.*;
import org.apache.cassandra.tracing.Tracing;
@@ -257,7 +259,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, OpOrder.Group readOp)
{
assert !cfs.isIndex(); // CASSANDRA-5732
- assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [" + cfs.name + "]");
+ assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name);
UUID cfId = metadata().cfId;
RowCacheKey key = new RowCacheKey(cfId, partitionKey());
@@ -393,6 +395,11 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
nowInSec());
}
+ protected MessageOut<ReadCommand> createLegacyMessage()
+ {
+ return new MessageOut<>(MessagingService.Verb.READ, this, legacyReadCommandSerializer);
+ }
+
protected void appendCQLWhereClause(StringBuilder sb)
{
sb.append(" WHERE ");
@@ -509,5 +516,5 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
else
return new SinglePartitionSliceCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexSliceFilter)filter);
}
- };
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/Slice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java
index 2ffb91e..7fde45e 100644
--- a/src/java/org/apache/cassandra/db/Slice.java
+++ b/src/java/org/apache/cassandra/db/Slice.java
@@ -19,15 +19,12 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.security.MessageDigest;
import java.util.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
/**
* A slice represents the selection of a range of rows.
@@ -83,11 +80,10 @@ public class Slice
public static Slice make(ClusteringComparator comparator, Object... values)
{
CBuilder builder = CBuilder.create(comparator);
- for (int i = 0; i < values.length; i++)
+ for (Object val : values)
{
- Object val = values[i];
if (val instanceof ByteBuffer)
- builder.add((ByteBuffer)val);
+ builder.add((ByteBuffer) val);
else
builder.add(val);
}
@@ -208,6 +204,9 @@ public class Slice
*/
public Slice forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed)
{
+ if (lastReturned == null)
+ return this;
+
if (reversed)
{
int cmp = comparator.compare(lastReturned, start);
@@ -286,14 +285,14 @@ public class Slice
for (int i = 0; i < start.size(); i++)
{
if (i > 0)
- sb.append(":");
+ sb.append(':');
sb.append(comparator.subtype(i).getString(start.get(i)));
}
sb.append(", ");
for (int i = 0; i < end.size(); i++)
{
if (i > 0)
- sb.append(":");
+ sb.append(':');
sb.append(comparator.subtype(i).getString(end.get(i)));
}
sb.append(end.isInclusive() ? "]" : ")");
@@ -394,14 +393,37 @@ public class Slice
return create(Kind.EXCL_END_BOUND, values);
}
+ public static Bound inclusiveStartOf(ClusteringPrefix prefix)
+ {
+ ByteBuffer[] values = new ByteBuffer[prefix.size()];
+ for (int i = 0; i < prefix.size(); i++)
+ values[i] = prefix.get(i);
+ return inclusiveStartOf(values);
+ }
+
+ public static Bound exclusiveStartOf(ClusteringPrefix prefix)
+ {
+ ByteBuffer[] values = new ByteBuffer[prefix.size()];
+ for (int i = 0; i < prefix.size(); i++)
+ values[i] = prefix.get(i);
+ return exclusiveStartOf(values);
+ }
+
+ public static Bound inclusiveEndOf(ClusteringPrefix prefix)
+ {
+ ByteBuffer[] values = new ByteBuffer[prefix.size()];
+ for (int i = 0; i < prefix.size(); i++)
+ values[i] = prefix.get(i);
+ return inclusiveEndOf(values);
+ }
+
public static Bound create(ClusteringComparator comparator, boolean isStart, boolean isInclusive, Object... values)
{
CBuilder builder = CBuilder.create(comparator);
- for (int i = 0; i < values.length; i++)
+ for (Object val : values)
{
- Object val = values[i];
if (val instanceof ByteBuffer)
- builder.add((ByteBuffer)val);
+ builder.add((ByteBuffer) val);
else
builder.add(val);
}
@@ -483,14 +505,14 @@ public class Slice
public String toString(ClusteringComparator comparator)
{
StringBuilder sb = new StringBuilder();
- sb.append(kind()).append("(");
+ sb.append(kind()).append('(');
for (int i = 0; i < size(); i++)
{
if (i > 0)
sb.append(", ");
sb.append(comparator.subtype(i).getString(get(i)));
}
- return sb.append(")").toString();
+ return sb.append(')').toString();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
index ed7584b..51e9d8e 100644
--- a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
@@ -28,23 +28,8 @@ import org.apache.cassandra.io.util.DataOutputPlus;
public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFilter
{
- protected enum Kind
- {
- SLICE (ClusteringIndexSliceFilter.deserializer),
- NAMES (ClusteringIndexNamesFilter.deserializer);
-
- private final InternalDeserializer deserializer;
-
- private Kind(InternalDeserializer deserializer)
- {
- this.deserializer = deserializer;
- }
- }
-
static final Serializer serializer = new FilterSerializer();
- abstract Kind kind();
-
protected final boolean reversed;
protected AbstractClusteringIndexFilter(boolean reversed)
@@ -101,9 +86,4 @@ public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFi
+ filter.serializedSizeInternal(version);
}
}
-
- protected static abstract class InternalDeserializer
- {
- public abstract ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
index 33a0917..e3f824f 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
@@ -39,6 +39,24 @@ public interface ClusteringIndexFilter
{
public static Serializer serializer = AbstractClusteringIndexFilter.serializer;
+ public enum Kind
+ {
+ SLICE (ClusteringIndexSliceFilter.deserializer),
+ NAMES (ClusteringIndexNamesFilter.deserializer);
+
+ protected final InternalDeserializer deserializer;
+
+ private Kind(InternalDeserializer deserializer)
+ {
+ this.deserializer = deserializer;
+ }
+ }
+
+ static interface InternalDeserializer
+ {
+ public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException;
+ }
+
/**
* Whether the filter query rows in reversed clustering order or not.
*
@@ -140,6 +158,8 @@ public interface ClusteringIndexFilter
*/
public boolean shouldInclude(SSTableReader sstable);
+ public Kind kind();
+
public String toString(CFMetaData metadata);
public String toCQLString(CFMetaData metadata);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
index f2c81a7..e0bc533 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -232,7 +232,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
return sb.toString();
}
- Kind kind()
+ public Kind kind()
{
return Kind.NAMES;
}
@@ -254,7 +254,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
return size;
}
- private static class NamesDeserializer extends InternalDeserializer
+ private static class NamesDeserializer implements InternalDeserializer
{
public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
index 4f0e4e2..b2d529c 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
@@ -153,7 +153,7 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter
return sb.toString();
}
- Kind kind()
+ public Kind kind()
{
return Kind.SLICE;
}
@@ -168,7 +168,7 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter
return Slices.serializer.serializedSize(slices, version);
}
- private static class SliceDeserializer extends InternalDeserializer
+ private static class SliceDeserializer implements InternalDeserializer
{
public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index d2cb87d..2afc785 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -313,6 +313,9 @@ public class ColumnFilter
return "";
Iterator<ColumnDefinition> defs = selection.selectOrderIterator();
+ if (!defs.hasNext())
+ return "<none>";
+
StringBuilder sb = new StringBuilder();
appendColumnDef(sb, defs.next());
while (defs.hasNext())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 458ee30..3e608b4 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -62,7 +62,7 @@ public abstract class DataLimits
// partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering.
public static final DataLimits DISTINCT_NONE = new CQLLimits(Integer.MAX_VALUE, 1, true);
- private enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT }
+ public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT }
public static DataLimits cqlLimits(int cqlRowLimit)
{
@@ -89,7 +89,7 @@ public abstract class DataLimits
return new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
}
- protected abstract Kind kind();
+ public abstract Kind kind();
public abstract boolean isUnlimited();
@@ -199,7 +199,7 @@ public abstract class DataLimits
return new CQLLimits(rowLimit, 1, true);
}
- protected Kind kind()
+ public Kind kind()
{
return Kind.CQL_LIMIT;
}
@@ -368,7 +368,7 @@ public abstract class DataLimits
}
@Override
- protected Kind kind()
+ public Kind kind()
{
return Kind.CQL_PAGING_LIMIT;
}
@@ -432,7 +432,7 @@ public abstract class DataLimits
this.cellPerPartitionLimit = cellPerPartitionLimit;
}
- protected Kind kind()
+ public Kind kind()
{
return Kind.THRIFT_LIMIT;
}
@@ -588,7 +588,7 @@ public abstract class DataLimits
super(partitionLimit, cellPerPartitionLimit);
}
- protected Kind kind()
+ public Kind kind()
{
return Kind.SUPER_COLUMN_COUNTING_LIMIT;
}