You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2018/02/11 13:26:06 UTC

[21/29] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/LegacyLayout.java
index 2117dd6,0000000..912d591
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@@ -1,2504 -1,0 +1,2509 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.io.DataInput;
 +import java.io.IOException;
 +import java.io.IOError;
 +import java.nio.ByteBuffer;
 +import java.security.MessageDigest;
 +import java.util.*;
 +
 +import org.apache.cassandra.cql3.SuperColumnCompatibility;
++import org.apache.cassandra.thrift.Column;
 +import org.apache.cassandra.utils.AbstractIterator;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.PeekingIterator;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.context.CounterContext;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.utils.*;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 +
 +/**
 + * Functions to deal with the old format.
 + */
 +public abstract class LegacyLayout
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(LegacyLayout.class);
 +
 +    public final static int MAX_CELL_NAME_LENGTH = FBUtilities.MAX_UNSIGNED_SHORT;
 +
 +    public final static int STATIC_PREFIX = 0xFFFF;
 +
 +    public final static int DELETION_MASK        = 0x01;
 +    public final static int EXPIRATION_MASK      = 0x02;
 +    public final static int COUNTER_MASK         = 0x04;
 +    public final static int COUNTER_UPDATE_MASK  = 0x08;
 +    private final static int RANGE_TOMBSTONE_MASK = 0x10;
 +
 +    private LegacyLayout() {}
 +
 +    public static AbstractType<?> makeLegacyComparator(CFMetaData metadata)
 +    {
 +        ClusteringComparator comparator = metadata.comparator;
 +        if (!metadata.isCompound())
 +        {
 +            assert comparator.size() == 1;
 +            return comparator.subtype(0);
 +        }
 +
 +        boolean hasCollections = metadata.hasCollectionColumns() || metadata.hasDroppedCollectionColumns();
 +        List<AbstractType<?>> types = new ArrayList<>(comparator.size() + (metadata.isDense() ? 0 : 1) + (hasCollections ? 1 : 0));
 +
 +        types.addAll(comparator.subtypes());
 +
 +        if (!metadata.isDense())
 +        {
 +            types.add(UTF8Type.instance);
 +
 +            if (hasCollections)
 +            {
 +                Map<ByteBuffer, CollectionType> defined = new HashMap<>();
 +
 +                for (CFMetaData.DroppedColumn def : metadata.getDroppedColumns().values())
 +                    if (def.type instanceof CollectionType && def.type.isMultiCell())
 +                        defined.put(bytes(def.name), (CollectionType) def.type);
 +
 +                for (ColumnDefinition def : metadata.partitionColumns())
 +                    if (def.type instanceof CollectionType && def.type.isMultiCell())
 +                        defined.put(def.name.bytes, (CollectionType) def.type);
 +
 +                types.add(ColumnToCollectionType.getInstance(defined));
 +            }
 +        }
 +        return CompositeType.getInstance(types);
 +    }
 +
 +    public static LegacyCellName decodeCellName(CFMetaData metadata, ByteBuffer superColumnName, ByteBuffer cellname)
 +    throws UnknownColumnException
 +    {
 +        assert cellname != null;
 +        if (metadata.isSuper())
 +        {
 +            assert superColumnName != null;
 +            return decodeForSuperColumn(metadata, new Clustering(superColumnName), cellname);
 +        }
 +
 +        assert superColumnName == null;
 +        return decodeCellName(metadata, cellname);
 +    }
 +
 +    private static LegacyCellName decodeForSuperColumn(CFMetaData metadata, Clustering clustering, ByteBuffer subcol)
 +    {
 +        ColumnDefinition def = metadata.getColumnDefinition(subcol);
 +        if (def != null)
 +        {
 +            // it's a statically defined subcolumn
 +            return new LegacyCellName(clustering, def, null);
 +        }
 +
 +        def = metadata.compactValueColumn();
 +        assert def != null && def.type instanceof MapType;
 +        return new LegacyCellName(clustering, def, subcol);
 +    }
 +
 +    public static LegacyCellName decodeCellName(CFMetaData metadata, ByteBuffer cellname) throws UnknownColumnException
 +    {
 +        return decodeCellName(metadata, cellname, false);
 +    }
 +
 +    public static LegacyCellName decodeCellName(CFMetaData metadata, ByteBuffer cellname, boolean readAllAsDynamic) throws UnknownColumnException
 +    {
 +        Clustering clustering = decodeClustering(metadata, cellname);
 +
 +        if (metadata.isSuper())
 +            return decodeForSuperColumn(metadata, clustering, CompositeType.extractComponent(cellname, 1));
 +
 +        if (metadata.isDense() || (metadata.isCompactTable() && readAllAsDynamic))
 +            return new LegacyCellName(clustering, metadata.compactValueColumn(), null);
 +
 +        ByteBuffer column = metadata.isCompound() ? CompositeType.extractComponent(cellname, metadata.comparator.size()) : cellname;
 +        if (column == null)
 +        {
 +            // Tables for composite 2ndary indexes used to be compound but dense, but we've transformed them into regular tables
 +            // (non compact ones) but with no regular column (i.e. we only care about the clustering). So we'll get here
 +            // in that case, and what we want to return is basically a row marker.
 +            if (metadata.partitionColumns().isEmpty())
 +                return new LegacyCellName(clustering, null, null);
 +
 +            // Otherwise, we shouldn't get there
 +            throw new IllegalArgumentException("No column name component found in cell name");
 +        }
 +
 +        // Row marker, this is ok
 +        if (!column.hasRemaining())
 +            return new LegacyCellName(clustering, null, null);
 +
 +        ColumnDefinition def = metadata.getColumnDefinition(column);
 +        if ((def == null) || def.isPrimaryKeyColumn())
 +        {
 +            // If it's a compact table, it means the column is in fact a "dynamic" one
 +            if (metadata.isCompactTable())
 +                return new LegacyCellName(new Clustering(column), metadata.compactValueColumn(), null);
 +
 +            if (def == null)
 +                throw new UnknownColumnException(metadata, column);
 +            else
 +                throw new IllegalArgumentException("Cannot add primary key column to partition update");
 +        }
 +
 +        ByteBuffer collectionElement = metadata.isCompound() ? CompositeType.extractComponent(cellname, metadata.comparator.size() + 1) : null;
 +
 +        // Note that because static compact columns are translated to static defs in the new world order, we need to force a static
 +        // clustering if the definition is static (as it might not be in this case).
 +        return new LegacyCellName(def.isStatic() ? Clustering.STATIC_CLUSTERING : clustering, def, collectionElement);
 +    }
 +
 +    public static LegacyBound decodeBound(CFMetaData metadata, ByteBuffer bound, boolean isStart)
 +    {
 +        if (!bound.hasRemaining())
 +            return isStart ? LegacyBound.BOTTOM : LegacyBound.TOP;
 +
 +        if (!metadata.isCompound())
 +        {
 +            // The non compound case is a lot easier, in that there is no EOC nor collection to worry about, so dealing
 +            // with that first.
 +            return new LegacyBound(isStart ? Slice.Bound.inclusiveStartOf(bound) : Slice.Bound.inclusiveEndOf(bound), false, null);
 +        }
 +
 +        int clusteringSize = metadata.comparator.size();
 +
 +        List<ByteBuffer> components = CompositeType.splitName(bound);
 +        byte eoc = CompositeType.lastEOC(bound);
 +
 +        // There can be  more components than the clustering size only in the case this is the bound of a collection
 +        // range tombstone. In which case, there is exactly one more component, and that component is the name of the
 +        // collection being selected/deleted.
 +        assert components.size() <= clusteringSize || (!metadata.isCompactTable() && components.size() == clusteringSize + 1);
 +
 +        ColumnDefinition collectionName = null;
 +        if (components.size() > clusteringSize)
 +            collectionName = metadata.getColumnDefinition(components.remove(clusteringSize));
 +
 +        boolean isInclusive;
 +        if (isStart)
 +        {
 +            isInclusive = eoc <= 0;
 +        }
 +        else
 +        {
 +            isInclusive = eoc >= 0;
 +
 +            // for an end bound, if we only have a prefix of all the components and the final EOC is zero,
 +            // then it should only match up to the prefix but no further, that is, it is an inclusive bound
 +            // of the exact prefix but an exclusive bound of anything beyond it, so adding an empty
 +            // composite value ensures this behavior, see CASSANDRA-12423 for more details
 +            if (eoc == 0 && components.size() < clusteringSize)
 +            {
 +                components.add(ByteBufferUtil.EMPTY_BYTE_BUFFER);
 +                isInclusive = false;
 +            }
 +        }
 +
 +        Slice.Bound.Kind boundKind = Slice.Bound.boundKind(isStart, isInclusive);
 +        Slice.Bound sb = Slice.Bound.create(boundKind, components.toArray(new ByteBuffer[components.size()]));
 +        return new LegacyBound(sb, metadata.isCompound() && CompositeType.isStaticName(bound), collectionName);
 +    }
 +
 +    public static ByteBuffer encodeBound(CFMetaData metadata, Slice.Bound bound, boolean isStart)
 +    {
 +        if (bound == Slice.Bound.BOTTOM || bound == Slice.Bound.TOP || metadata.comparator.size() == 0)
 +            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
 +
 +        ClusteringPrefix clustering = bound.clustering();
 +
 +        if (!metadata.isCompound())
 +        {
 +            assert clustering.size() == 1;
 +            return clustering.get(0);
 +        }
 +
 +        CompositeType ctype = CompositeType.getInstance(metadata.comparator.subtypes());
 +        CompositeType.Builder builder = ctype.builder();
 +        for (int i = 0; i < clustering.size(); i++)
 +            builder.add(clustering.get(i));
 +
 +        if (isStart)
 +            return bound.isInclusive() ? builder.build() : builder.buildAsEndOfRange();
 +        else
 +            return bound.isInclusive() ? builder.buildAsEndOfRange() : builder.build();
 +    }
 +
 +    public static ByteBuffer encodeCellName(CFMetaData metadata, ClusteringPrefix clustering, ByteBuffer columnName, ByteBuffer collectionElement)
 +    {
 +        boolean isStatic = clustering == Clustering.STATIC_CLUSTERING;
 +
 +        if (!metadata.isCompound())
 +        {
 +            if (isStatic)
 +                return columnName;
 +
 +            assert clustering.size() == 1 : "Expected clustering size to be 1, but was " + clustering.size();
 +            return clustering.get(0);
 +        }
 +
 +        // We use comparator.size() rather than clustering.size() because of static clusterings
 +        int clusteringSize = metadata.comparator.size();
 +        int size = clusteringSize + (metadata.isDense() ? 0 : 1) + (collectionElement == null ? 0 : 1);
 +        if (metadata.isSuper())
 +            size = clusteringSize + 1;
 +        ByteBuffer[] values = new ByteBuffer[size];
 +        for (int i = 0; i < clusteringSize; i++)
 +        {
 +            if (isStatic)
 +            {
 +                values[i] = ByteBufferUtil.EMPTY_BYTE_BUFFER;
 +                continue;
 +            }
 +
 +            ByteBuffer v = clustering.get(i);
 +            // we can have null (only for dense compound tables for backward compatibility reasons) but that
 +            // means we're done and should stop there as far as building the composite is concerned.
 +            if (v == null)
 +                return CompositeType.build(Arrays.copyOfRange(values, 0, i));
 +
 +            values[i] = v;
 +        }
 +
 +        if (metadata.isSuper())
 +        {
 +            // We need to set the "column" (in thrift terms) name, i.e. the value corresponding to the subcomparator.
 +            // What it is depends if this a cell for a declared "static" column or a "dynamic" column part of the
 +            // super-column internal map.
 +            assert columnName != null; // This should never be null for supercolumns, see decodeForSuperColumn() above
 +            values[clusteringSize] = columnName.equals(SuperColumnCompatibility.SUPER_COLUMN_MAP_COLUMN)
 +                                   ? collectionElement
 +                                   : columnName;
 +        }
 +        else
 +        {
 +            if (!metadata.isDense())
 +                values[clusteringSize] = columnName;
 +            if (collectionElement != null)
 +                values[clusteringSize + 1] = collectionElement;
 +        }
 +
 +        return CompositeType.build(isStatic, values);
 +    }
 +
 +    public static Clustering decodeClustering(CFMetaData metadata, ByteBuffer value)
 +    {
 +        int csize = metadata.comparator.size();
 +        if (csize == 0)
 +            return Clustering.EMPTY;
 +
 +        if (metadata.isCompound() && CompositeType.isStaticName(value))
 +            return Clustering.STATIC_CLUSTERING;
 +
 +        List<ByteBuffer> components = metadata.isCompound()
 +                                    ? CompositeType.splitName(value)
 +                                    : Collections.singletonList(value);
 +
 +        return new Clustering(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize]));
 +    }
 +
 +    public static ByteBuffer encodeClustering(CFMetaData metadata, ClusteringPrefix clustering)
 +    {
 +        if (clustering.size() == 0)
 +            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
 +
 +        if (!metadata.isCompound())
 +        {
 +            assert clustering.size() == 1;
 +            return clustering.get(0);
 +        }
 +
 +        ByteBuffer[] values = new ByteBuffer[clustering.size()];
 +        for (int i = 0; i < clustering.size(); i++)
 +            values[i] = clustering.get(i);
 +        return CompositeType.build(values);
 +    }
 +
 +    /**
 +     * The maximum number of cells to include per partition when converting to the old format.
 +     * <p>
 +     * We already apply the limit during the actual query, but for queries that counts cells and not rows (thrift queries
 +     * and distinct queries as far as old nodes are concerned), we may still include a little bit more than requested
 +     * because {@link DataLimits} always include full rows. So if the limit ends in the middle of a queried row, the
 +     * full row will be part of our result. This would confuse old nodes however so we make sure to truncate it to
 +     * what's expected before writting it on the wire.
 +     *
 +     * @param command the read commmand for which to determine the maximum cells per partition. This can be {@code null}
 +     * in which case {@code Integer.MAX_VALUE} is returned.
 +     * @return the maximum number of cells per partition that should be enforced according to the read command if
 +     * post-query limitation are in order (see above). This will be {@code Integer.MAX_VALUE} if no such limits are
 +     * necessary.
 +     */
 +    private static int maxCellsPerPartition(ReadCommand command)
 +    {
 +        if (command == null)
 +            return Integer.MAX_VALUE;
 +
 +        DataLimits limits = command.limits();
 +
 +        // There is 2 types of DISTINCT queries: those that includes only the partition key, and those that include static columns.
 +        // On old nodes, the latter expects the first row in term of CQL count, which is what we already have and there is no additional
 +        // limit to apply. The former however expect only one cell per partition and rely on it (See CASSANDRA-10762).
 +        if (limits.isDistinct())
 +            return command.columnFilter().fetchedColumns().statics.isEmpty() ? 1 : Integer.MAX_VALUE;
 +
 +        switch (limits.kind())
 +        {
 +            case THRIFT_LIMIT:
 +            case SUPER_COLUMN_COUNTING_LIMIT:
 +                return limits.perPartitionCount();
 +            default:
 +                return Integer.MAX_VALUE;
 +        }
 +    }
 +
 +    // For serializing to old wire format
 +    public static LegacyUnfilteredPartition fromUnfilteredRowIterator(ReadCommand command, UnfilteredRowIterator iterator)
 +    {
 +        // we need to extract the range tombstone so materialize the partition. Since this is
 +        // used for the on-wire format, this is not worst than it used to be.
 +        final ImmutableBTreePartition partition = ImmutableBTreePartition.create(iterator);
 +        DeletionInfo info = partition.deletionInfo();
 +        Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> pair = fromRowIterator(partition.metadata(), partition.iterator(), partition.staticRow());
 +
 +        LegacyLayout.LegacyRangeTombstoneList rtl = pair.left;
 +
 +        // Processing the cell iterator results in the LegacyRangeTombstoneList being populated, so we do this
 +        // before we use the LegacyRangeTombstoneList at all
 +        List<LegacyLayout.LegacyCell> cells = Lists.newArrayList(pair.right);
 +
 +        int maxCellsPerPartition = maxCellsPerPartition(command);
 +        if (cells.size() > maxCellsPerPartition)
 +            cells = cells.subList(0, maxCellsPerPartition);
 +
 +        // The LegacyRangeTombstoneList already has range tombstones for the single-row deletions and complex
 +        // deletions.  Go through our normal range tombstones and add then to the LegacyRTL so that the range
 +        // tombstones all get merged and sorted properly.
 +        if (info.hasRanges())
 +        {
 +            Iterator<RangeTombstone> rangeTombstoneIterator = info.rangeIterator(false);
 +            while (rangeTombstoneIterator.hasNext())
 +            {
 +                RangeTombstone rt = rangeTombstoneIterator.next();
 +                Slice slice = rt.deletedSlice();
 +                LegacyLayout.LegacyBound start = new LegacyLayout.LegacyBound(slice.start(), false, null);
 +                LegacyLayout.LegacyBound end = new LegacyLayout.LegacyBound(slice.end(), false, null);
 +                rtl.add(start, end, rt.deletionTime().markedForDeleteAt(), rt.deletionTime().localDeletionTime());
 +            }
 +        }
 +
 +        return new LegacyUnfilteredPartition(info.getPartitionDeletion(), rtl, cells);
 +    }
 +
 +    public static void serializeAsLegacyPartition(ReadCommand command, UnfilteredRowIterator partition, DataOutputPlus out, int version) throws IOException
 +    {
 +        assert version < MessagingService.VERSION_30;
 +
 +        out.writeBoolean(true);
 +
 +        LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(command, partition);
 +
 +        UUIDSerializer.serializer.serialize(partition.metadata().cfId, out, version);
 +        DeletionTime.serializer.serialize(legacyPartition.partitionDeletion, out);
 +
 +        legacyPartition.rangeTombstones.serialize(out, partition.metadata());
 +
 +        // begin cell serialization
 +        out.writeInt(legacyPartition.cells.size());
 +        for (LegacyLayout.LegacyCell cell : legacyPartition.cells)
 +        {
 +            ByteBufferUtil.writeWithShortLength(cell.name.encode(partition.metadata()), out);
 +            out.writeByte(cell.serializationFlags());
 +            if (cell.isExpiring())
 +            {
 +                out.writeInt(cell.ttl);
 +                out.writeInt(cell.localDeletionTime);
 +            }
 +            else if (cell.isTombstone())
 +            {
 +                out.writeLong(cell.timestamp);
 +                out.writeInt(TypeSizes.sizeof(cell.localDeletionTime));
 +                out.writeInt(cell.localDeletionTime);
 +                continue;
 +            }
 +            else if (cell.isCounterUpdate())
 +            {
 +                out.writeLong(cell.timestamp);
 +                long count = CounterContext.instance().getLocalCount(cell.value);
 +                ByteBufferUtil.writeWithLength(ByteBufferUtil.bytes(count), out);
 +                continue;
 +            }
 +            else if (cell.isCounter())
 +            {
 +                out.writeLong(Long.MIN_VALUE);  // timestampOfLastDelete (not used, and MIN_VALUE is the default)
 +            }
 +
 +            out.writeLong(cell.timestamp);
 +            ByteBufferUtil.writeWithLength(cell.value, out);
 +        }
 +    }
 +
 +    // For the old wire format
 +    // Note: this can return null if an empty partition is serialized!
 +    public static UnfilteredRowIterator deserializeLegacyPartition(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
 +    {
 +        assert version < MessagingService.VERSION_30;
 +
 +        // This is only used in mutation, and mutation have never allowed "null" column families
 +        boolean present = in.readBoolean();
 +        if (!present)
 +            return null;
 +
 +        CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
 +        LegacyDeletionInfo info = LegacyDeletionInfo.deserialize(metadata, in);
 +        int size = in.readInt();
 +        Iterator<LegacyCell> cells = deserializeCells(metadata, in, flag, size);
 +        SerializationHelper helper = new SerializationHelper(metadata, version, flag);
 +        return onWireCellstoUnfilteredRowIterator(metadata, metadata.partitioner.decorateKey(key), info, cells, false, helper);
 +    }
 +
 +    // For the old wire format
 +    public static long serializedSizeAsLegacyPartition(ReadCommand command, UnfilteredRowIterator partition, int version)
 +    {
 +        assert version < MessagingService.VERSION_30;
 +
 +        if (partition.isEmpty())
 +            return TypeSizes.sizeof(false);
 +
 +        long size = TypeSizes.sizeof(true);
 +
 +        LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(command, partition);
 +
 +        size += UUIDSerializer.serializer.serializedSize(partition.metadata().cfId, version);
 +        size += DeletionTime.serializer.serializedSize(legacyPartition.partitionDeletion);
 +        size += legacyPartition.rangeTombstones.serializedSize(partition.metadata());
 +
 +        // begin cell serialization
 +        size += TypeSizes.sizeof(legacyPartition.cells.size());
 +        for (LegacyLayout.LegacyCell cell : legacyPartition.cells)
 +        {
 +            size += ByteBufferUtil.serializedSizeWithShortLength(cell.name.encode(partition.metadata()));
 +            size += 1;  // serialization flags
 +            if (cell.isExpiring())
 +            {
 +                size += TypeSizes.sizeof(cell.ttl);
 +                size += TypeSizes.sizeof(cell.localDeletionTime);
 +            }
 +            else if (cell.isTombstone())
 +            {
 +                size += TypeSizes.sizeof(cell.timestamp);
 +                // localDeletionTime replaces cell.value as the body
 +                size += TypeSizes.sizeof(TypeSizes.sizeof(cell.localDeletionTime));
 +                size += TypeSizes.sizeof(cell.localDeletionTime);
 +                continue;
 +            }
 +            else if (cell.isCounterUpdate())
 +            {
 +                size += TypeSizes.sizeof(cell.timestamp);
 +                long count = CounterContext.instance().getLocalCount(cell.value);
 +                size += ByteBufferUtil.serializedSizeWithLength(ByteBufferUtil.bytes(count));
 +                continue;
 +            }
 +            else if (cell.isCounter())
 +            {
 +                size += TypeSizes.sizeof(Long.MIN_VALUE);  // timestampOfLastDelete
 +            }
 +
 +            size += TypeSizes.sizeof(cell.timestamp);
 +            size += ByteBufferUtil.serializedSizeWithLength(cell.value);
 +        }
 +
 +        return size;
 +    }
 +
 +    // For thrift sake
 +    public static UnfilteredRowIterator toUnfilteredRowIterator(CFMetaData metadata,
 +                                                                DecoratedKey key,
 +                                                                LegacyDeletionInfo delInfo,
 +                                                                Iterator<LegacyCell> cells)
 +    {
 +        SerializationHelper helper = new SerializationHelper(metadata, 0, SerializationHelper.Flag.LOCAL);
 +        return toUnfilteredRowIterator(metadata, key, delInfo, cells, false, helper);
 +    }
 +
 +    // For deserializing old wire format
 +    public static UnfilteredRowIterator onWireCellstoUnfilteredRowIterator(CFMetaData metadata,
 +                                                                           DecoratedKey key,
 +                                                                           LegacyDeletionInfo delInfo,
 +                                                                           Iterator<LegacyCell> cells,
 +                                                                           boolean reversed,
 +                                                                           SerializationHelper helper)
 +    {
 +
 +        // If the table is a static compact, the "column_metadata" are now internally encoded as
 +        // static. This has already been recognized by decodeCellName, but it means the cells
 +        // provided are not in the expected order (the "static" cells are not necessarily at the front).
 +        // So sort them to make sure toUnfilteredRowIterator works as expected.
 +        // Further, if the query is reversed, then the on-wire format still has cells in non-reversed
 +        // order, but we need to have them reverse in the final UnfilteredRowIterator. So reverse them.
 +        if (metadata.isStaticCompactTable() || reversed)
 +        {
 +            List<LegacyCell> l = new ArrayList<>();
 +            Iterators.addAll(l, cells);
 +            Collections.sort(l, legacyCellComparator(metadata, reversed));
 +            cells = l.iterator();
 +        }
 +
 +        return toUnfilteredRowIterator(metadata, key, delInfo, cells, reversed, helper);
 +    }
 +
 +    private static UnfilteredRowIterator toUnfilteredRowIterator(CFMetaData metadata,
 +                                                                 DecoratedKey key,
 +                                                                 LegacyDeletionInfo delInfo,
 +                                                                 Iterator<LegacyCell> cells,
 +                                                                 boolean reversed,
 +                                                                 SerializationHelper helper)
 +    {
 +        // A reducer that basically does nothing, we know the 2 merged iterators can't have conflicting atoms (since we merge cells with range tombstones).
 +        MergeIterator.Reducer<LegacyAtom, LegacyAtom> reducer = new MergeIterator.Reducer<LegacyAtom, LegacyAtom>()
 +        {
 +            private LegacyAtom atom;
 +
 +            public void reduce(int idx, LegacyAtom current)
 +            {
 +                // We're merging cell with range tombstones, so we should always only have a single atom to reduce.
 +                assert atom == null;
 +                atom = current;
 +            }
 +
 +            protected LegacyAtom getReduced()
 +            {
 +                return atom;
 +            }
 +
 +            protected void onKeyChange()
 +            {
 +                atom = null;
 +            }
 +        };
 +        List<Iterator<LegacyAtom>> iterators = Arrays.asList(asLegacyAtomIterator(cells), asLegacyAtomIterator(delInfo.inRowRangeTombstones()));
 +        PeekingIterator<LegacyAtom> atoms = Iterators.peekingIterator(MergeIterator.get(iterators, legacyAtomComparator(metadata), reducer));
 +
 +        // Check if we have some static
 +        Row staticRow = atoms.hasNext() && atoms.peek().isStatic()
 +                      ? getNextRow(CellGrouper.staticGrouper(metadata, helper), atoms)
 +                      : Rows.EMPTY_STATIC_ROW;
 +
 +        Iterator<Row> rows = convertToRows(new CellGrouper(metadata, helper), atoms);
 +        Iterator<RangeTombstone> ranges = delInfo.deletionInfo.rangeIterator(reversed);
 +        return new RowAndDeletionMergeIterator(metadata,
 +                                               key,
 +                                               delInfo.deletionInfo.getPartitionDeletion(),
 +                                               ColumnFilter.all(metadata),
 +                                               staticRow,
 +                                               reversed,
 +                                               EncodingStats.NO_STATS,
 +                                               rows,
 +                                               ranges,
 +                                               true);
 +    }
 +
 +    public static Row extractStaticColumns(CFMetaData metadata, DataInputPlus in, Columns statics) throws IOException
 +    {
 +        assert !statics.isEmpty();
 +        assert metadata.isCompactTable();
 +
 +        if (metadata.isSuper())
 +            // TODO: there is in practice nothing to do here, but we need to handle the column_metadata for super columns somewhere else
 +            throw new UnsupportedOperationException();
 +
 +        Set<ByteBuffer> columnsToFetch = new HashSet<>(statics.size());
 +        for (ColumnDefinition column : statics)
 +            columnsToFetch.add(column.name.bytes);
 +
 +        Row.Builder builder = BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds());
 +        builder.newRow(Clustering.STATIC_CLUSTERING);
 +
 +        boolean foundOne = false;
 +        LegacyAtom atom;
 +        while ((atom = readLegacyAtomSkippingUnknownColumn(metadata,in)) != null)
 +        {
 +            if (atom.isCell())
 +            {
 +                LegacyCell cell = atom.asCell();
 +                if (!columnsToFetch.contains(cell.name.encode(metadata)))
 +                    continue;
 +
 +                foundOne = true;
 +                builder.addCell(new BufferCell(cell.name.column, cell.timestamp, cell.ttl, cell.localDeletionTime, cell.value, null));
 +            }
 +            else
 +            {
 +                LegacyRangeTombstone tombstone = atom.asRangeTombstone();
 +                // TODO: we need to track tombstones and potentially ignore cells that are
 +                // shadowed (or even better, replace them by tombstones).
 +                throw new UnsupportedOperationException();
 +            }
 +        }
 +
 +        return foundOne ? builder.build() : Rows.EMPTY_STATIC_ROW;
 +    }
 +
 +    private static LegacyAtom readLegacyAtomSkippingUnknownColumn(CFMetaData metadata, DataInputPlus in)
 +    throws IOException
 +    {
 +        while (true)
 +        {
 +            try
 +            {
 +                return readLegacyAtom(metadata, in, false);
 +            }
 +            catch (UnknownColumnException e)
 +            {
 +                // Simply skip, as the method name implies.
 +            }
 +        }
 +
 +    }
 +
 +    private static Row getNextRow(CellGrouper grouper, PeekingIterator<? extends LegacyAtom> cells)
 +    {
 +        if (!cells.hasNext())
 +            return null;
 +
 +        grouper.reset();
 +        while (cells.hasNext() && grouper.addAtom(cells.peek()))
 +        {
 +            // We've added the cell already in the grouper, so just skip it
 +            cells.next();
 +        }
 +        return grouper.getRow();
 +    }
 +
 +    @SuppressWarnings("unchecked")
 +    private static Iterator<LegacyAtom> asLegacyAtomIterator(Iterator<? extends LegacyAtom> iter)
 +    {
 +        return (Iterator<LegacyAtom>)iter;
 +    }
 +
 +    private static Iterator<Row> convertToRows(final CellGrouper grouper, final PeekingIterator<LegacyAtom> atoms)
 +    {
 +        return new AbstractIterator<Row>()
 +        {
 +            protected Row computeNext()
 +            {
 +                if (!atoms.hasNext())
 +                    return endOfData();
 +
 +                return getNextRow(grouper, atoms);
 +            }
 +        };
 +    }
 +
 +    public static Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> fromRowIterator(final RowIterator iterator)
 +    {
 +        return fromRowIterator(iterator.metadata(), iterator, iterator.staticRow());
 +    }
 +
 +    private static Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> fromRowIterator(final CFMetaData metadata, final Iterator<Row> iterator, final Row staticRow)
 +    {
 +        LegacyRangeTombstoneList deletions = new LegacyRangeTombstoneList(new LegacyBoundComparator(metadata.comparator), 10);
 +        Iterator<LegacyCell> cells = new AbstractIterator<LegacyCell>()
 +        {
 +            private Iterator<LegacyCell> currentRow = initializeRow();
 +
 +            private Iterator<LegacyCell> initializeRow()
 +            {
 +                if (staticRow == null || staticRow.isEmpty())
 +                    return Collections.<LegacyLayout.LegacyCell>emptyIterator();
 +
 +                Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> row = fromRow(metadata, staticRow);
 +                deletions.addAll(row.left);
 +                return row.right;
 +            }
 +
 +            protected LegacyCell computeNext()
 +            {
 +                while (true)
 +                {
 +                    if (currentRow.hasNext())
 +                        return currentRow.next();
 +
 +                    if (!iterator.hasNext())
 +                        return endOfData();
 +
 +                    Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> row = fromRow(metadata, iterator.next());
 +                    deletions.addAll(row.left);
 +                    currentRow = row.right;
 +                }
 +            }
 +        };
 +
 +        return Pair.create(deletions, cells);
 +    }
 +
 +    private static Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> fromRow(final CFMetaData metadata, final Row row)
 +    {
 +        // convert any complex deletions or row deletion into normal range tombstones so that we can build and send a proper RangeTombstoneList
 +        // to legacy nodes
 +        LegacyRangeTombstoneList deletions = new LegacyRangeTombstoneList(new LegacyBoundComparator(metadata.comparator), 10);
 +
 +        if (!row.deletion().isLive())
 +        {
 +            Clustering clustering = row.clustering();
 +            Slice.Bound startBound = Slice.Bound.inclusiveStartOf(clustering);
 +            Slice.Bound endBound = Slice.Bound.inclusiveEndOf(clustering);
 +
 +            LegacyBound start = new LegacyLayout.LegacyBound(startBound, false, null);
 +            LegacyBound end = new LegacyLayout.LegacyBound(endBound, false, null);
 +
 +            deletions.add(start, end, row.deletion().time().markedForDeleteAt(), row.deletion().time().localDeletionTime());
 +        }
 +
 +        for (ColumnData cd : row)
 +        {
 +            ColumnDefinition col = cd.column();
 +            if (col.isSimple())
 +                continue;
 +
 +            DeletionTime delTime = ((ComplexColumnData)cd).complexDeletion();
 +            if (!delTime.isLive())
 +            {
 +                Clustering clustering = row.clustering();
 +
 +                Slice.Bound startBound = Slice.Bound.inclusiveStartOf(clustering);
 +                Slice.Bound endBound = Slice.Bound.inclusiveEndOf(clustering);
 +
 +                LegacyLayout.LegacyBound start = new LegacyLayout.LegacyBound(startBound, col.isStatic(), col);
 +                LegacyLayout.LegacyBound end = new LegacyLayout.LegacyBound(endBound, col.isStatic(), col);
 +
 +                deletions.add(start, end, delTime.markedForDeleteAt(), delTime.localDeletionTime());
 +            }
 +        }
 +
 +        Iterator<LegacyCell> cells = new AbstractIterator<LegacyCell>()
 +        {
 +            private final Iterator<Cell> cells = row.cellsInLegacyOrder(metadata, false).iterator();
 +            // we don't have (and shouldn't have) row markers for compact tables.
 +            private boolean hasReturnedRowMarker = metadata.isCompactTable();
 +
 +            protected LegacyCell computeNext()
 +            {
 +                if (!hasReturnedRowMarker)
 +                {
 +                    hasReturnedRowMarker = true;
 +
 +                    // don't include a row marker if there's no timestamp on the primary key; this is the 3.0+ equivalent
 +                    // of a row marker
 +                    if (!row.primaryKeyLivenessInfo().isEmpty())
 +                    {
 +                        LegacyCellName cellName = new LegacyCellName(row.clustering(), null, null);
 +                        LivenessInfo info = row.primaryKeyLivenessInfo();
 +                        return new LegacyCell(info.isExpiring() ? LegacyCell.Kind.EXPIRING : LegacyCell.Kind.REGULAR, cellName, ByteBufferUtil.EMPTY_BYTE_BUFFER, info.timestamp(), info.localExpirationTime(), info.ttl());
 +                    }
 +                }
 +
 +                if (!cells.hasNext())
 +                    return endOfData();
 +
 +                return makeLegacyCell(row.clustering(), cells.next());
 +            }
 +        };
 +        return Pair.create(deletions, cells);
 +    }
 +
 +    private static LegacyCell makeLegacyCell(Clustering clustering, Cell cell)
 +    {
 +        LegacyCell.Kind kind;
 +        if (cell.isCounterCell())
 +            kind = LegacyCell.Kind.COUNTER;
 +        else if (cell.isTombstone())
 +            kind = LegacyCell.Kind.DELETED;
 +        else if (cell.isExpiring())
 +            kind = LegacyCell.Kind.EXPIRING;
 +        else
 +            kind = LegacyCell.Kind.REGULAR;
 +
 +        CellPath path = cell.path();
 +        assert path == null || path.size() == 1;
 +        LegacyCellName name = new LegacyCellName(clustering, cell.column(), path == null ? null : path.get(0));
 +        return new LegacyCell(kind, name, cell.value(), cell.timestamp(), cell.localDeletionTime(), cell.ttl());
 +    }
 +
 +    public static RowIterator toRowIterator(final CFMetaData metadata,
 +                                            final DecoratedKey key,
 +                                            final Iterator<LegacyCell> cells,
 +                                            final int nowInSec)
 +    {
 +        SerializationHelper helper = new SerializationHelper(metadata, 0, SerializationHelper.Flag.LOCAL);
 +        return UnfilteredRowIterators.filter(toUnfilteredRowIterator(metadata, key, LegacyDeletionInfo.live(), cells, false, helper), nowInSec);
 +    }
 +
 +    public static Comparator<LegacyCell> legacyCellComparator(CFMetaData metadata)
 +    {
 +        return legacyCellComparator(metadata, false);
 +    }
 +
 +    public static Comparator<LegacyCell> legacyCellComparator(final CFMetaData metadata, final boolean reversed)
 +    {
 +        final Comparator<LegacyCellName> cellNameComparator = legacyCellNameComparator(metadata, reversed);
 +        return new Comparator<LegacyCell>()
 +        {
 +            public int compare(LegacyCell cell1, LegacyCell cell2)
 +            {
 +                LegacyCellName c1 = cell1.name;
 +                LegacyCellName c2 = cell2.name;
 +
 +                int c = cellNameComparator.compare(c1, c2);
 +                if (c != 0)
 +                    return c;
 +
 +                // The actual sorting when the cellname is equal doesn't matter, we just want to make
 +                // sure the cells are not considered equal.
 +                if (cell1.timestamp != cell2.timestamp)
 +                    return cell1.timestamp < cell2.timestamp ? -1 : 1;
 +
 +                if (cell1.localDeletionTime != cell2.localDeletionTime)
 +                    return cell1.localDeletionTime < cell2.localDeletionTime ? -1 : 1;
 +
 +                return cell1.value.compareTo(cell2.value);
 +            }
 +        };
 +    }
 +
 +    // Note that this doesn't exactly compare cells as they were pre-3.0 because within a row they sort columns like
 +    // in 3.0, that is, with simple columns before complex columns. In other words, this comparator makes sure cells
 +    // are in the proper order to convert them to actual 3.0 rows.
 +    public static Comparator<LegacyCellName> legacyCellNameComparator(final CFMetaData metadata, final boolean reversed)
 +    {
 +        return new Comparator<LegacyCellName>()
 +        {
 +            public int compare(LegacyCellName c1, LegacyCellName c2)
 +            {
 +                // Compare clustering first
 +                if (c1.clustering == Clustering.STATIC_CLUSTERING)
 +                {
 +                    if (c2.clustering != Clustering.STATIC_CLUSTERING)
 +                        return -1;
 +                }
 +                else if (c2.clustering == Clustering.STATIC_CLUSTERING)
 +                {
 +                    return 1;
 +                }
 +                else
 +                {
 +                    int c = metadata.comparator.compare(c1.clustering, c2.clustering);
 +                    if (c != 0)
 +                        return reversed ? -c : c;
 +                }
 +
 +                // Note that when reversed, we only care about the clustering being reversed, so it's ok
 +                // not to take reversed into account below.
 +
 +                // Then check the column name
 +                if (c1.column != c2.column)
 +                {
 +                    // A null for the column means it's a row marker
 +                    if (c1.column == null)
 +                        return -1;
 +                    if (c2.column == null)
 +                        return 1;
 +
 +                    assert c1.column.isRegular() || c1.column.isStatic();
 +                    assert c2.column.isRegular() || c2.column.isStatic();
 +                    int cmp = c1.column.compareTo(c2.column);
 +                    if (cmp != 0)
 +                        return cmp;
 +                }
 +
 +                assert (c1.collectionElement == null) == (c2.collectionElement == null);
 +
 +                if (c1.collectionElement != null)
 +                {
 +                    AbstractType<?> colCmp = ((CollectionType)c1.column.type).nameComparator();
 +                    return colCmp.compare(c1.collectionElement, c2.collectionElement);
 +                }
 +                return 0;
 +            }
 +        };
 +    }
 +
 +    private static boolean equalValues(ClusteringPrefix c1, ClusteringPrefix c2, ClusteringComparator comparator)
 +    {
 +        assert c1.size() == c2.size();
 +        for (int i = 0; i < c1.size(); i++)
 +        {
 +            if (comparator.compareComponent(i, c1.get(i), c2.get(i)) != 0)
 +                return false;
 +        }
 +        return true;
 +    }
 +
 +    private static Comparator<LegacyAtom> legacyAtomComparator(CFMetaData metadata)
 +    {
 +        return (o1, o2) ->
 +        {
 +            // First we want to compare by clustering, but we have to be careful with range tombstone, because
 +            // we can have collection deletion and we want those to sort properly just before the column they
 +            // delete, not before the whole row.
 +            // We also want to special case static so they sort before any non-static. Note in particular that
 +            // this special casing is important in the case of one of the Atom being Slice.Bound.BOTTOM: we want
 +            // it to sort after the static as we deal with static first in toUnfilteredAtomIterator and having
 +            // Slice.Bound.BOTTOM first would mess that up (note that static deletion is handled through a specific
 +            // static tombstone, see LegacyDeletionInfo.add()).
 +            if (o1.isStatic() != o2.isStatic())
 +                return o1.isStatic() ? -1 : 1;
 +
 +            ClusteringPrefix c1 = o1.clustering();
 +            ClusteringPrefix c2 = o2.clustering();
 +
 +            int clusteringComparison;
 +            if (c1.size() != c2.size() || (o1.isCell() == o2.isCell()) || !equalValues(c1, c2, metadata.comparator))
 +            {
 +                clusteringComparison = metadata.comparator.compare(c1, c2);
 +            }
 +            else
 +            {
 +                // one is a cell and one is a range tombstone, and both have the same prefix size (that is, the
 +                // range tombstone is either a row deletion or a collection deletion).
 +                LegacyRangeTombstone rt = o1.isCell() ? o2.asRangeTombstone() : o1.asRangeTombstone();
 +                clusteringComparison = rt.isCollectionTombstone()
 +                                       ? 0
 +                                       : metadata.comparator.compare(c1, c2);
 +            }
 +
 +            // Note that if both are range tombstones and have the same clustering, then they are equal.
 +            if (clusteringComparison != 0)
 +                return clusteringComparison;
 +
 +            if (o1.isCell())
 +            {
 +                LegacyCell cell1 = o1.asCell();
 +                if (o2.isCell())
 +                {
 +                    LegacyCell cell2 = o2.asCell();
 +                    // Check for row marker cells
 +                    if (cell1.name.column == null)
 +                        return cell2.name.column == null ? 0 : -1;
 +                    return cell2.name.column == null ? 1 : cell1.name.column.compareTo(cell2.name.column);
 +                }
 +
 +                LegacyRangeTombstone rt2 = o2.asRangeTombstone();
 +                assert rt2.isCollectionTombstone(); // otherwise, we shouldn't have got a clustering equality
 +                if (cell1.name.column == null)
 +                    return -1;
 +                int cmp = cell1.name.column.compareTo(rt2.start.collectionName);
 +                // If both are for the same column, then the RT should come first
 +                return cmp == 0 ? 1 : cmp;
 +            }
 +            else
 +            {
 +                assert o2.isCell();
 +                LegacyCell cell2 = o2.asCell();
 +
 +                LegacyRangeTombstone rt1 = o1.asRangeTombstone();
 +                assert rt1.isCollectionTombstone(); // otherwise, we shouldn't have got a clustering equality
 +
 +                if (cell2.name.column == null)
 +                    return 1;
 +
 +                int cmp = rt1.start.collectionName.compareTo(cell2.name.column);
 +                // If both are for the same column, then the RT should come first
 +                return cmp == 0 ? -1 : cmp;
 +            }
 +        };
 +    }
 +
 +    public static LegacyAtom readLegacyAtom(CFMetaData metadata, DataInputPlus in, boolean readAllAsDynamic)
 +    throws IOException, UnknownColumnException
 +    {
 +        ByteBuffer cellname = ByteBufferUtil.readWithShortLength(in);
 +        if (!cellname.hasRemaining())
 +            return null; // END_OF_ROW
 +
 +        try
 +        {
 +            int b = in.readUnsignedByte();
 +            return (b & RANGE_TOMBSTONE_MASK) != 0
 +                   ? readLegacyRangeTombstoneBody(metadata, in, cellname)
 +                   : readLegacyCellBody(metadata, in, cellname, b, SerializationHelper.Flag.LOCAL, readAllAsDynamic);
 +        }
 +        catch (UnknownColumnException e)
 +        {
 +            // We legitimately can get here in 2 cases:
 +            // 1) for system tables, because we've unceremoniously removed columns (without registering them as dropped)
 +            // 2) for dropped columns.
 +            // In any other case, there is a mismatch between the schema and the data, and we complain loudly in
 +            // that case. Note that if we are in a legit case of an unknown column, we want to simply skip that cell,
 +            // but we don't do this here and re-throw the exception because the calling code sometimes has to know
 +            // about this happening. This does mean code calling this method should handle this case properly.
 +            if (!metadata.ksName.equals(SystemKeyspace.NAME) && metadata.getDroppedColumnDefinition(e.columnName) == null)
 +                throw new IllegalStateException(String.format("Got cell for unknown column %s in sstable of %s.%s: " +
 +                                                              "This suggest a problem with the schema which doesn't list " +
 +                                                              "this column. Even if that column was dropped, it should have " +
 +                                                              "been listed as such", metadata.ksName, metadata.cfName, UTF8Type.instance.compose(e.columnName)), e);
 +
 +            throw e;
 +        }
 +    }
 +
 +    public static LegacyCell readLegacyCell(CFMetaData metadata, DataInput in, SerializationHelper.Flag flag) throws IOException, UnknownColumnException
 +    {
 +        ByteBuffer cellname = ByteBufferUtil.readWithShortLength(in);
 +        int b = in.readUnsignedByte();
 +        return readLegacyCellBody(metadata, in, cellname, b, flag, false);
 +    }
 +
 +    public static LegacyCell readLegacyCellBody(CFMetaData metadata, DataInput in, ByteBuffer cellname, int mask, SerializationHelper.Flag flag, boolean readAllAsDynamic)
 +    throws IOException, UnknownColumnException
 +    {
 +        // Note that we want to call decodeCellName only after we've deserialized other parts, since it can throw
 +        // and we want to throw only after having deserialized the full cell.
 +        if ((mask & COUNTER_MASK) != 0)
 +        {
 +            in.readLong(); // timestampOfLastDelete: this has been unused for a long time so we ignore it
 +            long ts = in.readLong();
 +            ByteBuffer value = ByteBufferUtil.readWithLength(in);
 +            if (flag == SerializationHelper.Flag.FROM_REMOTE || (flag == SerializationHelper.Flag.LOCAL && CounterContext.instance().shouldClearLocal(value)))
 +                value = CounterContext.instance().clearAllLocal(value);
 +            return new LegacyCell(LegacyCell.Kind.COUNTER, decodeCellName(metadata, cellname, readAllAsDynamic), value, ts, Cell.NO_DELETION_TIME, Cell.NO_TTL);
 +        }
 +        else if ((mask & EXPIRATION_MASK) != 0)
 +        {
 +            int ttl = in.readInt();
 +            int expiration = in.readInt();
 +            long ts = in.readLong();
 +            ByteBuffer value = ByteBufferUtil.readWithLength(in);
 +            return new LegacyCell(LegacyCell.Kind.EXPIRING, decodeCellName(metadata, cellname, readAllAsDynamic), value, ts, expiration, ttl);
 +        }
 +        else
 +        {
 +            long ts = in.readLong();
 +            ByteBuffer value = ByteBufferUtil.readWithLength(in);
 +            LegacyCellName name = decodeCellName(metadata, cellname, readAllAsDynamic);
 +            return (mask & COUNTER_UPDATE_MASK) != 0
 +                ? new LegacyCell(LegacyCell.Kind.COUNTER, name, CounterContext.instance().createUpdate(ByteBufferUtil.toLong(value)), ts, Cell.NO_DELETION_TIME, Cell.NO_TTL)
 +                : ((mask & DELETION_MASK) == 0
 +                        ? new LegacyCell(LegacyCell.Kind.REGULAR, name, value, ts, Cell.NO_DELETION_TIME, Cell.NO_TTL)
 +                        : new LegacyCell(LegacyCell.Kind.DELETED, name, ByteBufferUtil.EMPTY_BYTE_BUFFER, ts, ByteBufferUtil.toInt(value), Cell.NO_TTL));
 +        }
 +    }
 +
 +    public static LegacyRangeTombstone readLegacyRangeTombstoneBody(CFMetaData metadata, DataInputPlus in, ByteBuffer boundname) throws IOException
 +    {
 +        LegacyBound min = decodeBound(metadata, boundname, true);
 +        LegacyBound max = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), false);
 +        DeletionTime dt = DeletionTime.serializer.deserialize(in);
 +        return new LegacyRangeTombstone(min, max, dt);
 +    }
 +
 +    public static Iterator<LegacyCell> deserializeCells(final CFMetaData metadata,
 +                                                        final DataInput in,
 +                                                        final SerializationHelper.Flag flag,
 +                                                        final int size)
 +    {
 +        return new AbstractIterator<LegacyCell>()
 +        {
 +            private int i = 0;
 +
 +            protected LegacyCell computeNext()
 +            {
 +                if (i >= size)
 +                    return endOfData();
 +
 +                ++i;
 +                try
 +                {
 +                    return readLegacyCell(metadata, in, flag);
 +                }
 +                catch (UnknownColumnException e)
 +                {
 +                    // We can get there if we read a cell for a dropped column, and if that is the case,
 +                    // then simply ignore the cell is fine. But also not that we ignore if it's the
 +                    // system keyspace because for those table we actually remove columns without registering
 +                    // them in the dropped columns
 +                    if (metadata.ksName.equals(SystemKeyspace.NAME) || metadata.getDroppedColumnDefinition(e.columnName) != null)
 +                        return computeNext();
 +                    else
 +                        throw new IOError(e);
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new IOError(e);
 +                }
 +            }
 +        };
 +    }
 +
 +    public static class CellGrouper
 +    {
 +        /**
 +         * The fake TTL used for expired rows that have been compacted.
 +         */
 +        private static final int FAKE_TTL = 1;
 +
 +        public final CFMetaData metadata;
 +        private final boolean isStatic;
 +        private final SerializationHelper helper;
 +        private final Row.Builder builder;
 +        private Clustering clustering;
 +
 +        private LegacyRangeTombstone rowDeletion;
 +        private LegacyRangeTombstone collectionDeletion;
 +
 +        public CellGrouper(CFMetaData metadata, SerializationHelper helper)
 +        {
 +            this(metadata, helper, false);
 +        }
 +
 +        private CellGrouper(CFMetaData metadata, SerializationHelper helper, boolean isStatic)
 +        {
 +            this.metadata = metadata;
 +            this.isStatic = isStatic;
 +            this.helper = helper;
 +            // We cannot use a sorted builder because we don't have exactly the same ordering in 3.0 and pre-3.0. More precisely, within a row, we
 +            // store all simple columns before the complex ones in 3.0, which we use to sort everything sorted by the column name before. Note however
 +            // that the unsorted builder won't have to reconcile cells, so the exact value we pass for nowInSec doesn't matter.
 +            this.builder = BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds());
 +        }
 +
 +        public static CellGrouper staticGrouper(CFMetaData metadata, SerializationHelper helper)
 +        {
 +            return new CellGrouper(metadata, helper, true);
 +        }
 +
 +        public void reset()
 +        {
 +            this.clustering = null;
 +            this.rowDeletion = null;
 +            this.collectionDeletion = null;
 +        }
 +
 +        public boolean addAtom(LegacyAtom atom)
 +        {
 +            return atom.isCell()
 +                 ? addCell(atom.asCell())
 +                 : addRangeTombstone(atom.asRangeTombstone());
 +        }
 +
 +        public boolean addCell(LegacyCell cell)
 +        {
 +            if (clustering == null)
 +            {
 +                clustering = cell.name.clustering;
 +                assert !isStatic || clustering == Clustering.STATIC_CLUSTERING;
 +                builder.newRow(clustering);
 +            }
 +            else if (!clustering.equals(cell.name.clustering))
 +            {
 +                return false;
 +            }
 +
 +            // Ignore shadowed cells
 +            if (rowDeletion != null && rowDeletion.deletionTime.deletes(cell.timestamp))
 +                return true;
 +
 +            ColumnDefinition column = cell.name.column;
 +            if (column == null)
 +            {
 +                // It's the row marker
 +                assert !cell.value.hasRemaining();
 +                // In 2.1, the row marker expired cell might have been converted into a deleted one by compaction.
 +                // If we do not set the primary key liveness info for this row and it does not contains any regular columns
 +                // the row will be empty. To avoid that, we reuse the localDeletionTime but use a fake TTL.
 +                // The only time in 2.x that we actually delete a row marker is in 2i tables, so in that case we do
 +                // want to actually propagate the row deletion. (CASSANDRA-13320)
 +                if (!cell.isTombstone())
 +                    builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(cell.timestamp, cell.ttl, cell.localDeletionTime));
 +                else if (metadata.isIndex())
 +                    builder.addRowDeletion(Row.Deletion.regular(new DeletionTime(cell.timestamp, cell.localDeletionTime)));
 +                else
 +                    builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(cell.timestamp, FAKE_TTL, cell.localDeletionTime));
 +            }
 +            else
 +            {
 +                if (collectionDeletion != null && collectionDeletion.start.collectionName.name.equals(column.name) && collectionDeletion.deletionTime.deletes(cell.timestamp))
 +                    return true;
 +
 +                if (helper.includes(column))
 +                {
 +                    CellPath path = null;
 +                    if (column.isComplex())
 +                    {
 +                        // Recalling startOfComplexColumn for every cell is a big inefficient, but it's ok in practice
 +                        // and it's simpler. And since 1) this only matter for super column selection in thrift in
 +                        // practice and 2) is only used during upgrade, it's probably worth keeping things simple.
 +                        helper.startOfComplexColumn(column);
 +                        path = cell.name.collectionElement == null ? null : CellPath.create(cell.name.collectionElement);
 +                        if (!helper.includes(path))
 +                            return true;
 +                    }
 +                    Cell c = new BufferCell(column, cell.timestamp, cell.ttl, cell.localDeletionTime, cell.value, path);
 +                    if (!helper.isDropped(c, column.isComplex()))
 +                        builder.addCell(c);
 +                    if (column.isComplex())
 +                    {
 +                        helper.endOfComplexColumn();
 +                    }
 +                }
 +            }
 +            return true;
 +        }
 +
 +        private boolean addRangeTombstone(LegacyRangeTombstone tombstone)
 +        {
 +            if (tombstone.isRowDeletion(metadata))
 +                return addRowTombstone(tombstone);
 +            else if (tombstone.isCollectionTombstone())
 +                return addCollectionTombstone(tombstone);
 +            else
 +                return addGenericRangeTombstone(tombstone);
 +        }
 +
 +        private boolean addRowTombstone(LegacyRangeTombstone tombstone)
 +        {
 +            if (clustering != null)
 +            {
 +                // If we're already in the row, there might be a chance that there were two range tombstones
 +                // written, as 2.x storage format does not guarantee just one range tombstone, unlike 3.x.
 +                // We have to make sure that clustering matches, which would mean that tombstone is for the
 +                // same row.
 +                if (rowDeletion != null && clustering.equals(tombstone.start.getAsClustering(metadata)))
 +                {
 +                    // If the tombstone superceeds the previous delete, we discard the previous one
 +                    if (tombstone.deletionTime.supersedes(rowDeletion.deletionTime))
 +                    {
 +                        builder.addRowDeletion(Row.Deletion.regular(tombstone.deletionTime));
 +                        rowDeletion = tombstone;
 +                    }
 +                    return true;
 +                }
 +
 +                // If we're already within a row and there was no delete written before that one, it can't be the same one
 +                return false;
 +            }
 +
 +            clustering = tombstone.start.getAsClustering(metadata);
 +            builder.newRow(clustering);
 +            builder.addRowDeletion(Row.Deletion.regular(tombstone.deletionTime));
 +            rowDeletion = tombstone;
 +
 +            return true;
 +        }
 +
 +        private boolean addCollectionTombstone(LegacyRangeTombstone tombstone)
 +        {
 +            if (!helper.includes(tombstone.start.collectionName))
 +                return false; // see CASSANDRA-13109
 +
 +            if (clustering == null)
 +            {
 +                clustering = tombstone.start.getAsClustering(metadata);
 +                builder.newRow(clustering);
 +            }
 +            else if (!clustering.equals(tombstone.start.getAsClustering(metadata)))
 +            {
 +                return false;
 +            }
 +
 +            builder.addComplexDeletion(tombstone.start.collectionName, tombstone.deletionTime);
 +            if (rowDeletion == null || tombstone.deletionTime.supersedes(rowDeletion.deletionTime))
 +                collectionDeletion = tombstone;
 +
 +            return true;
 +        }
 +
 +        private boolean addGenericRangeTombstone(LegacyRangeTombstone tombstone)
 +        {
 +            /*
 +             * We can see a non-collection, non-row deletion in two scenarios:
 +             *
 +             * 1. Most commonly, the tombstone's start bound is bigger than current row's clustering, which means that
 +             *    the current row is over, and we should move on to the next row or RT;
 +             *
 +             * 2. Less commonly, the tombstone's start bound is smaller than current row's clustering, which means that
 +             *    we've crossed an index boundary and are seeing a non-closed RT from the previous block, repeated;
 +             *    we should ignore it and stay in the current row.
 +             *
 +             *  In either case, clustering should be non-null, or we shouldn't have gotten to this method at all
 +             *  However, to be absolutely SURE we're in case two above, we check here.
 +             */
 +            return clustering != null && metadata.comparator.compare(clustering, tombstone.start.bound.clustering()) > 0;
 +        }
 +
 +        public Row getRow()
 +        {
 +            return builder.build();
 +        }
 +    }
 +
 +    public static class LegacyUnfilteredPartition
 +    {
 +        public final DeletionTime partitionDeletion;
 +        public final LegacyRangeTombstoneList rangeTombstones;
 +        public final List<LegacyCell> cells;
 +
 +        private LegacyUnfilteredPartition(DeletionTime partitionDeletion, LegacyRangeTombstoneList rangeTombstones, List<LegacyCell> cells)
 +        {
 +            this.partitionDeletion = partitionDeletion;
 +            this.rangeTombstones = rangeTombstones;
 +            this.cells = cells;
 +        }
 +
 +        public void digest(CFMetaData metadata, MessageDigest digest)
 +        {
 +            for (LegacyCell cell : cells)
 +            {
 +                digest.update(cell.name.encode(metadata).duplicate());
 +
 +                if (cell.isCounter())
 +                    CounterContext.instance().updateDigest(digest, cell.value);
 +                else
 +                    digest.update(cell.value.duplicate());
 +
 +                FBUtilities.updateWithLong(digest, cell.timestamp);
 +                FBUtilities.updateWithByte(digest, cell.serializationFlags());
 +
 +                if (cell.isExpiring())
 +                    FBUtilities.updateWithInt(digest, cell.ttl);
 +
 +                if (cell.isCounter())
 +                {
 +                    // Counters used to have the timestampOfLastDelete field, which we stopped using long ago and has been hard-coded
 +                    // to Long.MIN_VALUE but was still taken into account in 2.2 counter digests (to maintain backward compatibility
 +                    // in the first place).
 +                    FBUtilities.updateWithLong(digest, Long.MIN_VALUE);
 +                }
 +            }
 +
 +            if (partitionDeletion.markedForDeleteAt() != Long.MIN_VALUE)
 +                digest.update(ByteBufferUtil.bytes(partitionDeletion.markedForDeleteAt()));
 +
 +            if (!rangeTombstones.isEmpty())
 +                rangeTombstones.updateDigest(digest);
 +        }
 +    }
 +
 +    public static class LegacyCellName
 +    {
 +        public final Clustering clustering;
 +        public final ColumnDefinition column;
 +        public final ByteBuffer collectionElement;
 +
 +        private LegacyCellName(Clustering clustering, ColumnDefinition column, ByteBuffer collectionElement)
 +        {
 +            this.clustering = clustering;
 +            this.column = column;
 +            this.collectionElement = collectionElement;
 +        }
 +
 +        public ByteBuffer encode(CFMetaData metadata)
 +        {
 +            return encodeCellName(metadata, clustering, column == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : column.name.bytes, collectionElement);
 +        }
 +
 +        public ByteBuffer superColumnSubName()
 +        {
 +            assert collectionElement != null;
 +            return collectionElement;
 +        }
 +
 +        public ByteBuffer superColumnName()
 +        {
 +            return clustering.get(0);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            StringBuilder sb = new StringBuilder();
 +            for (int i = 0; i < clustering.size(); i++)
 +                sb.append(i > 0 ? ":" : "").append(clustering.get(i) == null ? "null" : ByteBufferUtil.bytesToHex(clustering.get(i)));
 +            return String.format("Cellname(clustering=%s, column=%s, collElt=%s)", sb.toString(), column == null ? "null" : column.name, collectionElement == null ? "null" : ByteBufferUtil.bytesToHex(collectionElement));
 +        }
 +    }
 +
 +    public static class LegacyBound
 +    {
 +        public static final LegacyBound BOTTOM = new LegacyBound(Slice.Bound.BOTTOM, false, null);
 +        public static final LegacyBound TOP = new LegacyBound(Slice.Bound.TOP, false, null);
 +
 +        public final Slice.Bound bound;
 +        public final boolean isStatic;
 +        public final ColumnDefinition collectionName;
 +
 +        public LegacyBound(Slice.Bound bound, boolean isStatic, ColumnDefinition collectionName)
 +        {
 +            this.bound = bound;
 +            this.isStatic = isStatic;
 +            this.collectionName = collectionName;
 +        }
 +
 +        public Clustering getAsClustering(CFMetaData metadata)
 +        {
 +            if (isStatic)
 +                return Clustering.STATIC_CLUSTERING;
 +
 +            assert bound.size() == metadata.comparator.size();
 +            ByteBuffer[] values = new ByteBuffer[bound.size()];
 +            for (int i = 0; i < bound.size(); i++)
 +                values[i] = bound.get(i);
 +            return new Clustering(values);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            StringBuilder sb = new StringBuilder();
 +            sb.append(bound.kind()).append('(');
 +            for (int i = 0; i < bound.size(); i++)
 +                sb.append(i > 0 ? ":" : "").append(bound.get(i) == null ? "null" : ByteBufferUtil.bytesToHex(bound.get(i)));
 +            sb.append(')');
 +            return String.format("Bound(%s, collection=%s)", sb.toString(), collectionName == null ? "null" : collectionName.name);
 +        }
 +    }
 +
 +    public interface LegacyAtom
 +    {
 +        public boolean isCell();
 +
 +        public ClusteringPrefix clustering();
 +        public boolean isStatic();
 +
 +        public LegacyCell asCell();
 +        public LegacyRangeTombstone asRangeTombstone();
 +    }
 +
 +    /**
 +     * A legacy cell.
 +     * <p>
 +     * This is used as a temporary object to facilitate dealing with the legacy format, this
 +     * is not meant to be optimal.
 +     */
 +    public static class LegacyCell implements LegacyAtom
 +    {
 +        private final static int DELETION_MASK        = 0x01;
 +        private final static int EXPIRATION_MASK      = 0x02;
 +        private final static int COUNTER_MASK         = 0x04;
 +        private final static int COUNTER_UPDATE_MASK  = 0x08;
 +        private final static int RANGE_TOMBSTONE_MASK = 0x10;
 +
 +        public enum Kind { REGULAR, EXPIRING, DELETED, COUNTER }
 +
 +        public final Kind kind;
 +
 +        public final LegacyCellName name;
 +        public final ByteBuffer value;
 +
 +        public final long timestamp;
 +        public final int localDeletionTime;
 +        public final int ttl;
 +
 +        private LegacyCell(Kind kind, LegacyCellName name, ByteBuffer value, long timestamp, int localDeletionTime, int ttl)
 +        {
 +            this.kind = kind;
 +            this.name = name;
 +            this.value = value;
 +            this.timestamp = timestamp;
 +            this.localDeletionTime = localDeletionTime;
 +            this.ttl = ttl;
 +        }
 +
 +        public static LegacyCell regular(CFMetaData metadata, ByteBuffer superColumnName, ByteBuffer name, ByteBuffer value, long timestamp)
 +        throws UnknownColumnException
 +        {
 +            return new LegacyCell(Kind.REGULAR, decodeCellName(metadata, superColumnName, name), value, timestamp, Cell.NO_DELETION_TIME, Cell.NO_TTL);
 +        }
 +
 +        public static LegacyCell expiring(CFMetaData metadata, ByteBuffer superColumnName, ByteBuffer name, ByteBuffer value, long timestamp, int ttl, int nowInSec)
 +        throws UnknownColumnException
 +        {
-             return new LegacyCell(Kind.EXPIRING, decodeCellName(metadata, superColumnName, name), value, timestamp, nowInSec + ttl, ttl);
++            /*
++             * CASSANDRA-14092: Max expiration date capping is maybe performed here, expiration overflow policy application
++             * is done at {@link org.apache.cassandra.thrift.ThriftValidation#validateTtl(CFMetaData, Column)}
++             */
++            return new LegacyCell(Kind.EXPIRING, decodeCellName(metadata, superColumnName, name), value, timestamp, ExpirationDateOverflowHandling.computeLocalExpirationTime(nowInSec, ttl), ttl);
 +        }
 +
 +        public static LegacyCell tombstone(CFMetaData metadata, ByteBuffer superColumnName, ByteBuffer name, long timestamp, int nowInSec)
 +        throws UnknownColumnException
 +        {
 +            return new LegacyCell(Kind.DELETED, decodeCellName(metadata, superColumnName, name), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp, nowInSec, LivenessInfo.NO_TTL);
 +        }
 +
 +        public static LegacyCell counterUpdate(CFMetaData metadata, ByteBuffer superColumnName, ByteBuffer name, long value)
 +        throws UnknownColumnException
 +        {
 +            // See UpdateParameters.addCounter() for more details on this
 +            ByteBuffer counterValue = CounterContext.instance().createUpdate(value);
 +            return counter(decodeCellName(metadata, superColumnName, name), counterValue);
 +        }
 +
 +        public static LegacyCell counter(LegacyCellName name, ByteBuffer value)
 +        {
 +            return new LegacyCell(Kind.COUNTER, name, value, FBUtilities.timestampMicros(), Cell.NO_DELETION_TIME, Cell.NO_TTL);
 +        }
 +
 +        public byte serializationFlags()
 +        {
 +            if (isExpiring())
 +                return EXPIRATION_MASK;
 +            if (isTombstone())
 +                return DELETION_MASK;
 +            if (isCounterUpdate())
 +                return COUNTER_UPDATE_MASK;
 +            if (isCounter())
 +                return COUNTER_MASK;
 +            return 0;
 +        }
 +
 +        public boolean isCounterUpdate()
 +        {
 +            // See UpdateParameters.addCounter() for more details on this
 +            return isCounter() && CounterContext.instance().isUpdate(value);
 +        }
 +
 +        public ClusteringPrefix clustering()
 +        {
 +            return name.clustering;
 +        }
 +
 +        public boolean isStatic()
 +        {
 +            return name.clustering == Clustering.STATIC_CLUSTERING;
 +        }
 +
 +        public boolean isCell()
 +        {
 +            return true;
 +        }
 +
 +        public LegacyCell asCell()
 +        {
 +            return this;
 +        }
 +
 +        public LegacyRangeTombstone asRangeTombstone()
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        public boolean isCounter()
 +        {
 +            return kind == Kind.COUNTER;
 +        }
 +
 +        public boolean isExpiring()
 +        {
 +            return kind == Kind.EXPIRING;
 +        }
 +
 +        public boolean isTombstone()
 +        {
 +            return kind == Kind.DELETED;
 +        }
 +
 +        public boolean isLive(int nowInSec)
 +        {
 +            if (isTombstone())
 +                return false;
 +
 +            return !isExpiring() || nowInSec < localDeletionTime;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("LegacyCell(%s, name=%s, v=%s, ts=%s, ldt=%s, ttl=%s)", kind, name, ByteBufferUtil.bytesToHex(value), timestamp, localDeletionTime, ttl);
 +        }
 +    }
 +
 +    /**
 +     * A legacy range tombstone.
 +     * <p>
 +     * This is used as a temporary object to facilitate dealing with the legacy format, this
 +     * is not meant to be optimal.
 +     */
 +    public static class LegacyRangeTombstone implements LegacyAtom
 +    {
 +        public final LegacyBound start;
 +        public final LegacyBound stop;
 +        public final DeletionTime deletionTime;
 +
 +        public LegacyRangeTombstone(LegacyBound start, LegacyBound stop, DeletionTime deletionTime)
 +        {
 +            // Because of the way RangeTombstoneList work, we can have a tombstone where only one of
 +            // the bound has a collectionName. That happens if we have a big tombstone A (spanning one
 +            // or multiple rows) and a collection tombstone B. In that case, RangeTombstoneList will
 +            // split this into 3 RTs: the first one from the beginning of A to the beginning of B,
 +            // then B, then a third one from the end of B to the end of A. To make this simpler, if
 +            // we detect that case we transform the 1st and 3rd tombstone so they don't end in the middle
 +            // of a row (which is still correct).
 +            if ((start.collectionName == null) != (stop.collectionName == null))
 +            {
 +                if (start.collectionName == null)
 +                    stop = new LegacyBound(stop.bound, stop.isStatic, null);
 +                else
 +                    start = new LegacyBound(start.bound, start.isStatic, null);
 +            }
 +            else if (!Objects.equals(start.collectionName, stop.collectionName))
 +            {
 +                // We're in the similar but slightly more complex case where on top of the big tombstone
 +                // A, we have 2 (or more) collection tombstones B and C within A. So we also end up with
 +                // a tombstone that goes between the end of B and the start of C.
 +                start = new LegacyBound(start.bound, start.isStatic, null);
 +                stop = new LegacyBound(stop.bound, stop.isStatic, null);
 +            }
 +
 +            this.start = start;
 +            this.stop = stop;
 +            this.deletionTime = deletionTime;
 +        }
 +
 +        public ClusteringPrefix clustering()
 +        {
 +            return start.bound;
 +        }
 +
 +        public LegacyRangeTombstone withNewStart(LegacyBound newStart)
 +        {
 +            return new LegacyRangeTombstone(newStart, stop, deletionTime);
 +        }
 +
 +        public LegacyRangeTombstone withNewEnd(LegacyBound newStop)
 +        {
 +            return new LegacyRangeTombstone(start, newStop, deletionTime);
 +        }
 +
 +        public boolean isCell()
 +        {
 +            return false;
 +        }
 +
 +        public boolean isStatic()
 +        {
 +            return start.isStatic || stop.isStatic;
 +        }
 +
 +        public LegacyCell asCell()
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        public LegacyRangeTombstone asRangeTombstone()
 +        {
 +            return this;
 +        }
 +
 +        public boolean isCollectionTombstone()
 +        {
 +            return start.collectionName != null;
 +        }
 +
 +        public boolean isRowDeletion(CFMetaData metadata)
 +        {
 +            if (start.collectionName != null
 +                || stop.collectionName != null
 +                || start.bound.size() != metadata.comparator.size()
 +                || stop.bound.size() != metadata.comparator.size())
 +                return false;
 +
 +            for (int i = 0; i < start.bound.size(); i++)
 +                if (!Objects.equals(start.bound.get(i), stop.bound.get(i)))
 +                    return false;
 +            return true;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("RT(%s-%s, %s)", start, stop, deletionTime);
 +        }
 +    }
 +
 +    public static class LegacyDeletionInfo
 +    {
 +        public final MutableDeletionInfo deletionInfo;
 +        public final List<LegacyRangeTombstone> inRowTombstones = new ArrayList<>();
 +
 +        private LegacyDeletionInfo(MutableDeletionInfo deletionInfo)
 +        {
 +            this.deletionInfo = deletionInfo;
 +        }
 +
 +        public static LegacyDeletionInfo live()
 +        {
 +            return new LegacyDeletionInfo(MutableDeletionInfo.live());
 +        }
 +
 +        public void add(DeletionTime topLevel)
 +        {
 +            deletionInfo.add(topLevel);
 +        }
 +
 +        private static Slice.Bound staticBound(CFMetaData metadata, boolean isStart)
 +        {
 +            // In pre-3.0 nodes, static row started by a clustering with all empty values so we
 +            // preserve that here. Note that in practice, it doesn't really matter since the rest
 +            // of the code will ignore the bound for RT that have their static flag set.
 +            ByteBuffer[] values = new ByteBuffer[metadata.comparator.size()];
 +            for (int i = 0; i < values.length; i++)
 +                values[i] = ByteBufferUtil.EMPTY_BYTE_BUFFER;
 +            return isStart
 +                 ? Slice.Bound.inclusiveStartOf(values)
 +                 : Slice.Bound.inclusiveEndOf(values);
 +        }
 +
 +        public void add(CFMetaData metadata, LegacyRangeTombstone tombstone)
 +        {
 +            if (metadata.hasStaticColumns())
 +            {
 +                /*
 +                 * For table having static columns we have to deal with the following cases:
 +                 *  1. the end of the tombstone is static (in which case either the start is static or is BOTTOM, which is the same
 +                 *     for our consideration). This mean that either the range only delete the static row, or that it's a collection
 +                 *     tombstone of a static collection. In both case, we just add the tombstone to the inRowTombstones.
 +                 *  2. only the start is static. There is then 2 subcase: either the start is inclusive, and that mean we include the
 +                 *     static row and more (so we add an inRowTombstone for the static and deal with the rest normally). Or the start
 +                 *     is exclusive, and that means we explicitely exclude the static (in which case we can just add the tombstone
 +                 *     as if it started at BOTTOM).
 +                 *  3. none of the bound are static but the start is BOTTOM. This means we intended to delete the static row so we
 +                 *     need to add it to the inRowTombstones (and otherwise handle the range normally).
 +                 */
 +                if (tombstone.stop.isStatic)
 +                {
 +                    // If the start is BOTTOM, we replace it by the beginning of the starting row so as to not confuse the
 +                    // RangeTombstone.isRowDeletion() method
 +                    if (tombstone.start == LegacyBound.BOTTOM)
 +                        tombstone = tombstone.withNewStart(new LegacyBound(staticBound(metadata, true), true, null));
 +                    inRowTombstones.add(tombstone);
 +                    return;
 +                }
 +
 +                if (tombstone.start.isStatic)
 +                {
 +                    if (tombstone.start.bound.isInclusive())
 +                        inRowTombstones.add(tombstone.withNewEnd(new LegacyBound(staticBound(metadata, false), true, null)));
 +
 +                    tombstone = tombstone.withNewStart(LegacyBound.BOTTOM);
 +                }
 +                else if (tombstone.start == LegacyBound.BOTTOM)
 +                {
 +                    inRowTombstones.add(new LegacyRangeTombstone(new LegacyBound(staticBound(metadata, true), true, null),
 +                                                                 new LegacyBound(staticBound(metadata, false), true, null),
 +                                                                 tombstone.deletionTime));
 +                }
 +            }
 +
 +            if (tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata))
 +                inRowTombstones.add(tombstone);
 +            else
 +                add(metadata, new RangeTombstone(Slice.make(tombstone.start.bound, tombstone.stop.bound), tombstone.deletionTime));
 +        }
 +
 +        public void add(CFMetaData metadata, RangeTombstone tombstone)
 +        {
 +            deletionInfo.add(tombstone, metadata.comparator);
 +        }
 +
 +        public Iterator<LegacyRangeTombstone> inRowRangeTombstones()
 +        {
 +            return inRowTombstones.iterator();
 +        }
 +
 +        public static LegacyDeletionInfo deserialize(CFMetaData metadata, DataInputPlus in) throws IOException
 +        {
 +            DeletionTime topLevel = DeletionTime.serializer.deserialize(in);
 +
 +            int rangeCount = in.readInt();
 +            if (rangeCount == 0)
 +                return new LegacyDeletionInfo(new MutableDeletionInfo(topLevel));
 +
 +            LegacyDeletionInfo delInfo = new LegacyDeletionInfo(new MutableDeletionInfo(topLevel));
 +            for (int i = 0; i < rangeCount; i++)
 +            {
 +                LegacyBound start = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), true);
 +                LegacyBound end = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), false);
 +                int delTime =  in.readInt();
 +                long markedAt = in.readLong();
 +
 +                delInfo.add(metadata, new LegacyRangeTombstone(start, end, new DeletionTime(markedAt, delTime)));
 +            }
 +            return delInfo;
 +        }
 +    }
 +
 +    /**
 +     * A helper class for LegacyRangeTombstoneList.  This replaces the Comparator<Composite> that RTL used before 3.0.
 +     */
 +    private static class LegacyBoundComparator implements Comparator<LegacyBound>
 +    {
 +        ClusteringComparator clusteringComparator;
 +
 +        public LegacyBoundComparator(ClusteringComparator clusteringComparator)
 +        {
 +            this.clusteringComparator = clusteringComparator;
 +        }
 +
 +        public int compare(LegacyBound a, LegacyBound b)
 +        {
 +            // In the legacy sorting, BOTTOM comes before anything else
 +            if (a == LegacyBound.BOTTOM)
 +                return b == LegacyBound.BOTTOM ? 0 : -1;
 +            if (b == LegacyBound.BOTTOM)
 +                return 1;
 +
 +            // Excluding BOTTOM, statics are always before anything else.
 +            if (a.isStatic != b.isStatic)
 +                return a.isStatic ? -1 : 1;
 +
 +            // We have to be careful with bound comparison because of collections. Namely, if the 2 bounds represent the
 +            // same prefix, then we should take the collectionName into account before taking the bounds kind
 +            // (ClusteringPrefix.Kind). This means we can't really call ClusteringComparator.compare() directly.
 +            // For instance, if
 +            //    a is (bound=INCL_START_BOUND('x'), collectionName='d')
 +            //    b is (bound=INCL_END_BOUND('x'),   collectionName='c')
 +            // Ten b < a since the element 'c' of collection 'x' comes before element 'd', but calling
 +            // clusteringComparator.compare(a.bound, b.bound) returns -1.
 +            // See CASSANDRA-13125 for details.
 +            int sa = a.bound.size();
 +            int sb = b.bound.size();
 +            for (int i = 0; i < Math.min(sa, sb); i++)
 +            {
 +                int cmp = clusteringComparator.compareComponent(i, a.bound.get(i), b.bound.get(i));
 +                if (cmp != 0)
 +                    return cmp;
 +            }
 +
 +            if (sa != sb)
 +                return sa < sb ? a.bound.kind().comparedToClustering : -b.bound.kind().comparedToClustering;
 +
 +            // Both bound represent the same prefix, compare the collection names
 +            // If one has a collection name and the other doesn't, the other comes before as it points to the beginning of the row.
 +            if ((a.collectionName == null) != (b.collectionName == null))
 +                return a.collectionName == null ? -1 : 1;
 +
 +            // If they both have a collection, compare that first
 +            if (a.collectionName != null)
 +            {
 +                int cmp = UTF8Type.instance.compare(a.collectionName.name.bytes, b.collectionName.name.bytes);
 +                if (cmp != 0)
 +                    return cmp;
 +            }
 +
 +            // Lastly, if everything so far is equal, compare their clustering kind
 +            return ClusteringPrefix.Kind.compare(a.bound.kind(), b.bound.kind());
 +        }
 +    }
 +
 +    /**
 +     * Almost an entire copy of RangeTombstoneList from C* 2.1.  The main difference is that LegacyBoundComparator
 +     * is used in place of Comparator<Composite> (because Composite doesn't exist any more).
 +     *
 +     * This class is needed to allow us to convert single-row deletions and complex deletions into range tombstones
 +     * and properly merge them into the normal set of range tombstones.
 +     */
 +    public static class LegacyRangeTombstoneList
 +    {
 +        private final LegacyBoundComparator comparator;
 +
 +        // Note: we don't want to use a List for the markedAts and delTimes to avoid boxin

<TRUNCATED>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org