You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/08/08 00:44:42 UTC
[3/4] cassandra git commit: On-wire backward compatibility for 3.0
On-wire backward compatibility for 3.0
This adds support for mixed-version clusters with Cassandra 2.1
and 2.2.
Patch by Tyler Hobbs and Sylvain Lebresne for CASSANDRA-9704
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8c64cefd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8c64cefd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8c64cefd
Branch: refs/heads/trunk
Commit: 8c64cefd19d706003d4b33b333274dbf17c9cb34
Parents: 69f0b89
Author: Tyler Hobbs <ty...@gmail.com>
Authored: Fri Aug 7 17:42:18 2015 -0500
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Fri Aug 7 17:42:18 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/ModificationStatement.java | 2 +-
.../org/apache/cassandra/db/Clustering.java | 8 +-
src/java/org/apache/cassandra/db/DataRange.java | 28 +-
.../org/apache/cassandra/db/LegacyLayout.java | 922 +++++++++++++++--
.../apache/cassandra/db/PartitionColumns.java | 6 +
.../cassandra/db/PartitionRangeReadCommand.java | 11 +
.../cassandra/db/RangeSliceVerbHandler.java | 29 +
.../org/apache/cassandra/db/ReadCommand.java | 995 ++++++++++++++++++-
.../cassandra/db/ReadCommandVerbHandler.java | 9 +-
.../org/apache/cassandra/db/ReadResponse.java | 250 ++++-
.../db/SinglePartitionReadCommand.java | 11 +-
src/java/org/apache/cassandra/db/Slice.java | 48 +-
.../filter/AbstractClusteringIndexFilter.java | 20 -
.../db/filter/ClusteringIndexFilter.java | 20 +
.../db/filter/ClusteringIndexNamesFilter.java | 4 +-
.../db/filter/ClusteringIndexSliceFilter.java | 4 +-
.../cassandra/db/filter/ColumnFilter.java | 3 +
.../apache/cassandra/db/filter/DataLimits.java | 12 +-
.../db/marshal/AbstractCompositeType.java | 32 -
.../cassandra/db/marshal/CompositeType.java | 26 +
.../AbstractThreadUnsafePartition.java | 2 +-
.../db/partitions/PartitionUpdate.java | 93 +-
.../UnfilteredPartitionIterators.java | 13 +-
.../cassandra/db/rows/BTreeBackedRow.java | 62 ++
src/java/org/apache/cassandra/db/rows/Row.java | 12 +
.../apache/cassandra/net/MessagingService.java | 4 +-
.../cassandra/service/AbstractReadExecutor.java | 5 +-
.../apache/cassandra/service/DataResolver.java | 8 +-
.../cassandra/service/DigestResolver.java | 6 +-
.../apache/cassandra/service/ReadCallback.java | 4 +-
.../apache/cassandra/service/StorageProxy.java | 4 +-
.../cassandra/service/StorageService.java | 4 +-
.../service/pager/RangeSliceQueryPager.java | 4 +-
.../service/pager/SinglePartitionPager.java | 8 +-
.../cassandra/thrift/CassandraServer.java | 4 +-
36 files changed, 2353 insertions(+), 321 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 216d3f7..0ba7b4e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-beta1
+ * Support mixed-version clusters with Cassandra 2.1 and 2.2 (CASSANDRA-9704)
* Fix multiple slices on RowSearchers (CASSANDRA-10002)
* Fix bug in merging of collections (CASSANDRA-10001)
* Optimize batchlog replay to avoid full scans (CASSANDRA-7237)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 9f2c952..5fa1842 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -544,7 +544,7 @@ public abstract class ModificationStatement implements CQLStatement
key,
new ClusteringIndexNamesFilter(clusterings, false)));
- Map<DecoratedKey, Partition> map = new HashMap();
+ Map<DecoratedKey, Partition> map = new HashMap<>();
SinglePartitionReadCommand.Group group = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/Clustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java
index 7754182..a29ce65 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -57,10 +57,16 @@ public class Clustering extends AbstractClusteringPrefix
}
@Override
- public String toString(CFMetaData metadata)
+ public String toString()
{
return "STATIC";
}
+
+ @Override
+ public String toString(CFMetaData metadata)
+ {
+ return toString();
+ }
};
/** Empty clustering for tables having no clustering columns. */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index 79b2448..ffe041e 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -149,6 +149,16 @@ public class DataRange
}
/**
+ * Whether the data range is for a paged request or not.
+ *
+ * @return true if for paging, false otherwise
+ */
+ public boolean isPaging()
+ {
+ return false;
+ }
+
+ /**
* Whether the range queried by this {@code DataRange} actually wraps around.
*
* @return whether the range queried by this {@code DataRange} actually wraps around.
@@ -307,7 +317,7 @@ public class DataRange
* first queried partition (the one for that last result) so it only fetches results that follow that
* last result. In other words, this makes sure paging resumes where we left off.
*/
- private static class Paging extends DataRange
+ public static class Paging extends DataRange
{
private final ClusteringComparator comparator;
private final Clustering lastReturned;
@@ -349,6 +359,20 @@ public class DataRange
: new DataRange(range, clusteringIndexFilter);
}
+ /**
+ * @return the last Clustering that was returned (in the previous page)
+ */
+ public Clustering getLastReturned()
+ {
+ return lastReturned;
+ }
+
+ @Override
+ public boolean isPaging()
+ {
+ return true;
+ }
+
@Override
public boolean isUnrestricted()
{
@@ -358,7 +382,7 @@ public class DataRange
@Override
public String toString(CFMetaData metadata)
{
- return String.format("range=%s pfilter=%s lastReturned=%s (%s)",
+ return String.format("range=%s (paging) pfilter=%s lastReturned=%s (%s)",
keyRange.getString(metadata.getKeyValidator()),
clusteringIndexFilter.toString(metadata),
lastReturned.toString(metadata),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 696c1c9..50e5d04 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -25,10 +25,9 @@ import java.util.*;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.filter.ColumnFilter;
@@ -38,6 +37,7 @@ import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.*;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@ -47,14 +47,14 @@ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
*/
public abstract class LegacyLayout
{
- private static final Logger logger = LoggerFactory.getLogger(LegacyLayout.class);
-
public final static int MAX_CELL_NAME_LENGTH = FBUtilities.MAX_UNSIGNED_SHORT;
- private final static int DELETION_MASK = 0x01;
- private final static int EXPIRATION_MASK = 0x02;
- private final static int COUNTER_MASK = 0x04;
- private final static int COUNTER_UPDATE_MASK = 0x08;
+ public final static int STATIC_PREFIX = 0xFFFF;
+
+ public final static int DELETION_MASK = 0x01;
+ public final static int EXPIRATION_MASK = 0x02;
+ public final static int COUNTER_MASK = 0x04;
+ public final static int COUNTER_UPDATE_MASK = 0x08;
private final static int RANGE_TOMBSTONE_MASK = 0x10;
private LegacyLayout() {}
@@ -177,25 +177,69 @@ public abstract class LegacyLayout
if (!bound.hasRemaining())
return isStart ? LegacyBound.BOTTOM : LegacyBound.TOP;
- List<ByteBuffer> components = metadata.isCompound()
- ? CompositeType.splitName(bound)
- : Collections.singletonList(bound);
+ List<CompositeType.CompositeComponent> components = metadata.isCompound()
+ ? CompositeType.deconstruct(bound)
+ : Collections.singletonList(new CompositeType.CompositeComponent(bound, (byte) 0));
// Either it's a prefix of the clustering, or it's the bound of a collection range tombstone (and thus has
// the collection column name)
assert components.size() <= metadata.comparator.size() || (!metadata.isCompactTable() && components.size() == metadata.comparator.size() + 1);
- List<ByteBuffer> prefix = components.size() <= metadata.comparator.size() ? components : components.subList(0, metadata.comparator.size());
- Slice.Bound sb = Slice.Bound.create(isStart ? Slice.Bound.Kind.INCL_START_BOUND : Slice.Bound.Kind.INCL_END_BOUND,
- prefix.toArray(new ByteBuffer[prefix.size()]));
+ List<CompositeType.CompositeComponent> prefix = components.size() <= metadata.comparator.size()
+ ? components
+ : components.subList(0, metadata.comparator.size());
+ Slice.Bound.Kind boundKind;
+ if (isStart)
+ {
+ if (components.get(components.size() - 1).eoc > 0)
+ boundKind = Slice.Bound.Kind.EXCL_START_BOUND;
+ else
+ boundKind = Slice.Bound.Kind.INCL_START_BOUND;
+ }
+ else
+ {
+ if (components.get(components.size() - 1).eoc < 0)
+ boundKind = Slice.Bound.Kind.EXCL_END_BOUND;
+ else
+ boundKind = Slice.Bound.Kind.INCL_END_BOUND;
+ }
+
+ ByteBuffer[] prefixValues = new ByteBuffer[prefix.size()];
+ for (int i = 0; i < prefix.size(); i++)
+ prefixValues[i] = prefix.get(i).value;
+ Slice.Bound sb = Slice.Bound.create(boundKind, prefixValues);
ColumnDefinition collectionName = components.size() == metadata.comparator.size() + 1
- ? metadata.getColumnDefinition(components.get(metadata.comparator.size()))
+ ? metadata.getColumnDefinition(components.get(metadata.comparator.size()).value)
: null;
return new LegacyBound(sb, metadata.isCompound() && CompositeType.isStaticName(bound), collectionName);
}
- public static ByteBuffer encodeCellName(CFMetaData metadata, Clustering clustering, ByteBuffer columnName, ByteBuffer collectionElement)
+ public static ByteBuffer encodeBound(CFMetaData metadata, Slice.Bound bound, boolean isStart)
+ {
+ if (bound == Slice.Bound.BOTTOM || bound == Slice.Bound.TOP || metadata.comparator.size() == 0)
+ return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+ ClusteringPrefix clustering = bound.clustering();
+
+ if (!metadata.isCompound())
+ {
+ assert clustering.size() == 1;
+ return clustering.get(0);
+ }
+
+ CompositeType ctype = CompositeType.getInstance(metadata.comparator.subtypes());
+ CompositeType.Builder builder = ctype.builder();
+ for (int i = 0; i < clustering.size(); i++)
+ builder.add(clustering.get(i));
+
+ if (isStart)
+ return bound.isInclusive() ? builder.build() : builder.buildAsEndOfRange();
+ else
+ return bound.isInclusive() ? builder.buildAsEndOfRange() : builder.build();
+ }
+
+ public static ByteBuffer encodeCellName(CFMetaData metadata, ClusteringPrefix clustering, ByteBuffer columnName, ByteBuffer collectionElement)
{
boolean isStatic = clustering == Clustering.STATIC_CLUSTERING;
@@ -204,7 +248,7 @@ public abstract class LegacyLayout
if (isStatic)
return columnName;
- assert clustering.size() == 1;
+ assert clustering.size() == 1 : "Expected clustering size to be 1, but was " + clustering.size();
return clustering.get(0);
}
@@ -253,8 +297,11 @@ public abstract class LegacyLayout
return new Clustering(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize]));
}
- public static ByteBuffer encodeClustering(CFMetaData metadata, Clustering clustering)
+ public static ByteBuffer encodeClustering(CFMetaData metadata, ClusteringPrefix clustering)
{
+ if (clustering.size() == 0)
+ return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
if (!metadata.isCompound())
{
assert clustering.size() == 1;
@@ -268,14 +315,151 @@ public abstract class LegacyLayout
}
// For serializing to old wire format
- public static Pair<DeletionInfo, Iterator<LegacyCell>> fromUnfilteredRowIterator(UnfilteredRowIterator iterator)
+ public static LegacyUnfilteredPartition fromUnfilteredRowIterator(UnfilteredRowIterator iterator)
{
// we need to extract the range tombstone so materialize the partition. Since this is
// used for the on-wire format, this is not worse than it used to be.
final ArrayBackedPartition partition = ArrayBackedPartition.create(iterator);
DeletionInfo info = partition.deletionInfo();
- Iterator<LegacyCell> cells = fromRowIterator(partition.metadata(), partition.iterator(), partition.staticRow());
- return Pair.create(info, cells);
+ Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> pair = fromRowIterator(partition.metadata(), partition.iterator(), partition.staticRow());
+
+ LegacyLayout.LegacyRangeTombstoneList rtl = pair.left;
+
+ // Processing the cell iterator results in the LegacyRangeTombstoneList being populated, so we do this
+ // before we use the LegacyRangeTombstoneList at all
+ List<LegacyLayout.LegacyCell> cells = Lists.newArrayList(pair.right);
+
+ // The LegacyRangeTombstoneList already has range tombstones for the single-row deletions and complex
+ // deletions. Go through our normal range tombstones and add them to the LegacyRTL so that the range
+ // tombstones all get merged and sorted properly.
+ if (info.hasRanges())
+ {
+ Iterator<RangeTombstone> rangeTombstoneIterator = info.rangeIterator(false);
+ while (rangeTombstoneIterator.hasNext())
+ {
+ RangeTombstone rt = rangeTombstoneIterator.next();
+ Slice slice = rt.deletedSlice();
+ LegacyLayout.LegacyBound start = new LegacyLayout.LegacyBound(slice.start(), false, null);
+ LegacyLayout.LegacyBound end = new LegacyLayout.LegacyBound(slice.end(), false, null);
+ rtl.add(start, end, rt.deletionTime().markedForDeleteAt(), rt.deletionTime().localDeletionTime());
+ }
+ }
+
+ return new LegacyUnfilteredPartition(info.getPartitionDeletion(), rtl, cells);
+ }
+
+ public static void serializeAsLegacyPartition(UnfilteredRowIterator partition, DataOutputPlus out, int version) throws IOException
+ {
+ assert version < MessagingService.VERSION_30;
+
+ out.writeBoolean(true);
+
+ LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(partition);
+
+ UUIDSerializer.serializer.serialize(partition.metadata().cfId, out, version);
+ DeletionTime.serializer.serialize(legacyPartition.partitionDeletion, out);
+
+ legacyPartition.rangeTombstones.serialize(out, partition.metadata());
+
+ // begin cell serialization
+ out.writeInt(legacyPartition.cells.size());
+ for (LegacyLayout.LegacyCell cell : legacyPartition.cells)
+ {
+ ByteBufferUtil.writeWithShortLength(cell.name.encode(partition.metadata()), out);
+ if (cell.kind == LegacyLayout.LegacyCell.Kind.EXPIRING)
+ {
+ out.writeByte(LegacyLayout.EXPIRATION_MASK); // serialization flags
+ out.writeInt(cell.ttl);
+ out.writeInt(cell.localDeletionTime);
+ }
+ else if (cell.kind == LegacyLayout.LegacyCell.Kind.DELETED)
+ {
+ out.writeByte(LegacyLayout.DELETION_MASK); // serialization flags
+ out.writeLong(cell.timestamp);
+ out.writeInt(TypeSizes.sizeof(cell.localDeletionTime));
+ out.writeInt(cell.localDeletionTime);
+ continue;
+ }
+ else if (cell.kind == LegacyLayout.LegacyCell.Kind.COUNTER)
+ {
+ out.writeByte(LegacyLayout.COUNTER_MASK); // serialization flags
+ out.writeLong(Long.MIN_VALUE); // timestampOfLastDelete (not used, and MIN_VALUE is the default)
+ }
+ else
+ {
+ // normal cell
+ out.writeByte(0); // serialization flags
+ }
+
+ out.writeLong(cell.timestamp);
+ ByteBufferUtil.writeWithLength(cell.value, out);
+ }
+ }
+
+ // For the old wire format
+ // Note: this can return null if an empty partition is serialized!
+ public static UnfilteredRowIterator deserializeLegacyPartition(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
+ {
+ assert version < MessagingService.VERSION_30;
+
+ // This is only used for mutations, and mutations have never allowed "null" column families
+ boolean present = in.readBoolean();
+ if (!present)
+ return null;
+
+ CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+ LegacyDeletionInfo info = LegacyDeletionInfo.deserialize(metadata, in);
+ int size = in.readInt();
+ Iterator<LegacyCell> cells = deserializeCells(metadata, in, flag, size);
+ SerializationHelper helper = new SerializationHelper(metadata, version, flag);
+ return onWireCellstoUnfilteredRowIterator(metadata, metadata.partitioner.decorateKey(key), info, cells, false, helper);
+ }
+
+ // For the old wire format
+ public static long serializedSizeAsLegacyPartition(UnfilteredRowIterator partition, int version)
+ {
+ assert version < MessagingService.VERSION_30;
+
+ if (partition.isEmpty())
+ return TypeSizes.sizeof(false);
+
+ long size = TypeSizes.sizeof(true);
+
+ LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(partition);
+
+ size += UUIDSerializer.serializer.serializedSize(partition.metadata().cfId, version);
+ size += DeletionTime.serializer.serializedSize(legacyPartition.partitionDeletion);
+ size += legacyPartition.rangeTombstones.serializedSize(partition.metadata());
+
+ // begin cell serialization
+ size += TypeSizes.sizeof(legacyPartition.cells.size());
+ for (LegacyLayout.LegacyCell cell : legacyPartition.cells)
+ {
+ size += ByteBufferUtil.serializedSizeWithShortLength(cell.name.encode(partition.metadata()));
+ size += 1; // serialization flags
+ if (cell.kind == LegacyLayout.LegacyCell.Kind.EXPIRING)
+ {
+ size += TypeSizes.sizeof(cell.ttl);
+ size += TypeSizes.sizeof(cell.localDeletionTime);
+ }
+ else if (cell.kind == LegacyLayout.LegacyCell.Kind.DELETED)
+ {
+ size += TypeSizes.sizeof(cell.timestamp);
+ // localDeletionTime replaces cell.value as the body
+ size += TypeSizes.sizeof(TypeSizes.sizeof(cell.localDeletionTime));
+ size += TypeSizes.sizeof(cell.localDeletionTime);
+ continue;
+ }
+ else if (cell.kind == LegacyLayout.LegacyCell.Kind.COUNTER)
+ {
+ size += TypeSizes.sizeof(Long.MIN_VALUE); // timestampOfLastDelete
+ }
+
+ size += TypeSizes.sizeof(cell.timestamp);
+ size += ByteBufferUtil.serializedSizeWithLength(cell.value);
+ }
+
+ return size;
}
// For thrift sake
@@ -296,6 +480,7 @@ public abstract class LegacyLayout
boolean reversed,
SerializationHelper helper)
{
+
// If the table is a static compact, the "column_metadata" are now internally encoded as
// static. This has already been recognized by decodeCellName, but it means the cells
// provided are not in the expected order (the "static" cells are not necessarily at the front).
@@ -441,18 +626,27 @@ public abstract class LegacyLayout
};
}
- public static Iterator<LegacyCell> fromRowIterator(final RowIterator iterator)
+ public static Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> fromRowIterator(final RowIterator iterator)
{
return fromRowIterator(iterator.metadata(), iterator, iterator.staticRow());
}
- public static Iterator<LegacyCell> fromRowIterator(final CFMetaData metadata, final Iterator<Row> iterator, final Row staticRow)
+ private static Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> fromRowIterator(final CFMetaData metadata, final Iterator<Row> iterator, final Row staticRow)
{
- return new AbstractIterator<LegacyCell>()
+ LegacyRangeTombstoneList deletions = new LegacyRangeTombstoneList(new LegacyBoundComparator(metadata.comparator), 10);
+ Iterator<LegacyCell> cells = new AbstractIterator<LegacyCell>()
{
- private Iterator<LegacyCell> currentRow = staticRow.isEmpty()
- ? Collections.<LegacyLayout.LegacyCell>emptyIterator()
- : fromRow(metadata, staticRow);
+ private Iterator<LegacyCell> currentRow = initializeRow();
+
+ private Iterator<LegacyCell> initializeRow()
+ {
+ if (staticRow == null || staticRow.isEmpty())
+ return Collections.<LegacyLayout.LegacyCell>emptyIterator();
+
+ Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> row = fromRow(metadata, staticRow);
+ deletions.addAll(row.left);
+ return row.right;
+ }
protected LegacyCell computeNext()
{
@@ -462,17 +656,58 @@ public abstract class LegacyLayout
if (!iterator.hasNext())
return endOfData();
- currentRow = fromRow(metadata, iterator.next());
+ Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> row = fromRow(metadata, iterator.next());
+ deletions.addAll(row.left);
+ currentRow = row.right;
return computeNext();
}
};
+
+ return Pair.create(deletions, cells);
}
- private static Iterator<LegacyCell> fromRow(final CFMetaData metadata, final Row row)
+ private static Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> fromRow(final CFMetaData metadata, final Row row)
{
- return new AbstractIterator<LegacyCell>()
+ // convert any complex deletions or row deletion into normal range tombstones so that we can build and send a proper RangeTombstoneList
+ // to legacy nodes
+ LegacyRangeTombstoneList deletions = new LegacyRangeTombstoneList(new LegacyBoundComparator(metadata.comparator), 10);
+
+ if (!row.deletion().isLive())
{
- private final Iterator<Cell> cells = row.cells().iterator();
+ Clustering clustering = row.clustering();
+ Slice.Bound startBound = Slice.Bound.inclusiveStartOf(clustering);
+ Slice.Bound endBound = Slice.Bound.inclusiveEndOf(clustering);
+
+ LegacyBound start = new LegacyLayout.LegacyBound(startBound, false, null);
+ LegacyBound end = new LegacyLayout.LegacyBound(endBound, false, null);
+
+ deletions.add(start, end, row.deletion().markedForDeleteAt(), row.deletion().localDeletionTime());
+ }
+
+ for (ColumnData cd : row)
+ {
+ ColumnDefinition col = cd.column();
+ if (col.isSimple())
+ continue;
+
+ DeletionTime delTime = ((ComplexColumnData)cd).complexDeletion();
+ if (!delTime.isLive())
+ {
+ Clustering clustering = row.clustering();
+
+ Slice.Bound startBound = Slice.Bound.inclusiveStartOf(clustering);
+ Slice.Bound endBound = Slice.Bound.inclusiveEndOf(clustering);
+
+ LegacyLayout.LegacyBound start = new LegacyLayout.LegacyBound(startBound, col.isStatic(), col);
+ LegacyLayout.LegacyBound end = new LegacyLayout.LegacyBound(endBound, col.isStatic(), col);
+
+ deletions.add(start, end, delTime.markedForDeleteAt(), delTime.localDeletionTime());
+ }
+ }
+
+ Iterator<LegacyCell> cells = new AbstractIterator<LegacyCell>()
+ {
+ private final Iterator<Cell> cells = row.cellsInLegacyOrder(metadata).iterator();
// we don't have (and shouldn't have) row markers for compact tables.
private boolean hasReturnedRowMarker = metadata.isCompactTable();
@@ -481,18 +716,24 @@ public abstract class LegacyLayout
if (!hasReturnedRowMarker)
{
hasReturnedRowMarker = true;
- LegacyCellName cellName = new LegacyCellName(row.clustering(), null, null);
- LivenessInfo info = row.primaryKeyLivenessInfo();
- return new LegacyCell(LegacyCell.Kind.REGULAR, cellName, ByteBufferUtil.EMPTY_BYTE_BUFFER, info.timestamp(), info.localExpirationTime(), info.ttl());
+
+ // don't include a row marker if there's no timestamp on the primary key; this is the 3.0+ equivalent
+ // of a row marker
+ if (!row.primaryKeyLivenessInfo().isEmpty())
+ {
+ LegacyCellName cellName = new LegacyCellName(row.clustering(), null, null);
+ LivenessInfo info = row.primaryKeyLivenessInfo();
+ return new LegacyCell(info.isExpiring() ? LegacyCell.Kind.EXPIRING : LegacyCell.Kind.REGULAR, cellName, ByteBufferUtil.EMPTY_BYTE_BUFFER, info.timestamp(), info.localExpirationTime(), info.ttl());
+ }
}
if (!cells.hasNext())
return endOfData();
- Cell cell = cells.next();
- return makeLegacyCell(row.clustering(), cell);
+ return makeLegacyCell(row.clustering(), cells.next());
}
};
+ return Pair.create(deletions, cells);
}
private static LegacyCell makeLegacyCell(Clustering clustering, Cell cell)
@@ -554,6 +795,9 @@ public abstract class LegacyLayout
};
}
+ // Note that this doesn't exactly compare cells as they were pre-3.0, because within a row it sorts columns as
+ // in 3.0, that is, with simple columns before complex columns. In other words, this comparator makes sure cells
+ // are in the proper order to convert them to actual 3.0 rows.
public static Comparator<LegacyCellName> legacyCellNameComparator(final CFMetaData metadata, final boolean reversed)
{
return new Comparator<LegacyCellName>()
@@ -591,13 +835,9 @@ public abstract class LegacyLayout
assert c1.column.isRegular() || c1.column.isStatic();
assert c2.column.isRegular() || c2.column.isStatic();
- if (c1.column.kind != c2.column.kind)
- return c1.column.isStatic() ? -1 : 1;
-
- AbstractType<?> cmp = metadata.getColumnDefinitionNameComparator(c1.column.kind);
- int c = cmp.compare(c1.column.name.bytes, c2.column.name.bytes);
- if (c != 0)
- return c;
+ int cmp = c1.column.compareTo(c2.column);
+ if (cmp != 0)
+ return cmp;
}
assert (c1.collectionElement == null) == (c2.collectionElement == null);
@@ -748,13 +988,6 @@ public abstract class LegacyLayout
}
}
- public static LegacyRangeTombstone readLegacyRangeTombstone(CFMetaData metadata, DataInputPlus in) throws IOException
- {
- ByteBuffer boundname = ByteBufferUtil.readWithShortLength(in);
- in.readUnsignedByte();
- return readLegacyRangeTombstoneBody(metadata, in, boundname);
- }
-
public static LegacyRangeTombstone readLegacyRangeTombstoneBody(CFMetaData metadata, DataInputPlus in, ByteBuffer boundname) throws IOException
{
LegacyBound min = decodeBound(metadata, boundname, true);
@@ -806,7 +1039,7 @@ public abstract class LegacyLayout
public final CFMetaData metadata;
private final boolean isStatic;
private final SerializationHelper helper;
- private Row.Builder builder;
+ private final Row.Builder builder;
private Clustering clustering;
private LegacyRangeTombstone rowDeletion;
@@ -822,7 +1055,11 @@ public abstract class LegacyLayout
this.metadata = metadata;
this.isStatic = isStatic;
this.helper = helper;
- this.builder = BTreeBackedRow.sortedBuilder(isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars);
+ // We cannot use a sorted builder because we don't have exactly the same ordering in 3.0 and pre-3.0. More precisely, within a row, we
+ // store all simple columns before the complex ones in 3.0, whereas pre-3.0 everything was sorted by column name. Note however
+ // that the unsorted builder won't have to reconcile cells, so the exact value we pass for nowInSec doesn't matter.
+ this.builder = BTreeBackedRow.unsortedBuilder(isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars, FBUtilities.nowInSeconds());
+
}
public static CellGrouper staticGrouper(CFMetaData metadata, SerializationHelper helper)
@@ -939,6 +1176,21 @@ public abstract class LegacyLayout
}
}
+ public static class LegacyUnfilteredPartition
+ {
+ public final DeletionTime partitionDeletion;
+ public final LegacyRangeTombstoneList rangeTombstones;
+ public final List<LegacyCell> cells;
+
+ private LegacyUnfilteredPartition(DeletionTime partitionDeletion, LegacyRangeTombstoneList rangeTombstones, List<LegacyCell> cells)
+ {
+ this.partitionDeletion = partitionDeletion;
+ this.rangeTombstones = rangeTombstones;
+ this.cells = cells;
+ }
+ }
+
+
public static class LegacyCellName
{
public final Clustering clustering;
@@ -987,7 +1239,7 @@ public abstract class LegacyLayout
public final boolean isStatic;
public final ColumnDefinition collectionName;
- private LegacyBound(Slice.Bound bound, boolean isStatic, ColumnDefinition collectionName)
+ public LegacyBound(Slice.Bound bound, boolean isStatic, ColumnDefinition collectionName)
{
this.bound = bound;
this.isStatic = isStatic;
@@ -1131,10 +1383,7 @@ public abstract class LegacyLayout
if (isTombstone())
return false;
- if (isExpiring())
- return nowInSec < localDeletionTime;
-
- return true;
+ return !isExpiring() || nowInSec < localDeletionTime;
}
@Override
@@ -1240,10 +1489,8 @@ public abstract class LegacyLayout
public static class LegacyDeletionInfo
{
- public static final Serializer serializer = new Serializer();
-
public final DeletionInfo deletionInfo;
- private final List<LegacyRangeTombstone> inRowTombstones;
+ public final List<LegacyRangeTombstone> inRowTombstones;
private LegacyDeletionInfo(DeletionInfo deletionInfo, List<LegacyRangeTombstone> inRowTombstones)
{
@@ -1253,7 +1500,17 @@ public abstract class LegacyLayout
public static LegacyDeletionInfo from(DeletionInfo info)
{
- return new LegacyDeletionInfo(info, Collections.<LegacyRangeTombstone>emptyList());
+ List<LegacyRangeTombstone> rangeTombstones = new ArrayList<>(info.rangeCount());
+ Iterator<RangeTombstone> iterator = info.rangeIterator(false);
+ while (iterator.hasNext())
+ {
+ RangeTombstone rt = iterator.next();
+ Slice slice = rt.deletedSlice();
+ rangeTombstones.add(new LegacyRangeTombstone(new LegacyBound(slice.start(), false, null),
+ new LegacyBound(slice.end(), false, null),
+ rt.deletionTime()));
+ }
+ return new LegacyDeletionInfo(info, rangeTombstones);
}
public static LegacyDeletionInfo live()
@@ -1266,47 +1523,536 @@ public abstract class LegacyLayout
return inRowTombstones.iterator();
}
- public static class Serializer
+ public static LegacyDeletionInfo deserialize(CFMetaData metadata, DataInputPlus in) throws IOException
{
- public void serialize(CFMetaData metadata, LegacyDeletionInfo info, DataOutputPlus out, int version) throws IOException
+ DeletionTime topLevel = DeletionTime.serializer.deserialize(in);
+
+ int rangeCount = in.readInt();
+ if (rangeCount == 0)
+ return from(new MutableDeletionInfo(topLevel));
+
+ RangeTombstoneList ranges = new RangeTombstoneList(metadata.comparator, rangeCount);
+ // Collection tombstones and legacy row deletions are applied within rows rather than as
+ // range tombstones, so they are collected separately from the RangeTombstoneList.
+ List<LegacyRangeTombstone> inRowTombstones = new ArrayList<>();
+ for (int i = 0; i < rangeCount; i++)
{
- throw new UnsupportedOperationException();
- //DeletionTime.serializer.serialize(info.topLevel, out);
- //rtlSerializer.serialize(info.ranges, out, version);
+ LegacyBound start = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), true);
+ LegacyBound end = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), false);
+ int delTime = in.readInt();
+ long markedAt = in.readLong();
+
+ LegacyRangeTombstone tombstone = new LegacyRangeTombstone(start, end, new DeletionTime(markedAt, delTime));
+ if (tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata))
+ inRowTombstones.add(tombstone);
+ else
+ ranges.add(start.bound, end.bound, markedAt, delTime);
+ }
+ return new LegacyDeletionInfo(new MutableDeletionInfo(topLevel, ranges), inRowTombstones);
+ }
+ }
+
+ /**
+ * A helper class for LegacyRangeTombstoneList. This replaces the Comparator<Composite> that RTL used before 3.0.
+ */
+ private static class LegacyBoundComparator implements Comparator<LegacyBound>
+ {
+ final ClusteringComparator clusteringComparator;
+
+ public LegacyBoundComparator(ClusteringComparator clusteringComparator)
+ {
+ this.clusteringComparator = clusteringComparator;
+ }
+
+ public int compare(LegacyBound a, LegacyBound b)
+ {
+ // First compare the clustering bounds; the collection name only breaks ties.
+ int result = this.clusteringComparator.compare(a.bound, b.bound);
+ if (result != 0)
+ return result;
+
+ // LegacyDeletionInfo.from() builds bounds with a null collectionName, so dereferencing
+ // collectionName.name.bytes unconditionally would NPE as soon as two clustering bounds
+ // compare equal. Order a bound without a collection name before any bound that has one.
+ if (a.collectionName == null)
+ return b.collectionName == null ? 0 : -1;
+ if (b.collectionName == null)
+ return 1;
+ return UTF8Type.instance.compare(a.collectionName.name.bytes, b.collectionName.name.bytes);
+ }
+ }
+
+ /**
+ * Almost an entire copy of RangeTombstoneList from C* 2.1. The main difference is that LegacyBoundComparator
+ * is used in place of Comparator<Composite> (because Composite doesn't exist any more).
+ *
+ * This class is needed to allow us to convert single-row deletions and complex deletions into range tombstones
+ * and properly merge them into the normal set of range tombstones.
+ */
+ public static class LegacyRangeTombstoneList
+ {
+ private final LegacyBoundComparator comparator;
+
+ // Note: we don't want to use a List for the markedAts and delTimes to avoid boxing. We could
+ // use a List for starts and ends, but having arrays everywhere is almost simpler.
+ private LegacyBound[] starts;
+ private LegacyBound[] ends;
+ private long[] markedAts;
+ private int[] delTimes;
+
+ private int size;
+
+ private LegacyRangeTombstoneList(LegacyBoundComparator comparator, LegacyBound[] starts, LegacyBound[] ends, long[] markedAts, int[] delTimes, int size)
+ {
+ assert starts.length == ends.length && starts.length == markedAts.length && starts.length == delTimes.length;
+ this.comparator = comparator;
+ this.starts = starts;
+ this.ends = ends;
+ this.markedAts = markedAts;
+ this.delTimes = delTimes;
+ this.size = size;
+ }
+
+ public LegacyRangeTombstoneList(LegacyBoundComparator comparator, int capacity)
+ {
+ this(comparator, new LegacyBound[capacity], new LegacyBound[capacity], new long[capacity], new int[capacity], 0);
+ }
+
+ public boolean isEmpty()
+ {
+ return size == 0;
+ }
+
+ public int size()
+ {
+ return size;
+ }
+
+ /**
+ * Adds a new range tombstone.
+ *
+ * This method will be faster if the new tombstone sort after all the currently existing ones (this is a common use case),
+ * but it doesn't assume it.
+ */
+ public void add(LegacyBound start, LegacyBound end, long markedAt, int delTime)
+ {
+ if (isEmpty())
+ {
+ addInternal(0, start, end, markedAt, delTime);
+ return;
}
- public LegacyDeletionInfo deserialize(CFMetaData metadata, DataInputPlus in, int version) throws IOException
+ int c = comparator.compare(ends[size-1], start);
+
+ // Fast path if we add in sorted order
+ if (c <= 0)
+ {
+ addInternal(size, start, end, markedAt, delTime);
+ }
+ else
{
- DeletionTime topLevel = DeletionTime.serializer.deserialize(in);
+ // Note: insertFrom expects i to be the insertion point in terms of interval ends
+ int pos = Arrays.binarySearch(ends, 0, size, start, comparator);
+ insertFrom((pos >= 0 ? pos : -pos-1), start, end, markedAt, delTime);
+ }
+ }
- int rangeCount = in.readInt();
- if (rangeCount == 0)
- return from(new MutableDeletionInfo(topLevel));
+ /*
+ * Inserts a new element starting at index i. This method assumes that:
+ * ends[i-1] <= start <= ends[i]
+ *
+ * A RangeTombstoneList is a list of range [s_0, e_0]...[s_n, e_n] such that:
+ * - s_i <= e_i
+ * - e_i <= s_i+1
+ * - if s_i == e_i and e_i == s_i+1 then s_i+1 < e_i+1
+ * Basically, range are non overlapping except for their bound and in order. And while
+ * we allow ranges with the same value for the start and end, we don't allow repeating
+ * such range (so we can't have [0, 0][0, 0] even though it would respect the first 2
+ * conditions).
+ *
+ */
+
+ /**
+ * Adds all the range tombstones of {@code tombstones} to this RangeTombstoneList.
+ */
+ public void addAll(LegacyRangeTombstoneList tombstones)
+ {
+ if (tombstones.isEmpty())
+ return;
+
+ if (isEmpty())
+ {
+ copyArrays(tombstones, this);
+ return;
+ }
- RangeTombstoneList ranges = new RangeTombstoneList(metadata.comparator, rangeCount);
- List<LegacyRangeTombstone> inRowTombsones = new ArrayList<>();
- for (int i = 0; i < rangeCount; i++)
+ /*
+ * We basically have 2 techniques we can use here: either we repeatedly call add() on tombstones values,
+ * or we do a merge of both (sorted) lists. If this list is sufficiently bigger than the one we add, then
+ * calling add() will be faster, otherwise it's merging that will be faster.
+ *
+ * Let's note that during memtables updates, it might not be uncommon that a new update has only a few range
+ * tombstones, while the CF we're adding it to (the one in the memtable) has many. In that case, using add() is
+ * likely going to be faster.
+ *
+ * In other cases however, like when diffing responses from multiple nodes, the tombstone lists we "merge" will
+ * likely be similarly sized, so using add() might be a bit inefficient.
+ *
+ * Roughly speaking (this ignore the fact that updating an element is not exactly constant but that's not a big
+ * deal), if n is the size of this list and m is tombstones size, merging is O(n+m) while using add() is O(m*log(n)).
+ *
+ * But let's not crank up a logarithm computation for that. Long story short, merging will be a bad choice only
+ * if this list's size is a lot bigger than the other one's, so let's keep it simple.
+ */
+ if (size > 10 * tombstones.size)
+ {
+ for (int i = 0; i < tombstones.size; i++)
+ add(tombstones.starts[i], tombstones.ends[i], tombstones.markedAts[i], tombstones.delTimes[i]);
+ }
+ else
+ {
+ int i = 0;
+ int j = 0;
+ while (i < size && j < tombstones.size)
{
- LegacyBound start = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), true);
- LegacyBound end = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), false);
- int delTime = in.readInt();
- long markedAt = in.readLong();
-
- LegacyRangeTombstone tombstone = new LegacyRangeTombstone(start, end, new DeletionTime(markedAt, delTime));
- if (tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata))
- inRowTombsones.add(tombstone);
+ if (comparator.compare(tombstones.starts[j], ends[i]) <= 0)
+ {
+ insertFrom(i, tombstones.starts[j], tombstones.ends[j], tombstones.markedAts[j], tombstones.delTimes[j]);
+ j++;
+ }
else
- ranges.add(start.bound, end.bound, markedAt, delTime);
+ {
+ i++;
+ }
}
- return new LegacyDeletionInfo(new MutableDeletionInfo(topLevel, ranges), inRowTombsones);
+ // Adds the remaining ones from tombstones if any (note that addInternal will increment size if relevant).
+ for (; j < tombstones.size; j++)
+ addInternal(size, tombstones.starts[j], tombstones.ends[j], tombstones.markedAts[j], tombstones.delTimes[j]);
}
+ }
- public long serializedSize(CFMetaData metadata, LegacyDeletionInfo info, TypeSizes typeSizes, int version)
+ private static void copyArrays(LegacyRangeTombstoneList src, LegacyRangeTombstoneList dst)
+ {
+ dst.grow(src.size);
+ System.arraycopy(src.starts, 0, dst.starts, 0, src.size);
+ System.arraycopy(src.ends, 0, dst.ends, 0, src.size);
+ System.arraycopy(src.markedAts, 0, dst.markedAts, 0, src.size);
+ System.arraycopy(src.delTimes, 0, dst.delTimes, 0, src.size);
+ dst.size = src.size;
+ }
+
+ private void insertFrom(int i, LegacyBound start, LegacyBound end, long markedAt, int delTime)
+ {
+ while (i < size)
{
- throw new UnsupportedOperationException();
- //long size = DeletionTime.serializer.serializedSize(info.topLevel, typeSizes);
- //return size + rtlSerializer.serializedSize(info.ranges, typeSizes, version);
+ assert i == 0 || comparator.compare(ends[i-1], start) <= 0;
+
+ int c = comparator.compare(start, ends[i]);
+ assert c <= 0;
+ if (c == 0)
+ {
+ // If start == ends[i], then we can insert from the next one (basically the new element
+ // really starts at the next element), except for the case where starts[i] == ends[i].
+ // In this latter case, if we were to move to the next element, we could end up with ...[x, x][x, x]...
+ if (comparator.compare(starts[i], ends[i]) == 0)
+ {
+ // The current element covers a single value which is equal to the start of the inserted
+ // element. If the inserted element overwrites the current one, just remove the current
+ // (it's included in what we insert) and proceed with the insert.
+ if (markedAt > markedAts[i])
+ {
+ removeInternal(i);
+ continue;
+ }
+
+ // Otherwise (the current singleton interval override the new one), we want to leave the
+ // current element and move to the next, unless start == end since that means the new element
+ // is in fact fully covered by the current one (so we're done)
+ if (comparator.compare(start, end) == 0)
+ return;
+ }
+ i++;
+ continue;
+ }
+
+ // Do we overwrite the current element?
+ if (markedAt > markedAts[i])
+ {
+ // We do overwrite.
+
+ // First deal with what might come before the newly added one.
+ if (comparator.compare(starts[i], start) < 0)
+ {
+ addInternal(i, starts[i], start, markedAts[i], delTimes[i]);
+ i++;
+ // We don't need to do the following line, but in spirit that's what we want to do
+ // setInternal(i, start, ends[i], markedAts, delTime])
+ }
+
+ // now, start <= starts[i]
+
+ // Does the new element stop before/at the current one?
+ int endCmp = comparator.compare(end, starts[i]);
+ if (endCmp <= 0)
+ {
+ // Here start <= starts[i] and end <= starts[i]
+ // This means the new element is entirely before the current one. However, one special
+ // case is if end == starts[i] and starts[i] == ends[i]. In that case,
+ // the new element entirely overwrites the current one and we can just overwrite it
+ if (endCmp == 0 && comparator.compare(starts[i], ends[i]) == 0)
+ setInternal(i, start, end, markedAt, delTime);
+ else
+ addInternal(i, start, end, markedAt, delTime);
+ return;
+ }
+
+ // Do we overwrite the current element fully?
+ int cmp = comparator.compare(ends[i], end);
+ if (cmp <= 0)
+ {
+ // We do overwrite fully:
+ // update the current element until its end and continue
+ // on with the next element (with the new inserted start == current end).
+
+ // If we're on the last element, we can optimize
+ if (i == size-1)
+ {
+ setInternal(i, start, end, markedAt, delTime);
+ return;
+ }
+
+ setInternal(i, start, ends[i], markedAt, delTime);
+ if (cmp == 0)
+ return;
+
+ start = ends[i];
+ i++;
+ }
+ else
+ {
+ // We don't overwrite fully. Insert the new interval, and then update the now next
+ // one to reflect the not overwritten parts. We're then done.
+ addInternal(i, start, end, markedAt, delTime);
+ i++;
+ setInternal(i, end, ends[i], markedAts[i], delTimes[i]);
+ return;
+ }
+ }
+ else
+ {
+ // we don't overwrite the current element
+
+ // If the new interval starts before the current one, insert that new interval
+ if (comparator.compare(start, starts[i]) < 0)
+ {
+ // If we stop before the start of the current element, just insert the new
+ // interval and we're done; otherwise insert until the beginning of the
+ // current element
+ if (comparator.compare(end, starts[i]) <= 0)
+ {
+ addInternal(i, start, end, markedAt, delTime);
+ return;
+ }
+ addInternal(i, start, starts[i], markedAt, delTime);
+ i++;
+ }
+
+ // After that, we're overwritten on the current element but might have
+ // some residual parts after ...
+
+ // ... unless we don't extend beyond it.
+ if (comparator.compare(end, ends[i]) <= 0)
+ return;
+
+ start = ends[i];
+ i++;
+ }
+ }
+
+ // If we got there, then just insert the remainder at the end
+ addInternal(i, start, end, markedAt, delTime);
+ }
+ private int capacity()
+ {
+ return starts.length;
+ }
+
+ private void addInternal(int i, LegacyBound start, LegacyBound end, long markedAt, int delTime)
+ {
+ assert i >= 0;
+
+ if (size == capacity())
+ growToFree(i);
+ else if (i < size)
+ moveElements(i);
+
+ setInternal(i, start, end, markedAt, delTime);
+ size++;
+ }
+
+ private void removeInternal(int i)
+ {
+ assert i >= 0;
+
+ System.arraycopy(starts, i+1, starts, i, size - i - 1);
+ System.arraycopy(ends, i+1, ends, i, size - i - 1);
+ System.arraycopy(markedAts, i+1, markedAts, i, size - i - 1);
+ System.arraycopy(delTimes, i+1, delTimes, i, size - i - 1);
+
+ --size;
+ starts[size] = null;
+ ends[size] = null;
+ }
+
+ /*
+ * Grow the arrays, leaving index i "free" in the process.
+ */
+ private void growToFree(int i)
+ {
+ int newLength = (capacity() * 3) / 2 + 1;
+ grow(i, newLength);
+ }
+
+ /*
+ * Grow the arrays to match newLength capacity.
+ */
+ private void grow(int newLength)
+ {
+ if (capacity() < newLength)
+ grow(-1, newLength);
+ }
+
+ private void grow(int i, int newLength)
+ {
+ starts = grow(starts, size, newLength, i);
+ ends = grow(ends, size, newLength, i);
+ markedAts = grow(markedAts, size, newLength, i);
+ delTimes = grow(delTimes, size, newLength, i);
+ }
+
+ private static LegacyBound[] grow(LegacyBound[] a, int size, int newLength, int i)
+ {
+ if (i < 0 || i >= size)
+ return Arrays.copyOf(a, newLength);
+
+ LegacyBound[] newA = new LegacyBound[newLength];
+ System.arraycopy(a, 0, newA, 0, i);
+ System.arraycopy(a, i, newA, i+1, size - i);
+ return newA;
+ }
+
+ private static long[] grow(long[] a, int size, int newLength, int i)
+ {
+ if (i < 0 || i >= size)
+ return Arrays.copyOf(a, newLength);
+
+ long[] newA = new long[newLength];
+ System.arraycopy(a, 0, newA, 0, i);
+ System.arraycopy(a, i, newA, i+1, size - i);
+ return newA;
+ }
+
+ private static int[] grow(int[] a, int size, int newLength, int i)
+ {
+ if (i < 0 || i >= size)
+ return Arrays.copyOf(a, newLength);
+
+ int[] newA = new int[newLength];
+ System.arraycopy(a, 0, newA, 0, i);
+ System.arraycopy(a, i, newA, i+1, size - i);
+ return newA;
+ }
+
+ /*
+ * Move elements so that index i is "free", assuming the arrays have at least one free slot at the end.
+ */
+ private void moveElements(int i)
+ {
+ if (i >= size)
+ return;
+
+ System.arraycopy(starts, i, starts, i+1, size - i);
+ System.arraycopy(ends, i, ends, i+1, size - i);
+ System.arraycopy(markedAts, i, markedAts, i+1, size - i);
+ System.arraycopy(delTimes, i, delTimes, i+1, size - i);
+ // we set starts[i] to null to indicate the position is now empty, so that we update boundaryHeapSize
+ // when we set it
+ starts[i] = null;
+ }
+
+ private void setInternal(int i, LegacyBound start, LegacyBound end, long markedAt, int delTime)
+ {
+ starts[i] = start;
+ ends[i] = end;
+ markedAts[i] = markedAt;
+ delTimes[i] = delTime;
+ }
+
+ /**
+ * Writes this list in the pre-3.0 wire format: the tombstone count, then for each tombstone
+ * its start bound, end bound, local deletion time (int) and markedAt timestamp (long).
+ * The write order matches the read order in LegacyDeletionInfo.deserialize.
+ */
+ public void serialize(DataOutputPlus out, CFMetaData metadata) throws IOException
+ {
+ out.writeInt(size);
+ if (size == 0)
+ return;
+
+ // Bounds are encoded as legacy Composites; for non-dense tables the composite carries an
+ // extra UTF8 component for the (collection) column name.
+ List<AbstractType<?>> types = new ArrayList<>(comparator.clusteringComparator.subtypes());
+ if (!metadata.isDense())
+ types.add(UTF8Type.instance);
+ CompositeType type = CompositeType.getInstance(types);
+
+ for (int i = 0; i < size; i++)
+ {
+ LegacyBound start = starts[i];
+ LegacyBound end = ends[i];
+
+ CompositeType.Builder startBuilder = type.builder();
+ CompositeType.Builder endBuilder = type.builder();
+ // NOTE(review): assumes start and end always have the same number of clustering
+ // components -- TODO confirm; an end bound with fewer components would fail in get(j).
+ for (int j = 0; j < start.bound.clustering().size(); j++)
+ {
+ startBuilder.add(start.bound.get(j));
+ endBuilder.add(end.bound.get(j));
+ }
+
+ if (start.collectionName != null)
+ startBuilder.add(start.collectionName.name.bytes);
+ if (end.collectionName != null)
+ endBuilder.add(end.collectionName.name.bytes);
+
+ ByteBufferUtil.writeWithShortLength(startBuilder.build(), out);
+ ByteBufferUtil.writeWithShortLength(endBuilder.buildAsEndOfRange(), out);
+
+ // Legacy order: local deletion time before markedAt (see deserialize above).
+ out.writeInt(delTimes[i]);
+ out.writeLong(markedAts[i]);
+ }
+ }
+
+ public long serializedSize(CFMetaData metadata)
+ {
+ long size = 0;
+ size += TypeSizes.sizeof(this.size);
+
+ if (this.size == 0)
+ return size;
+
+ List<AbstractType<?>> types = new ArrayList<>(comparator.clusteringComparator.subtypes());
+ if (!metadata.isDense())
+ types.add(UTF8Type.instance);
+ CompositeType type = CompositeType.getInstance(types);
+
+ for (int i = 0; i < this.size; i++)
+ {
+ LegacyBound start = starts[i];
+ LegacyBound end = ends[i];
+
+ CompositeType.Builder startBuilder = type.builder();
+ CompositeType.Builder endBuilder = type.builder();
+ for (int j = 0; j < start.bound.clustering().size(); j++)
+ {
+ startBuilder.add(start.bound.get(j));
+ endBuilder.add(end.bound.get(j));
+ }
+
+ if (start.collectionName != null)
+ startBuilder.add(start.collectionName.name.bytes);
+ if (end.collectionName != null)
+ endBuilder.add(end.collectionName.name.bytes);
+
+ size += ByteBufferUtil.serializedSizeWithShortLength(startBuilder.build());
+ size += ByteBufferUtil.serializedSizeWithShortLength(endBuilder.buildAsEndOfRange());
+
+ size += TypeSizes.sizeof(delTimes[i]);
+ size += TypeSizes.sizeof(markedAts[i]);
}
+ return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/PartitionColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionColumns.java b/src/java/org/apache/cassandra/db/PartitionColumns.java
index 5f1da8a..aa60198 100644
--- a/src/java/org/apache/cassandra/db/PartitionColumns.java
+++ b/src/java/org/apache/cassandra/db/PartitionColumns.java
@@ -91,6 +91,12 @@ public class PartitionColumns implements Iterable<ColumnDefinition>
return Iterators.concat(statics.selectOrderIterator(), regulars.selectOrderIterator());
}
+ /** Returns the total number of static and regular columns. */
+ public int size()
+ {
+ return regulars.columnCount() + statics.columnCount();
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 18b6950..2219a84 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -37,6 +37,8 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.*;
import org.apache.cassandra.service.pager.*;
import org.apache.cassandra.thrift.ThriftResultsMerger;
@@ -226,6 +228,15 @@ public class PartitionRangeReadCommand extends ReadCommand
};
}
+ @SuppressWarnings("deprecation")
+ protected MessageOut<ReadCommand> createLegacyMessage()
+ {
+ if (this.dataRange.isPaging())
+ return new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, legacyPagedRangeCommandSerializer);
+ else
+ return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, legacyRangeSliceCommandSerializer);
+ }
+
protected void appendCQLWhereClause(StringBuilder sb)
{
if (dataRange.isUnrestricted() && rowFilter().isEmpty())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
new file mode 100644
index 0000000..3f1d660
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+
+public class RangeSliceVerbHandler extends ReadCommandVerbHandler
+{
+ @Override
+ protected IVersionedSerializer<ReadResponse> serializer()
+ {
+ return ReadResponse.legacyRangeSliceReplySerializer;
+ }
+}