You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/11/30 09:50:03 UTC
[10/11] cassandra git commit: Remove pre-3.0 compatibility code for 4.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
deleted file mode 100644
index 55826f5..0000000
--- a/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-
-public class RangeSliceVerbHandler extends ReadCommandVerbHandler
-{
- @Override
- protected IVersionedSerializer<ReadResponse> serializer()
- {
- return ReadResponse.rangeSliceSerializer;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index d8051fe..0bda184 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -37,7 +37,6 @@ import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.IndexNotAvailableException;
-import org.apache.cassandra.io.ForwardingVersionedSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -64,43 +63,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
- // For READ verb: will either dispatch on 'serializer' for 3.0 or 'legacyReadCommandSerializer' for earlier version.
- // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
- public static final IVersionedSerializer<ReadCommand> readSerializer = new ForwardingVersionedSerializer<ReadCommand>()
- {
- protected IVersionedSerializer<ReadCommand> delegate(int version)
- {
- return version < MessagingService.VERSION_30
- ? legacyReadCommandSerializer : serializer;
- }
- };
-
- // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 'legacyRangeSliceCommandSerializer' for earlier version.
- // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
- public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new ForwardingVersionedSerializer<ReadCommand>()
- {
- protected IVersionedSerializer<ReadCommand> delegate(int version)
- {
- return version < MessagingService.VERSION_30
- ? legacyRangeSliceCommandSerializer : serializer;
- }
- };
-
- // For PAGED_RANGE verb: will either dispatch on 'serializer' for 3.0 or 'legacyPagedRangeCommandSerializer' for earlier version.
- // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
- public static final IVersionedSerializer<ReadCommand> pagedRangeSerializer = new ForwardingVersionedSerializer<ReadCommand>()
- {
- protected IVersionedSerializer<ReadCommand> delegate(int version)
- {
- return version < MessagingService.VERSION_30
- ? legacyPagedRangeCommandSerializer : serializer;
- }
- };
-
- public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer();
- public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer();
- public static final IVersionedSerializer<ReadCommand> legacyReadCommandSerializer = new LegacyReadCommandSerializer();
-
private final Kind kind;
private final CFMetaData metadata;
private final int nowInSec;
@@ -580,7 +542,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
/**
* Creates a message for this command.
*/
- public abstract MessageOut<ReadCommand> createMessage(int version);
+ public abstract MessageOut<ReadCommand> createMessage();
protected abstract void appendCQLWhereClause(StringBuilder sb);
@@ -666,8 +628,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
{
- assert version >= MessagingService.VERSION_30;
-
out.writeByte(command.kind.ordinal());
out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
if (command.isDigestQuery())
@@ -685,8 +645,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
{
- assert version >= MessagingService.VERSION_30;
-
Kind kind = Kind.values()[in.readByte()];
int flags = in.readByte();
boolean isDigest = isDigest(flags);
@@ -699,8 +657,8 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
DataLimits limits = DataLimits.serializer.deserialize(in, version, metadata.comparator);
Optional<IndexMetadata> index = hasIndex
- ? deserializeIndexMetadata(in, version, metadata)
- : Optional.empty();
+ ? deserializeIndexMetadata(in, version, metadata)
+ : Optional.empty();
return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
}
@@ -724,8 +682,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
public long serializedSize(ReadCommand command, int version)
{
- assert version >= MessagingService.VERSION_30;
-
return 2 // kind + flags
+ (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
+ CFMetaData.serializer.serializedSize(command.metadata(), version)
@@ -737,1015 +693,4 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
+ command.indexSerializedSize(version);
}
}
-
- private enum LegacyType
- {
- GET_BY_NAMES((byte)1),
- GET_SLICES((byte)2);
-
- public final byte serializedValue;
-
- LegacyType(byte b)
- {
- this.serializedValue = b;
- }
-
- public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind)
- {
- return kind == ClusteringIndexFilter.Kind.SLICE
- ? GET_SLICES
- : GET_BY_NAMES;
- }
-
- public static LegacyType fromSerializedValue(byte b)
- {
- return b == 1 ? GET_BY_NAMES : GET_SLICES;
- }
- }
-
- /**
- * Serializer for pre-3.0 RangeSliceCommands.
- */
- private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand>
- {
- public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
- {
- assert version < MessagingService.VERSION_30;
-
- PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
- assert !rangeCommand.dataRange().isPaging();
-
- // convert pre-3.0 incompatible names filters to slice filters
- rangeCommand = maybeConvertNamesToSlice(rangeCommand);
-
- CFMetaData metadata = rangeCommand.metadata();
-
- out.writeUTF(metadata.ksName);
- out.writeUTF(metadata.cfName);
- out.writeLong(rangeCommand.nowInSec() * 1000L); // convert from seconds to millis
-
- // begin DiskAtomFilterSerializer.serialize()
- if (rangeCommand.isNamesQuery())
- {
- out.writeByte(1); // 0 for slices, 1 for names
- ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
- LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out);
- }
- else
- {
- out.writeByte(0); // 0 for slices, 1 for names
-
- // slice filter serialization
- ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
-
- boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
- LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
-
- out.writeBoolean(filter.isReversed());
-
- // limit
- DataLimits limits = rangeCommand.limits();
- if (limits.isDistinct())
- out.writeInt(1);
- else
- out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices()));
-
- int compositesToGroup;
- boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
- if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT)
- compositesToGroup = -1;
- else if (limits.isDistinct() && !selectsStatics)
- compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490)
- else
- compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
-
- out.writeInt(compositesToGroup);
- }
-
- serializeRowFilter(out, rangeCommand.rowFilter());
- AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
-
- // maxResults
- out.writeInt(rangeCommand.limits().count());
-
- // countCQL3Rows
- if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1) // if for Thrift or DISTINCT
- out.writeBoolean(false);
- else
- out.writeBoolean(true);
-
- // isPaging
- out.writeBoolean(false);
- }
-
- public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
- {
- assert version < MessagingService.VERSION_30;
-
- String keyspace = in.readUTF();
- String columnFamily = in.readUTF();
-
- CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
- if (metadata == null)
- {
- String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily);
- throw new UnknownColumnFamilyException(message, null);
- }
-
- int nowInSec = (int) (in.readLong() / 1000); // convert from millis to seconds
-
- ClusteringIndexFilter filter;
- ColumnFilter selection;
- int compositesToGroup = 0;
- int perPartitionLimit = -1;
- byte readType = in.readByte(); // 0 for slices, 1 for names
- if (readType == 1)
- {
- Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata);
- selection = selectionAndFilter.left;
- filter = selectionAndFilter.right;
- }
- else
- {
- Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
- filter = p.left;
- perPartitionLimit = in.readInt();
- compositesToGroup = in.readInt();
- selection = getColumnSelectionForSlice(p.right, compositesToGroup, metadata);
- }
-
- RowFilter rowFilter = deserializeRowFilter(in, metadata);
-
- AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
- int maxResults = in.readInt();
-
- boolean countCQL3Rows = in.readBoolean(); // countCQL3Rows (not needed)
- in.readBoolean(); // isPaging (not needed)
-
- boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
- // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former,
- // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter one is slightly less
- // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows to false so we use
- // that fact.
- boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows);
- DataLimits limits;
- if (isDistinct)
- limits = DataLimits.distinctLimits(maxResults);
- else if (compositesToGroup == -1)
- limits = DataLimits.thriftLimits(maxResults, perPartitionLimit);
- else
- limits = DataLimits.cqlLimits(maxResults);
-
- return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
- }
-
- static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
- {
- ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator());
- out.writeInt(indexExpressions.size());
- for (RowFilter.Expression expression : indexExpressions)
- {
- ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out);
- expression.operator().writeTo(out);
- ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out);
- }
- }
-
- static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException
- {
- int numRowFilters = in.readInt();
- if (numRowFilters == 0)
- return RowFilter.NONE;
-
- RowFilter rowFilter = RowFilter.create(numRowFilters);
- for (int i = 0; i < numRowFilters; i++)
- {
- ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in);
- ColumnDefinition column = metadata.getColumnDefinition(columnName);
- Operator op = Operator.readFrom(in);
- ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in);
- rowFilter.add(column, op, indexValue);
- }
- return rowFilter;
- }
-
- static long serializedRowFilterSize(RowFilter rowFilter)
- {
- long size = TypeSizes.sizeof(0); // rowFilterCount
- for (RowFilter.Expression expression : rowFilter)
- {
- size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
- size += TypeSizes.sizeof(0); // operator int value
- size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
- }
- return size;
- }
-
- public long serializedSize(ReadCommand command, int version)
- {
- assert version < MessagingService.VERSION_30;
- assert command.kind == Kind.PARTITION_RANGE;
-
- PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
- rangeCommand = maybeConvertNamesToSlice(rangeCommand);
- CFMetaData metadata = rangeCommand.metadata();
-
- long size = TypeSizes.sizeof(metadata.ksName);
- size += TypeSizes.sizeof(metadata.cfName);
- size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
-
- size += 1; // single byte flag: 0 for slices, 1 for names
- if (rangeCommand.isNamesQuery())
- {
- PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns();
- ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
- size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns);
- }
- else
- {
- ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
- boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
- size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
- size += TypeSizes.sizeof(filter.isReversed());
- size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
- size += TypeSizes.sizeof(0); // compositesToGroup
- }
-
- if (rangeCommand.rowFilter().equals(RowFilter.NONE))
- {
- size += TypeSizes.sizeof(0);
- }
- else
- {
- ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rangeCommand.rowFilter().iterator());
- size += TypeSizes.sizeof(indexExpressions.size());
- for (RowFilter.Expression expression : indexExpressions)
- {
- size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
- size += TypeSizes.sizeof(expression.operator().ordinal());
- size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
- }
- }
-
- size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
- size += TypeSizes.sizeof(rangeCommand.limits().count());
- size += TypeSizes.sizeof(!rangeCommand.isForThrift());
- return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging());
- }
-
- static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command)
- {
- if (!command.dataRange().isNamesQuery())
- return command;
-
- CFMetaData metadata = command.metadata();
- if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns()))
- return command;
-
- ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter;
- ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
- DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
- return new PartitionRangeReadCommand(
- command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
- command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
- }
-
- static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata)
- {
- // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys.
- // In that case, we'll basically be querying the first row of the partition, but we must make sure we include
- // all columns so we get at least one cell if there is a live row as it would confuse pre-3.0 nodes otherwise.
- if (compositesToGroup == -2)
- return ColumnFilter.all(metadata);
-
- // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
- PartitionColumns columns = selectsStatics
- ? metadata.partitionColumns()
- : metadata.partitionColumns().withoutStatics();
- return ColumnFilter.selectionBuilder().addAll(columns).build();
- }
- }
-
- /**
- * Serializer for pre-3.0 PagedRangeCommands.
- */
- private static class LegacyPagedRangeCommandSerializer implements IVersionedSerializer<ReadCommand>
- {
- public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
- {
- assert version < MessagingService.VERSION_30;
-
- PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
- assert rangeCommand.dataRange().isPaging();
-
- CFMetaData metadata = rangeCommand.metadata();
-
- out.writeUTF(metadata.ksName);
- out.writeUTF(metadata.cfName);
- out.writeLong(rangeCommand.nowInSec() * 1000L); // convert from seconds to millis
-
- AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
-
- // pre-3.0 nodes don't accept names filters for paged range commands
- ClusteringIndexSliceFilter filter;
- if (rangeCommand.dataRange().clusteringIndexFilter.kind() == ClusteringIndexFilter.Kind.NAMES)
- filter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter((ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter, metadata);
- else
- filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
-
- // slice filter
- boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
- LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
- out.writeBoolean(filter.isReversed());
-
- // slice filter's count
- DataLimits.Kind kind = rangeCommand.limits().kind();
- boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1;
- if (isDistinct)
- out.writeInt(1);
- else
- out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().perPartitionCount(), filter.requestedSlices()));
-
- // compositesToGroup
- boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
- int compositesToGroup;
- if (kind == DataLimits.Kind.THRIFT_LIMIT)
- compositesToGroup = -1;
- else if (isDistinct && !selectsStatics)
- compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490)
- else
- compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
-
- out.writeInt(compositesToGroup);
-
- // command-level "start" and "stop" composites. The start is the last-returned cell name if there is one,
- // otherwise it's the same as the slice filter's start. The stop appears to always be the same as the
- // slice filter's stop.
- DataRange.Paging pagingRange = (DataRange.Paging) rangeCommand.dataRange();
- Clustering lastReturned = pagingRange.getLastReturned();
- ClusteringBound newStart = ClusteringBound.inclusiveStartOf(lastReturned);
- Slice lastSlice = filter.requestedSlices().get(filter.requestedSlices().size() - 1);
- ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeBound(metadata, newStart, true), out);
- ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeClustering(metadata, lastSlice.end().clustering()), out);
-
- LegacyRangeSliceCommandSerializer.serializeRowFilter(out, rangeCommand.rowFilter());
-
- // command-level limit
- // Pre-3.0 we would always request one more row than we actually needed and the command-level "start" would
- // be the last-returned cell name, so the response would always include it.
- int maxResults = rangeCommand.limits().count() + 1;
- out.writeInt(maxResults);
-
- // countCQL3Rows
- if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1) // for Thrift or DISTINCT
- out.writeBoolean(false);
- else
- out.writeBoolean(true);
- }
-
- public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
- {
- assert version < MessagingService.VERSION_30;
-
- String keyspace = in.readUTF();
- String columnFamily = in.readUTF();
-
- CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
- if (metadata == null)
- {
- String message = String.format("Got legacy paged range command for nonexistent table %s.%s.", keyspace, columnFamily);
- throw new UnknownColumnFamilyException(message, null);
- }
-
- int nowInSec = (int) (in.readLong() / 1000); // convert from millis to seconds
- AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
-
- Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
- ClusteringIndexSliceFilter filter = p.left;
- boolean selectsStatics = p.right;
-
- int perPartitionLimit = in.readInt();
- int compositesToGroup = in.readInt();
-
- // command-level Composite "start" and "stop"
- LegacyLayout.LegacyBound startBound = LegacyLayout.decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), true);
-
- ByteBufferUtil.readWithShortLength(in); // the composite "stop", which isn't actually needed
-
- ColumnFilter selection = LegacyRangeSliceCommandSerializer.getColumnSelectionForSlice(selectsStatics, compositesToGroup, metadata);
-
- RowFilter rowFilter = LegacyRangeSliceCommandSerializer.deserializeRowFilter(in, metadata);
- int maxResults = in.readInt();
- boolean countCQL3Rows = in.readBoolean();
-
- // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former,
- // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter one is slightly less
- // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows to false so we use
- // that fact.
- boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows);
- DataLimits limits;
- if (isDistinct)
- limits = DataLimits.distinctLimits(maxResults);
- else
- limits = DataLimits.cqlLimits(maxResults);
-
- limits = limits.forPaging(maxResults);
-
- // The pagedRangeCommand is used in pre-3.0 for both the first page and the following ones. On the first page, the startBound will be
- // the start of the overall slice and will not be a proper Clustering. So detect that case and just return a non-paging DataRange, which
- // is what 3.0 does.
- DataRange dataRange = new DataRange(keyRange, filter);
- Slices slices = filter.requestedSlices();
- if (!isDistinct && startBound != LegacyLayout.LegacyBound.BOTTOM && !startBound.bound.equals(slices.get(0).start()))
- {
- // pre-3.0 nodes normally expect pages to include the last cell from the previous page, but they handle it
- // missing without any problems, so we can safely always set "inclusive" to false in the data range
- dataRange = dataRange.forPaging(keyRange, metadata.comparator, startBound.getAsClustering(metadata), false);
- }
- return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange, Optional.empty());
- }
-
- public long serializedSize(ReadCommand command, int version)
- {
- assert version < MessagingService.VERSION_30;
- assert command.kind == Kind.PARTITION_RANGE;
-
- PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
- CFMetaData metadata = rangeCommand.metadata();
- assert rangeCommand.dataRange().isPaging();
-
- long size = TypeSizes.sizeof(metadata.ksName);
- size += TypeSizes.sizeof(metadata.cfName);
- size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
-
- size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
-
- // pre-3.0 nodes only accept slice filters for paged range commands
- ClusteringIndexSliceFilter filter;
- if (rangeCommand.dataRange().clusteringIndexFilter.kind() == ClusteringIndexFilter.Kind.NAMES)
- filter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter((ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter, metadata);
- else
- filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
-
- // slice filter
- boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
- size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
- size += TypeSizes.sizeof(filter.isReversed());
-
- // slice filter's count
- size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
-
- // compositesToGroup
- size += TypeSizes.sizeof(0);
-
- // command-level Composite "start" and "stop"
- DataRange.Paging pagingRange = (DataRange.Paging) rangeCommand.dataRange();
- Clustering lastReturned = pagingRange.getLastReturned();
- Slice lastSlice = filter.requestedSlices().get(filter.requestedSlices().size() - 1);
- size += ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeClustering(metadata, lastReturned));
- size += ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeClustering(metadata, lastSlice.end().clustering()));
-
- size += LegacyRangeSliceCommandSerializer.serializedRowFilterSize(rangeCommand.rowFilter());
-
- // command-level limit
- size += TypeSizes.sizeof(rangeCommand.limits().count());
-
- // countCQL3Rows
- return size + TypeSizes.sizeof(true);
- }
- }
-
- /**
- * Serializer for pre-3.0 ReadCommands.
- */
- static class LegacyReadCommandSerializer implements IVersionedSerializer<ReadCommand>
- {
- public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
- {
- assert version < MessagingService.VERSION_30;
- assert command.kind == Kind.SINGLE_PARTITION;
-
- SinglePartitionReadCommand singleReadCommand = (SinglePartitionReadCommand) command;
- singleReadCommand = maybeConvertNamesToSlice(singleReadCommand);
-
- CFMetaData metadata = singleReadCommand.metadata();
-
- out.writeByte(LegacyType.fromPartitionFilterKind(singleReadCommand.clusteringIndexFilter().kind()).serializedValue);
-
- out.writeBoolean(singleReadCommand.isDigestQuery());
- out.writeUTF(metadata.ksName);
- ByteBufferUtil.writeWithShortLength(singleReadCommand.partitionKey().getKey(), out);
- out.writeUTF(metadata.cfName);
- out.writeLong(singleReadCommand.nowInSec() * 1000L); // convert from seconds to millis
-
- if (singleReadCommand.clusteringIndexFilter().kind() == ClusteringIndexFilter.Kind.SLICE)
- serializeSliceCommand(singleReadCommand, out);
- else
- serializeNamesCommand(singleReadCommand, out);
- }
-
- public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
- {
- assert version < MessagingService.VERSION_30;
- LegacyType msgType = LegacyType.fromSerializedValue(in.readByte());
-
- boolean isDigest = in.readBoolean();
- String keyspaceName = in.readUTF();
- ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
- String cfName = in.readUTF();
- long nowInMillis = in.readLong();
- int nowInSeconds = (int) (nowInMillis / 1000); // convert from millis to seconds
- CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
- DecoratedKey dk = metadata.partitioner.decorateKey(key);
-
- switch (msgType)
- {
- case GET_BY_NAMES:
- return deserializeNamesCommand(in, isDigest, metadata, dk, nowInSeconds, version);
- case GET_SLICES:
- return deserializeSliceCommand(in, isDigest, metadata, dk, nowInSeconds, version);
- default:
- throw new AssertionError();
- }
- }
-
- public long serializedSize(ReadCommand command, int version)
- {
- assert version < MessagingService.VERSION_30;
- assert command.kind == Kind.SINGLE_PARTITION;
- SinglePartitionReadCommand singleReadCommand = (SinglePartitionReadCommand) command;
- singleReadCommand = maybeConvertNamesToSlice(singleReadCommand);
-
- int keySize = singleReadCommand.partitionKey().getKey().remaining();
-
- CFMetaData metadata = singleReadCommand.metadata();
-
- long size = 1; // message type (single byte)
- size += TypeSizes.sizeof(command.isDigestQuery());
- size += TypeSizes.sizeof(metadata.ksName);
- size += TypeSizes.sizeof((short) keySize) + keySize;
- size += TypeSizes.sizeof((long) command.nowInSec());
-
- if (singleReadCommand.clusteringIndexFilter().kind() == ClusteringIndexFilter.Kind.SLICE)
- return size + serializedSliceCommandSize(singleReadCommand);
- else
- return size + serializedNamesCommandSize(singleReadCommand);
- }
-
- private void serializeNamesCommand(SinglePartitionReadCommand command, DataOutputPlus out) throws IOException
- {
- serializeNamesFilter(command, (ClusteringIndexNamesFilter)command.clusteringIndexFilter(), out);
- }
-
- private static void serializeNamesFilter(ReadCommand command, ClusteringIndexNamesFilter filter, DataOutputPlus out) throws IOException
- {
- PartitionColumns columns = command.columnFilter().fetchedColumns();
- CFMetaData metadata = command.metadata();
- SortedSet<Clustering> requestedRows = filter.requestedRows();
-
- if (requestedRows.isEmpty())
- {
- // only static columns are requested
- out.writeInt(columns.size());
- for (ColumnDefinition column : columns)
- ByteBufferUtil.writeWithShortLength(column.name.bytes, out);
- }
- else
- {
- out.writeInt(requestedRows.size() * columns.size());
- for (Clustering clustering : requestedRows)
- {
- for (ColumnDefinition column : columns)
- ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeCellName(metadata, clustering, column.name.bytes, null), out);
- }
- }
-
- // countCql3Rows should be true if it's not for Thrift or a DISTINCT query
- if (command.isForThrift() || (command.limits().kind() == DataLimits.Kind.CQL_LIMIT && command.limits().perPartitionCount() == 1))
- out.writeBoolean(false); // it's compact and not a DISTINCT query
- else
- out.writeBoolean(true);
- }
-
- static long serializedNamesFilterSize(ClusteringIndexNamesFilter filter, CFMetaData metadata, PartitionColumns fetchedColumns)
- {
- SortedSet<Clustering> requestedRows = filter.requestedRows();
-
- long size = 0;
- if (requestedRows.isEmpty())
- {
- // only static columns are requested
- size += TypeSizes.sizeof(fetchedColumns.size());
- for (ColumnDefinition column : fetchedColumns)
- size += ByteBufferUtil.serializedSizeWithShortLength(column.name.bytes);
- }
- else
- {
- size += TypeSizes.sizeof(requestedRows.size() * fetchedColumns.size());
- for (Clustering clustering : requestedRows)
- {
- for (ColumnDefinition column : fetchedColumns)
- size += ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeCellName(metadata, clustering, column.name.bytes, null));
- }
- }
-
- return size + TypeSizes.sizeof(true); // countCql3Rows
- }
-
- private SinglePartitionReadCommand deserializeNamesCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds, int version) throws IOException
- {
- Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = deserializeNamesSelectionAndFilter(in, metadata);
-
- // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
- return new SinglePartitionReadCommand(
- isDigest, version, true, metadata, nowInSeconds, selectionAndFilter.left, RowFilter.NONE, DataLimits.NONE,
- key, selectionAndFilter.right);
- }
-
- static Pair<ColumnFilter, ClusteringIndexNamesFilter> deserializeNamesSelectionAndFilter(DataInputPlus in, CFMetaData metadata) throws IOException
- {
- int numCellNames = in.readInt();
-
- // The names filter could include either a) static columns or b) normal columns with the clustering columns
- // fully specified. We need to handle those cases differently in 3.0.
- NavigableSet<Clustering> clusterings = new TreeSet<>(metadata.comparator);
-
- ColumnFilter.Builder selectionBuilder = ColumnFilter.selectionBuilder();
- for (int i = 0; i < numCellNames; i++)
- {
- ByteBuffer buffer = ByteBufferUtil.readWithShortLength(in);
- LegacyLayout.LegacyCellName cellName;
- try
- {
- cellName = LegacyLayout.decodeCellName(metadata, buffer);
- }
- catch (UnknownColumnException exc)
- {
- // TODO this probably needs a new exception class that shares a parent with UnknownColumnFamilyException
- throw new UnknownColumnFamilyException(
- "Received legacy range read command with names filter for unrecognized column name. " +
- "Fill name in filter (hex): " + ByteBufferUtil.bytesToHex(buffer), metadata.cfId);
- }
-
- // If we're querying for a static column, we may also need to read it
- // as if it were a thrift dynamic column (because the column metadata,
- // which makes it a static column in 3.0+, may have been added *after*
- // some values were written). Note that all cql queries on non-compact
- // tables used slice & not name filters prior to 3.0 so this path is
- // not taken for non-compact tables. It is theoretically possible to
- // get here via thrift, hence the check on metadata.isStaticCompactTable.
- // See CASSANDRA-11087.
- if (metadata.isStaticCompactTable() && cellName.clustering.equals(Clustering.STATIC_CLUSTERING))
- {
- clusterings.add(Clustering.make(cellName.column.name.bytes));
- selectionBuilder.add(metadata.compactValueColumn());
- }
- else
- {
- clusterings.add(cellName.clustering);
- }
-
- selectionBuilder.add(cellName.column);
- }
-
- // for compact storage tables without clustering keys, the column holding the selected value is named
- // 'value' internally we add it to the selection here to prevent errors due to unexpected column names
- // when serializing the initial local data response
- if (metadata.isStaticCompactTable() && clusterings.isEmpty())
- selectionBuilder.addAll(metadata.partitionColumns());
-
- in.readBoolean(); // countCql3Rows
-
- // clusterings cannot include STATIC_CLUSTERING, so if the names filter is for static columns, clusterings
- // will be empty. However, by requesting the static columns in our ColumnFilter, this will still work.
- ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings, false);
- return Pair.create(selectionBuilder.build(), filter);
- }
-
- private long serializedNamesCommandSize(SinglePartitionReadCommand command)
- {
- ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter)command.clusteringIndexFilter();
- PartitionColumns columns = command.columnFilter().fetchedColumns();
- return serializedNamesFilterSize(filter, command.metadata(), columns);
- }
-
- private void serializeSliceCommand(SinglePartitionReadCommand command, DataOutputPlus out) throws IOException
- {
- CFMetaData metadata = command.metadata();
- ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter)command.clusteringIndexFilter();
-
- Slices slices = filter.requestedSlices();
- boolean makeStaticSlice = !command.columnFilter().fetchedColumns().statics.isEmpty() && !slices.selects(Clustering.STATIC_CLUSTERING);
- serializeSlices(out, slices, filter.isReversed(), makeStaticSlice, metadata);
-
- out.writeBoolean(filter.isReversed());
-
- boolean selectsStatics = !command.columnFilter().fetchedColumns().statics.isEmpty() || slices.selects(Clustering.STATIC_CLUSTERING);
- DataLimits limits = command.limits();
- if (limits.isDistinct())
- out.writeInt(1); // the limit is always 1 for DISTINCT queries
- else
- out.writeInt(updateLimitForQuery(command.limits().count(), filter.requestedSlices()));
-
- int compositesToGroup;
- if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT || metadata.isDense())
- compositesToGroup = -1;
- else if (limits.isDistinct() && !selectsStatics)
- compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490)
- else
- compositesToGroup = metadata.clusteringColumns().size();
-
- out.writeInt(compositesToGroup);
- }
-
- private SinglePartitionReadCommand deserializeSliceCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds, int version) throws IOException
- {
- Pair<ClusteringIndexSliceFilter, Boolean> p = deserializeSlicePartitionFilter(in, metadata);
- ClusteringIndexSliceFilter filter = p.left;
- boolean selectsStatics = p.right;
- int count = in.readInt();
- int compositesToGroup = in.readInt();
-
- // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
- ColumnFilter columnFilter = LegacyRangeSliceCommandSerializer.getColumnSelectionForSlice(selectsStatics, compositesToGroup, metadata);
-
- // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former,
- // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter is probablematic
- // however as we have no way to distinguish it from a normal select with a limit of 1 (and this, contrarily to the range query case
- // were the countCQL3Rows boolean allows us to decide).
- // So we consider this case not distinct here. This is ok because even if it is a distinct (with static), the count will be 1 and
- // we'll still just query one row (a distinct DataLimits currently behave exactly like a CQL limit with a count of 1). The only
- // drawback is that we'll send back the first row entirely while a 2.1/2.2 node would return only the first cell in that same
- // situation. This isn't a problem for 2.1/2.2 code however (it would be for a range query, as it would throw off the count for
- // reasons similar to CASSANDRA-10762, but it's ok for single partition queries).
- // We do _not_ want to do the reverse however and consider a 'SELECT * FROM foo LIMIT 1' as a DISTINCT query as that would make
- // us only return the 1st cell rather then 1st row.
- DataLimits limits;
- if (compositesToGroup == -2)
- limits = DataLimits.distinctLimits(count); // See CASSANDRA-8490 for the explanation of this value
- else if (compositesToGroup == -1)
- limits = DataLimits.thriftLimits(1, count);
- else
- limits = DataLimits.cqlLimits(count);
-
- // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
- return new SinglePartitionReadCommand(isDigest, version, true, metadata, nowInSeconds, columnFilter, RowFilter.NONE, limits, key, filter);
- }
-
- private long serializedSliceCommandSize(SinglePartitionReadCommand command)
- {
- CFMetaData metadata = command.metadata();
- ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter)command.clusteringIndexFilter();
-
- Slices slices = filter.requestedSlices();
- boolean makeStaticSlice = !command.columnFilter().fetchedColumns().statics.isEmpty() && !slices.selects(Clustering.STATIC_CLUSTERING);
-
- long size = serializedSlicesSize(slices, makeStaticSlice, metadata);
- size += TypeSizes.sizeof(command.clusteringIndexFilter().isReversed());
- size += TypeSizes.sizeof(command.limits().count());
- return size + TypeSizes.sizeof(0); // compositesToGroup
- }
-
- static void serializeSlices(DataOutputPlus out, Slices slices, boolean isReversed, boolean makeStaticSlice, CFMetaData metadata) throws IOException
- {
- out.writeInt(slices.size() + (makeStaticSlice ? 1 : 0));
-
- // In 3.0 we always store the slices in normal comparator order. Pre-3.0 nodes expect the slices to
- // be in reversed order if the query is reversed, so we handle that here.
- if (isReversed)
- {
- for (int i = slices.size() - 1; i >= 0; i--)
- serializeSlice(out, slices.get(i), true, metadata);
- if (makeStaticSlice)
- serializeStaticSlice(out, true, metadata);
- }
- else
- {
- if (makeStaticSlice)
- serializeStaticSlice(out, false, metadata);
- for (Slice slice : slices)
- serializeSlice(out, slice, false, metadata);
- }
- }
-
- static long serializedSlicesSize(Slices slices, boolean makeStaticSlice, CFMetaData metadata)
- {
- long size = TypeSizes.sizeof(slices.size());
-
- for (Slice slice : slices)
- {
- ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, slice.start(), true);
- size += ByteBufferUtil.serializedSizeWithShortLength(sliceStart);
- ByteBuffer sliceEnd = LegacyLayout.encodeBound(metadata, slice.end(), false);
- size += ByteBufferUtil.serializedSizeWithShortLength(sliceEnd);
- }
-
- if (makeStaticSlice)
- size += serializedStaticSliceSize(metadata);
-
- return size;
- }
-
- static long serializedStaticSliceSize(CFMetaData metadata)
- {
- // unlike serializeStaticSlice(), but we don't care about reversal for size calculations
- ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, ClusteringBound.BOTTOM, false);
- long size = ByteBufferUtil.serializedSizeWithShortLength(sliceStart);
-
- size += TypeSizes.sizeof((short) (metadata.comparator.size() * 3 + 2));
- size += TypeSizes.sizeof((short) LegacyLayout.STATIC_PREFIX);
- for (int i = 0; i < metadata.comparator.size(); i++)
- {
- size += ByteBufferUtil.serializedSizeWithShortLength(ByteBufferUtil.EMPTY_BYTE_BUFFER);
- size += 1; // EOC
- }
- return size;
- }
-
- private static void serializeSlice(DataOutputPlus out, Slice slice, boolean isReversed, CFMetaData metadata) throws IOException
- {
- ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, isReversed ? slice.end() : slice.start(), !isReversed);
- ByteBufferUtil.writeWithShortLength(sliceStart, out);
-
- ByteBuffer sliceEnd = LegacyLayout.encodeBound(metadata, isReversed ? slice.start() : slice.end(), isReversed);
- ByteBufferUtil.writeWithShortLength(sliceEnd, out);
- }
-
- private static void serializeStaticSlice(DataOutputPlus out, boolean isReversed, CFMetaData metadata) throws IOException
- {
- // if reversed, write an empty bound for the slice start; if reversed, write out an empty bound for the
- // slice finish after we've written the static slice start
- if (!isReversed)
- {
- ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, ClusteringBound.BOTTOM, false);
- ByteBufferUtil.writeWithShortLength(sliceStart, out);
- }
-
- // write out the length of the composite
- out.writeShort(2 + metadata.comparator.size() * 3); // two bytes + EOC for each component, plus static prefix
- out.writeShort(LegacyLayout.STATIC_PREFIX);
- for (int i = 0; i < metadata.comparator.size(); i++)
- {
- ByteBufferUtil.writeWithShortLength(ByteBufferUtil.EMPTY_BYTE_BUFFER, out);
- // write the EOC, using an inclusive end if we're on the final component
- out.writeByte(i == metadata.comparator.size() - 1 ? 1 : 0);
- }
-
- if (isReversed)
- {
- ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, ClusteringBound.BOTTOM, false);
- ByteBufferUtil.writeWithShortLength(sliceStart, out);
- }
- }
-
- // Returns the deserialized filter, and whether static columns are queried (in pre-3.0, both info are determined by the slices,
- // but in 3.0 they are separated: whether static columns are queried or not depends on the ColumnFilter).
- static Pair<ClusteringIndexSliceFilter, Boolean> deserializeSlicePartitionFilter(DataInputPlus in, CFMetaData metadata) throws IOException
- {
- int numSlices = in.readInt();
- ByteBuffer[] startBuffers = new ByteBuffer[numSlices];
- ByteBuffer[] finishBuffers = new ByteBuffer[numSlices];
- for (int i = 0; i < numSlices; i++)
- {
- startBuffers[i] = ByteBufferUtil.readWithShortLength(in);
- finishBuffers[i] = ByteBufferUtil.readWithShortLength(in);
- }
-
- boolean reversed = in.readBoolean();
-
- if (reversed)
- {
- // pre-3.0, reversed query slices put the greater element at the start of the slice
- ByteBuffer[] tmp = finishBuffers;
- finishBuffers = startBuffers;
- startBuffers = tmp;
- }
-
- boolean selectsStatics = false;
- Slices.Builder slicesBuilder = new Slices.Builder(metadata.comparator);
- for (int i = 0; i < numSlices; i++)
- {
- LegacyLayout.LegacyBound start = LegacyLayout.decodeBound(metadata, startBuffers[i], true);
- LegacyLayout.LegacyBound finish = LegacyLayout.decodeBound(metadata, finishBuffers[i], false);
-
- if (start.isStatic)
- {
- // If we start at the static block, this means we start at the beginning of the partition in 3.0
- // terms (since 3.0 handles static outside of the slice).
- start = LegacyLayout.LegacyBound.BOTTOM;
-
- // Then if we include the static, records it
- if (start.bound.isInclusive())
- selectsStatics = true;
- }
- else if (start == LegacyLayout.LegacyBound.BOTTOM)
- {
- selectsStatics = true;
- }
-
- // If the end of the slice is the end of the statics, then that mean this slice was just selecting static
- // columns. We have already recorded that in selectsStatics, so we can ignore the slice (which doesn't make
- // sense for 3.0).
- if (finish.isStatic)
- {
- assert finish.bound.isInclusive(); // it would make no sense for a pre-3.0 node to have a slice that stops
- // before the static columns (since there is nothing before that)
- continue;
- }
-
- slicesBuilder.add(Slice.make(start.bound, finish.bound));
- }
-
- return Pair.create(new ClusteringIndexSliceFilter(slicesBuilder.build(), reversed), selectsStatics);
- }
-
- private static SinglePartitionReadCommand maybeConvertNamesToSlice(SinglePartitionReadCommand command)
- {
- if (command.clusteringIndexFilter().kind() != ClusteringIndexFilter.Kind.NAMES)
- return command;
-
- CFMetaData metadata = command.metadata();
-
- if (!shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns()))
- return command;
-
- ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter)command.clusteringIndexFilter();
- ClusteringIndexSliceFilter sliceFilter = convertNamesFilterToSliceFilter(filter, metadata);
- return new SinglePartitionReadCommand(
- command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
- command.columnFilter(), command.rowFilter(), command.limits(), command.partitionKey(), sliceFilter);
- }
-
- /**
- * Returns true if a names filter on the given table and column selection should be converted to a slice
- * filter for compatibility with pre-3.0 nodes, false otherwise.
- */
- static boolean shouldConvertNamesToSlice(CFMetaData metadata, PartitionColumns columns)
- {
- // On pre-3.0 nodes, due to CASSANDRA-5762, we always do a slice for CQL3 tables (not dense, composite).
- if (!metadata.isDense() && metadata.isCompound())
- return true;
-
- // pre-3.0 nodes don't support names filters for reading collections, so if we're requesting any of those,
- // we need to convert this to a slice filter
- for (ColumnDefinition column : columns)
- {
- if (column.type.isMultiCell())
- return true;
- }
- return false;
- }
-
- /**
- * Converts a names filter that is incompatible with pre-3.0 nodes to a slice filter that is compatible.
- */
- private static ClusteringIndexSliceFilter convertNamesFilterToSliceFilter(ClusteringIndexNamesFilter filter, CFMetaData metadata)
- {
- SortedSet<Clustering> requestedRows = filter.requestedRows();
- Slices slices;
- if (requestedRows.isEmpty())
- {
- slices = Slices.NONE;
- }
- else if (requestedRows.size() == 1 && requestedRows.first().size() == 0)
- {
- slices = Slices.ALL;
- }
- else
- {
- Slices.Builder slicesBuilder = new Slices.Builder(metadata.comparator);
- for (Clustering clustering : requestedRows)
- slicesBuilder.add(ClusteringBound.inclusiveStartOf(clustering), ClusteringBound.inclusiveEndOf(clustering));
- slices = slicesBuilder.build();
- }
-
- return new ClusteringIndexSliceFilter(slices, filter.isReversed());
- }
-
- /**
- * Potentially increases the existing query limit to account for the lack of exclusive bounds in pre-3.0 nodes.
- * @param limit the existing query limit
- * @param slices the requested slices
- * @return the updated limit
- */
- static int updateLimitForQuery(int limit, Slices slices)
- {
- // Pre-3.0 nodes don't support exclusive bounds for slices. Instead, we query one more element if necessary
- // and filter it later (in LegacyRemoteDataResponse)
- if (!slices.hasLowerBound() && ! slices.hasUpperBound())
- return limit;
-
- for (Slice slice : slices)
- {
- if (limit == Integer.MAX_VALUE)
- return limit;
-
- if (!slice.start().isInclusive())
- limit++;
- if (!slice.end().isInclusive())
- limit++;
- }
- return limit;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index cca21f8..c3eae0d 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.dht.*;
-import org.apache.cassandra.io.ForwardingVersionedSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -47,20 +46,6 @@ public abstract class ReadResponse
{
// Serializer for single partition read response
public static final IVersionedSerializer<ReadResponse> serializer = new Serializer();
- // Serializer for the pre-3.0 rang slice responses.
- public static final IVersionedSerializer<ReadResponse> legacyRangeSliceReplySerializer = new LegacyRangeSliceReplySerializer();
- // Serializer for partition range read response (this actually delegate to 'serializer' in 3.0 and to
- // 'legacyRangeSliceReplySerializer' in older version.
- public static final IVersionedSerializer<ReadResponse> rangeSliceSerializer = new ForwardingVersionedSerializer<ReadResponse>()
- {
- @Override
- protected IVersionedSerializer<ReadResponse> delegate(int version)
- {
- return version < MessagingService.VERSION_30
- ? legacyRangeSliceReplySerializer
- : serializer;
- }
- };
// This is used only when serializing data responses and we can't it easily in other cases. So this can be null, which is slighly
// hacky, but as this hack doesn't escape this class, and it's easy enough to validate that it's not null when we need, it's "good enough".
@@ -95,7 +80,7 @@ public abstract class ReadResponse
protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, ReadCommand command)
{
MessageDigest digest = FBUtilities.threadLocalMD5Digest();
- UnfilteredPartitionIterators.digest(command, iterator, digest, command.digestVersion());
+ UnfilteredPartitionIterators.digest(iterator, digest, command.digestVersion());
return ByteBuffer.wrap(digest.digest());
}
@@ -210,130 +195,12 @@ public abstract class ReadResponse
}
}
- /**
- * A remote response from a pre-3.0 node. This needs a separate class in order to cleanly handle trimming and
- * reversal of results when the read command calls for it. Pre-3.0 nodes always return results in the normal
- * sorted order, even if the query asks for reversed results. Additionally, pre-3.0 nodes do not have a notion of
- * exclusive slices on non-composite tables, so extra rows may need to be trimmed.
- */
- @VisibleForTesting
- static class LegacyRemoteDataResponse extends ReadResponse
- {
- private final List<ImmutableBTreePartition> partitions;
-
- @VisibleForTesting
- LegacyRemoteDataResponse(List<ImmutableBTreePartition> partitions)
- {
- super(null); // we never serialize LegacyRemoteDataResponses, so we don't care about the command
- this.partitions = partitions;
- }
-
- public UnfilteredPartitionIterator makeIterator(final ReadCommand command)
- {
- // Due to a bug in the serialization of AbstractBounds, anything that isn't a Range is understood by pre-3.0 nodes
- // as a Bound, which means IncludingExcludingBounds and ExcludingBounds responses may include keys they shouldn't.
- // So filter partitions that shouldn't be included here.
- boolean skipFirst = false;
- boolean skipLast = false;
- if (!partitions.isEmpty() && command instanceof PartitionRangeReadCommand)
- {
- AbstractBounds<PartitionPosition> keyRange = ((PartitionRangeReadCommand)command).dataRange().keyRange();
- boolean isExcludingBounds = keyRange instanceof ExcludingBounds;
- skipFirst = isExcludingBounds && !keyRange.contains(partitions.get(0).partitionKey());
- skipLast = (isExcludingBounds || keyRange instanceof IncludingExcludingBounds) && !keyRange.contains(partitions.get(partitions.size() - 1).partitionKey());
- }
-
- final List<ImmutableBTreePartition> toReturn;
- if (skipFirst || skipLast)
- {
- toReturn = partitions.size() == 1
- ? Collections.emptyList()
- : partitions.subList(skipFirst ? 1 : 0, skipLast ? partitions.size() - 1 : partitions.size());
- }
- else
- {
- toReturn = partitions;
- }
-
- return new AbstractUnfilteredPartitionIterator()
- {
- private int idx;
-
- public boolean isForThrift()
- {
- return true;
- }
-
- public CFMetaData metadata()
- {
- return command.metadata();
- }
-
- public boolean hasNext()
- {
- return idx < toReturn.size();
- }
-
- public UnfilteredRowIterator next()
- {
- ImmutableBTreePartition partition = toReturn.get(idx++);
-
- ClusteringIndexFilter filter = command.clusteringIndexFilter(partition.partitionKey());
-
- // Pre-3.0, we would always request one more row than we actually needed and the command-level "start" would
- // be the last-returned cell name, so the response would always include it.
- UnfilteredRowIterator iterator = partition.unfilteredIterator(command.columnFilter(), filter.getSlices(command.metadata()), filter.isReversed());
-
- // Wrap results with a ThriftResultMerger only if they're intended for the thrift command.
- if (command.isForThrift())
- return ThriftResultsMerger.maybeWrap(iterator, command.nowInSec());
- else
- return iterator;
- }
- };
- }
-
- public ByteBuffer digest(ReadCommand command)
- {
- try (UnfilteredPartitionIterator iterator = makeIterator(command))
- {
- return makeDigest(iterator, command);
- }
- }
-
- public boolean isDigestResponse()
- {
- return false;
- }
- }
-
private static class Serializer implements IVersionedSerializer<ReadResponse>
{
public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
{
boolean isDigest = response instanceof DigestResponse;
ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
- if (version < MessagingService.VERSION_30)
- {
- out.writeInt(digest.remaining());
- out.write(digest);
- out.writeBoolean(isDigest);
- if (!isDigest)
- {
- assert response.command != null; // we only serialize LocalDataResponse, which always has the command set
- try (UnfilteredPartitionIterator iter = response.makeIterator(response.command))
- {
- assert iter.hasNext();
- try (UnfilteredRowIterator partition = iter.next())
- {
- ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
- LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, version);
- }
- assert !iter.hasNext();
- }
- }
- return;
- }
ByteBufferUtil.writeWithVIntLength(digest, out);
if (!isDigest)
@@ -345,38 +212,12 @@ public abstract class ReadResponse
public ReadResponse deserialize(DataInputPlus in, int version) throws IOException
{
- if (version < MessagingService.VERSION_30)
- {
- byte[] digest = null;
- int digestSize = in.readInt();
- if (digestSize > 0)
- {
- digest = new byte[digestSize];
- in.readFully(digest, 0, digestSize);
- }
- boolean isDigest = in.readBoolean();
- assert isDigest == digestSize > 0;
- if (isDigest)
- {
- assert digest != null;
- return new DigestResponse(ByteBuffer.wrap(digest));
- }
-
- // ReadResponses from older versions are always single-partition (ranges are handled by RangeSliceReply)
- ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
- try (UnfilteredRowIterator rowIterator = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key))
- {
- if (rowIterator == null)
- return new LegacyRemoteDataResponse(Collections.emptyList());
-
- return new LegacyRemoteDataResponse(Collections.singletonList(ImmutableBTreePartition.create(rowIterator)));
- }
- }
-
ByteBuffer digest = ByteBufferUtil.readWithVIntLength(in);
if (digest.hasRemaining())
return new DigestResponse(digest);
+ // Note that we can only get here if version == 3.0, which is the current_version. When we change the
+ // version, we'll have to deserialize/re-serialize the data to be in the proper version.
assert version == MessagingService.VERSION_30;
ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
return new RemoteDataResponse(data);
@@ -387,28 +228,6 @@ public abstract class ReadResponse
boolean isDigest = response instanceof DigestResponse;
ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
- if (version < MessagingService.VERSION_30)
- {
- long size = TypeSizes.sizeof(digest.remaining())
- + digest.remaining()
- + TypeSizes.sizeof(isDigest);
- if (!isDigest)
- {
- assert response.command != null; // we only serialize LocalDataResponse, which always has the command set
- try (UnfilteredPartitionIterator iter = response.makeIterator(response.command))
- {
- assert iter.hasNext();
- try (UnfilteredRowIterator partition = iter.next())
- {
- size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
- size += LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, version);
- }
- assert !iter.hasNext();
- }
- }
- return size;
- }
-
long size = ByteBufferUtil.serializedSizeWithVIntLength(digest);
if (!isDigest)
{
@@ -421,81 +240,4 @@ public abstract class ReadResponse
return size;
}
}
-
- private static class LegacyRangeSliceReplySerializer implements IVersionedSerializer<ReadResponse>
- {
- public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
- {
- assert version < MessagingService.VERSION_30;
-
- // determine the number of partitions upfront for serialization
- int numPartitions = 0;
- assert response.command != null; // we only serialize LocalDataResponse, which always has the command set
- try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command))
- {
- while (iterator.hasNext())
- {
- try (UnfilteredRowIterator atomIterator = iterator.next())
- {
- numPartitions++;
-
- // we have to fully exhaust the subiterator
- while (atomIterator.hasNext())
- atomIterator.next();
- }
- }
- }
-
- out.writeInt(numPartitions);
-
- try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command))
- {
- while (iterator.hasNext())
- {
- try (UnfilteredRowIterator partition = iterator.next())
- {
- ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
- LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, version);
- }
- }
- }
- }
-
- public ReadResponse deserialize(DataInputPlus in, int version) throws IOException
- {
- assert version < MessagingService.VERSION_30;
-
- int partitionCount = in.readInt();
- ArrayList<ImmutableBTreePartition> partitions = new ArrayList<>(partitionCount);
- for (int i = 0; i < partitionCount; i++)
- {
- ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
- try (UnfilteredRowIterator partition = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key))
- {
- partitions.add(ImmutableBTreePartition.create(partition));
- }
- }
- return new LegacyRemoteDataResponse(partitions);
- }
-
- public long serializedSize(ReadResponse response, int version)
- {
- assert version < MessagingService.VERSION_30;
- long size = TypeSizes.sizeof(0); // number of partitions
-
- assert response.command != null; // we only serialize LocalDataResponse, which always has the command set
- try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command))
- {
- while (iterator.hasNext())
- {
- try (UnfilteredRowIterator partition = iterator.next())
- {
- size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
- size += LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, version);
- }
- }
- }
- return size;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index e620dc0..a709ec3 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -253,7 +253,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
public Serializer(CFMetaData metadata, Version version, SerializationHeader header)
{
- this.idxInfoSerializer = metadata.serializers().indexInfoSerializer(version, header);
+ this.idxInfoSerializer = IndexInfo.serializer(version, header);
this.version = version;
}
@@ -264,22 +264,16 @@ public class RowIndexEntry<T> implements IMeasurableMemory
public void serialize(RowIndexEntry<IndexInfo> rie, DataOutputPlus out, ByteBuffer indexInfo) throws IOException
{
- assert version.storeRows() : "We read old index files but we should never write them";
-
rie.serialize(out, idxInfoSerializer, indexInfo);
}
public void serializeForCache(RowIndexEntry<IndexInfo> rie, DataOutputPlus out) throws IOException
{
- assert version.storeRows();
-
rie.serializeForCache(out);
}
public RowIndexEntry<IndexInfo> deserializeForCache(DataInputPlus in) throws IOException
{
- assert version.storeRows();
-
long position = in.readUnsignedVInt();
switch (in.readByte())
@@ -297,8 +291,6 @@ public class RowIndexEntry<T> implements IMeasurableMemory
public static void skipForCache(DataInputPlus in, Version version) throws IOException
{
- assert version.storeRows();
-
/* long position = */in.readUnsignedVInt();
switch (in.readByte())
{
@@ -317,9 +309,6 @@ public class RowIndexEntry<T> implements IMeasurableMemory
public RowIndexEntry<IndexInfo> deserialize(DataInputPlus in, long indexFilePosition) throws IOException
{
- if (!version.storeRows())
- return LegacyShallowIndexedEntry.deserialize(in, indexFilePosition, idxInfoSerializer);
-
long position = in.readUnsignedVInt();
int size = (int)in.readUnsignedVInt();
@@ -354,9 +343,6 @@ public class RowIndexEntry<T> implements IMeasurableMemory
public long deserializePositionAndSkip(DataInputPlus in) throws IOException
{
- if (!version.storeRows())
- return LegacyShallowIndexedEntry.deserializePositionAndSkip(in);
-
return ShallowIndexedEntry.deserializePositionAndSkip(in);
}
@@ -367,7 +353,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
*/
public static long readPosition(DataInputPlus in, Version version) throws IOException
{
- return version.storeRows() ? in.readUnsignedVInt() : in.readLong();
+ return in.readUnsignedVInt();
}
public static void skip(DataInputPlus in, Version version) throws IOException
@@ -378,7 +364,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
private static void skipPromotedIndex(DataInputPlus in, Version version) throws IOException
{
- int size = version.storeRows() ? (int)in.readUnsignedVInt() : in.readInt();
+ int size = (int)in.readUnsignedVInt();
if (size <= 0)
return;
@@ -413,164 +399,6 @@ public class RowIndexEntry<T> implements IMeasurableMemory
out.writeByte(CACHE_NOT_INDEXED);
}
- private static final class LegacyShallowIndexedEntry extends RowIndexEntry<IndexInfo>
- {
- private static final long BASE_SIZE;
- static
- {
- BASE_SIZE = ObjectSizes.measure(new LegacyShallowIndexedEntry(0, 0, DeletionTime.LIVE, 0, new int[0], null, 0));
- }
-
- private final long indexFilePosition;
- private final int[] offsets;
- @Unmetered
- private final IndexInfo.Serializer idxInfoSerializer;
- private final DeletionTime deletionTime;
- private final long headerLength;
- private final int serializedSize;
-
- private LegacyShallowIndexedEntry(long dataFilePosition, long indexFilePosition,
- DeletionTime deletionTime, long headerLength,
- int[] offsets, IndexInfo.Serializer idxInfoSerializer,
- int serializedSize)
- {
- super(dataFilePosition);
- this.deletionTime = deletionTime;
- this.headerLength = headerLength;
- this.indexFilePosition = indexFilePosition;
- this.offsets = offsets;
- this.idxInfoSerializer = idxInfoSerializer;
- this.serializedSize = serializedSize;
- }
-
- @Override
- public DeletionTime deletionTime()
- {
- return deletionTime;
- }
-
- @Override
- public long headerLength()
- {
- return headerLength;
- }
-
- @Override
- public long unsharedHeapSize()
- {
- return BASE_SIZE + offsets.length * TypeSizes.sizeof(0);
- }
-
- @Override
- public int columnsIndexCount()
- {
- return offsets.length;
- }
-
- @Override
- public void serialize(DataOutputPlus out, IndexInfo.Serializer idxInfoSerializer, ByteBuffer indexInfo)
- {
- throw new UnsupportedOperationException("serializing legacy index entries is not supported");
- }
-
- @Override
- public void serializeForCache(DataOutputPlus out)
- {
- throw new UnsupportedOperationException("serializing legacy index entries is not supported");
- }
-
- @Override
- public IndexInfoRetriever openWithIndex(FileHandle indexFile)
- {
- int fieldsSize = (int) DeletionTime.serializer.serializedSize(deletionTime)
- + TypeSizes.sizeof(0); // columnIndexCount
- indexEntrySizeHistogram.update(serializedSize);
- indexInfoCountHistogram.update(offsets.length);
- return new LegacyIndexInfoRetriever(indexFilePosition +
- TypeSizes.sizeof(0L) + // position
- TypeSizes.sizeof(0) + // indexInfoSize
- fieldsSize,
- offsets, indexFile.createReader(), idxInfoSerializer);
- }
-
- public static RowIndexEntry<IndexInfo> deserialize(DataInputPlus in, long indexFilePosition,
- IndexInfo.Serializer idxInfoSerializer) throws IOException
- {
- long dataFilePosition = in.readLong();
-
- int size = in.readInt();
- if (size == 0)
- {
- return new RowIndexEntry<>(dataFilePosition);
- }
- else if (size <= DatabaseDescriptor.getColumnIndexCacheSize())
- {
- return new IndexedEntry(dataFilePosition, in, idxInfoSerializer);
- }
- else
- {
- DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
-
- // For legacy sstables (i.e. sstables pre-"ma", pre-3.0) we have to scan all serialized IndexInfo
- // objects to calculate the offsets array. However, it might be possible to deserialize all
- // IndexInfo objects here - but to just skip feels more gentle to the heap/GC.
-
- int entries = in.readInt();
- int[] offsets = new int[entries];
-
- TrackedDataInputPlus tracked = new TrackedDataInputPlus(in);
- long start = tracked.getBytesRead();
- long headerLength = 0L;
- for (int i = 0; i < entries; i++)
- {
- offsets[i] = (int) (tracked.getBytesRead() - start);
- if (i == 0)
- {
- IndexInfo info = idxInfoSerializer.deserialize(tracked);
- headerLength = info.offset;
- }
- else
- idxInfoSerializer.skip(tracked);
- }
-
- return new LegacyShallowIndexedEntry(dataFilePosition, indexFilePosition, deletionTime, headerLength, offsets, idxInfoSerializer, size);
- }
- }
-
- static long deserializePositionAndSkip(DataInputPlus in) throws IOException
- {
- long position = in.readLong();
-
- int size = in.readInt();
- if (size > 0)
- in.skipBytesFully(size);
-
- return position;
- }
- }
-
- private static final class LegacyIndexInfoRetriever extends FileIndexInfoRetriever
- {
- private final int[] offsets;
-
- private LegacyIndexInfoRetriever(long indexFilePosition, int[] offsets, FileDataInput reader, IndexInfo.Serializer idxInfoSerializer)
- {
- super(indexFilePosition, reader, idxInfoSerializer);
- this.offsets = offsets;
- }
-
- IndexInfo fetchIndex(int index) throws IOException
- {
- retrievals++;
-
- // seek to position of IndexInfo
- indexReader.seek(indexInfoFilePosition + offsets[index]);
-
- // deserialize IndexInfo
- return idxInfoSerializer.deserialize(indexReader);
- }
- }
-
/**
* An entry in the row index for a row whose columns are indexed - used for both legacy and current formats.
*/
@@ -622,14 +450,9 @@ public class RowIndexEntry<T> implements IMeasurableMemory
for (int i = 0; i < columnsIndexCount; i++)
this.columnsIndex[i] = idxInfoSerializer.deserialize(in);
- int[] offsets = null;
- if (version.storeRows())
- {
- offsets = new int[this.columnsIndex.length];
- for (int i = 0; i < offsets.length; i++)
- offsets[i] = in.readInt();
- }
- this.offsets = offsets;
+ this.offsets = new int[this.columnsIndex.length];
+ for (int i = 0; i < offsets.length; i++)
+ offsets[i] = in.readInt();
this.indexedPartSize = indexedPartSize;