You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/09/08 10:40:45 UTC

cassandra git commit: Small optimizations of sstable index serialization

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 f6695884e -> 60e45c0ae


Small optimizations of sstable index serialization

patch by slebresne; reviewed by aweisberg for CASSANDRA-10232


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/60e45c0a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/60e45c0a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/60e45c0a

Branch: refs/heads/cassandra-3.0
Commit: 60e45c0ae83f10c6fd9526ce97234701cd5ea308
Parents: f669588
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Aug 26 17:39:54 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Sep 8 10:39:34 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ColumnIndex.java    |  20 +--
 .../org/apache/cassandra/db/RowIndexEntry.java  | 133 ++++++++++++++-----
 .../org/apache/cassandra/db/Serializers.java    |   5 -
 .../columniterator/AbstractSSTableIterator.java |  13 +-
 .../cassandra/io/sstable/IndexHelper.java       | 109 ++++++---------
 .../cassandra/io/sstable/KeyIterator.java       |   8 +-
 .../apache/cassandra/io/sstable/SSTable.java    |   2 +-
 .../io/sstable/format/SSTableReader.java        |   6 +-
 .../io/sstable/format/big/BigTableReader.java   |   2 +-
 .../io/sstable/format/big/BigTableScanner.java  |   4 +-
 .../apache/cassandra/service/CacheService.java  |   7 +-
 .../db/SinglePartitionSliceCommandTest.java     |  39 +++---
 .../cassandra/io/sstable/IndexHelperTest.java   |   6 +-
 .../cassandra/io/sstable/SSTableLoaderTest.java |   8 +-
 15 files changed, 212 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index afd45e5..d6ebe7a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-rc1
+ * Small optimizations of sstable index serialization (CASSANDRA-10232)
  * Support for both encrypted and unencrypted native transport connections (CASSANDRA-9590)
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 9eef23e..b350f90 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -31,14 +31,16 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class ColumnIndex
 {
+    public final long partitionHeaderLength;
     public final List<IndexHelper.IndexInfo> columnsIndex;
 
-    private static final ColumnIndex EMPTY = new ColumnIndex(Collections.<IndexHelper.IndexInfo>emptyList());
+    private static final ColumnIndex EMPTY = new ColumnIndex(-1, Collections.<IndexHelper.IndexInfo>emptyList());
 
-    private ColumnIndex(List<IndexHelper.IndexInfo> columnsIndex)
+    private ColumnIndex(long partitionHeaderLength, List<IndexHelper.IndexInfo> columnsIndex)
     {
         assert columnsIndex != null;
 
+        this.partitionHeaderLength = partitionHeaderLength;
         this.columnsIndex = columnsIndex;
     }
 
@@ -67,8 +69,10 @@ public class ColumnIndex
         private final SerializationHeader header;
         private final int version;
 
-        private final ColumnIndex result;
+        private final List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<>();
         private final long initialPosition;
+        private long headerLength = -1;
+
         private long startPosition = -1;
 
         private int written;
@@ -87,8 +91,6 @@ public class ColumnIndex
             this.writer = writer;
             this.header = header;
             this.version = version;
-
-            this.result = new ColumnIndex(new ArrayList<IndexHelper.IndexInfo>());
             this.initialPosition = writer.getFilePointer();
         }
 
@@ -103,6 +105,7 @@ public class ColumnIndex
         public ColumnIndex build() throws IOException
         {
             writePartitionHeader(iterator);
+            this.headerLength = writer.getFilePointer() - initialPosition;
 
             while (iterator.hasNext())
                 add(iterator.next());
@@ -119,10 +122,9 @@ public class ColumnIndex
         {
             IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstClustering,
                                                                          lastClustering,
-                                                                         startPosition,
                                                                          currentPosition() - startPosition,
                                                                          openMarker);
-            result.columnsIndex.add(cIndexInfo);
+            columnsIndex.add(cIndexInfo);
             firstClustering = null;
         }
 
@@ -164,8 +166,8 @@ public class ColumnIndex
                 addIndexBlock();
 
             // we should always have at least one computed index block, but we only write it out if there is more than that.
-            assert result.columnsIndex.size() > 0;
-            return result;
+            assert columnsIndex.size() > 0 && headerLength >= 0;
+            return new ColumnIndex(headerLength, columnsIndex);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index e783508..f63e893 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -47,7 +47,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         this.position = position;
     }
 
-    public int promotedSize(CFMetaData metadata, Version version, SerializationHeader header)
+    protected int promotedSize(IndexHelper.IndexInfo.Serializer idxSerializer)
     {
         return 0;
     }
@@ -61,7 +61,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         // since if there are insufficient columns to be worth indexing we're going to seek to
         // the beginning of the row anyway, so we might as well read the tombstone there as well.
         if (index.columnsIndex.size() > 1)
-            return new IndexedEntry(position, deletionTime, index.columnsIndex);
+            return new IndexedEntry(position, deletionTime, index.partitionHeaderLength, index.columnsIndex);
         else
             return new RowIndexEntry<>(position);
     }
@@ -89,6 +89,16 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         return 0;
     }
 
+    /**
+     * The length of the row header (partition key, partition deletion and static row).
+     * This value is only provided for indexed entries and this method will throw
+     * {@code UnsupportedOperationException} if {@code !isIndexed()}.
+     */
+    public long headerLength()
+    {
+        throw new UnsupportedOperationException();
+    }
+
     public List<T> columnsIndex()
     {
         return Collections.emptyList();
@@ -108,48 +118,81 @@ public class RowIndexEntry<T> implements IMeasurableMemory
 
     public static class Serializer implements IndexSerializer<IndexHelper.IndexInfo>
     {
-        private final CFMetaData metadata;
+        private final IndexHelper.IndexInfo.Serializer idxSerializer;
         private final Version version;
-        private final SerializationHeader header;
 
         public Serializer(CFMetaData metadata, Version version, SerializationHeader header)
         {
-            this.metadata = metadata;
+            this.idxSerializer = new IndexHelper.IndexInfo.Serializer(metadata, version, header);
             this.version = version;
-            this.header = header;
         }
 
         public void serialize(RowIndexEntry<IndexHelper.IndexInfo> rie, DataOutputPlus out) throws IOException
         {
-            out.writeLong(rie.position);
-            out.writeInt(rie.promotedSize(metadata, version, header));
+            assert version.storeRows() : "We read old index files but we should never write them";
+
+            out.writeUnsignedVInt(rie.position);
+            out.writeUnsignedVInt(rie.promotedSize(idxSerializer));
 
             if (rie.isIndexed())
             {
+                out.writeUnsignedVInt(rie.headerLength());
                 DeletionTime.serializer.serialize(rie.deletionTime(), out);
-                out.writeInt(rie.columnsIndex().size());
-                IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version);
+                out.writeUnsignedVInt(rie.columnsIndex().size());
                 for (IndexHelper.IndexInfo info : rie.columnsIndex())
-                    idxSerializer.serialize(info, out, header);
+                    idxSerializer.serialize(info, out);
             }
         }
 
         public RowIndexEntry<IndexHelper.IndexInfo> deserialize(DataInputPlus in) throws IOException
         {
-            long position = in.readLong();
+            if (!version.storeRows())
+            {
+                long position = in.readLong();
+
+                int size = in.readInt();
+                if (size > 0)
+                {
+                    DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
+
+                    int entries = in.readInt();
+                    List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<>(entries);
+
+                    // The old format didn't save the partition header length per se, but rather, for each entry, its
+                    // offset from the beginning of the row. We don't use that offset anymore, but we do need the
+                    // header length so we basically need the first entry offset. And so we inline the deserialization
+                    // of the first index entry to get that information. While this is a bit ugly, we'll get rid of that
+                    // code once pre-3.0 backward compatibility is dropped so it feels fine as a temporary hack.
+                    ClusteringPrefix firstName = idxSerializer.clusteringSerializer.deserialize(in);
+                    ClusteringPrefix lastName = idxSerializer.clusteringSerializer.deserialize(in);
+                    long headerLength = in.readLong();
+                    long width = in.readLong();
+
+                    columnsIndex.add(new IndexHelper.IndexInfo(firstName, lastName, width, null));
+                    for (int i = 1; i < entries; i++)
+                        columnsIndex.add(idxSerializer.deserialize(in));
+
+                    return new IndexedEntry(position, deletionTime, headerLength, columnsIndex);
+                }
+                else
+                {
+                    return new RowIndexEntry<>(position);
+                }
+            }
 
-            int size = in.readInt();
+            long position = in.readUnsignedVInt();
+
+            int size = (int)in.readUnsignedVInt();
             if (size > 0)
             {
+                long headerLength = in.readUnsignedVInt();
                 DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
-
-                int entries = in.readInt();
-                IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version);
+                int entries = (int)in.readUnsignedVInt();
                 List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<>(entries);
                 for (int i = 0; i < entries; i++)
-                    columnsIndex.add(idxSerializer.deserialize(in, header));
+                    columnsIndex.add(idxSerializer.deserialize(in));
 
-                return new IndexedEntry(position, deletionTime, columnsIndex);
+                return new IndexedEntry(position, deletionTime, headerLength, columnsIndex);
             }
             else
             {
@@ -157,15 +200,23 @@ public class RowIndexEntry<T> implements IMeasurableMemory
             }
         }
 
-        public static void skip(DataInput in) throws IOException
+        // Reads only the data 'position' of the index entry and returns it. Note that this leaves 'in' in the middle
+        // of reading an entry, so this is only useful if you know what you are doing and in most cases 'deserialize'
+        // should be used instead.
+        public static long readPosition(DataInputPlus in, Version version) throws IOException
         {
-            in.readLong();
-            skipPromotedIndex(in);
+            return version.storeRows() ? in.readUnsignedVInt() : in.readLong();
         }
 
-        public static void skipPromotedIndex(DataInput in) throws IOException
+        public static void skip(DataInputPlus in, Version version) throws IOException
         {
-            int size = in.readInt();
+            readPosition(in, version);
+            skipPromotedIndex(in, version);
+        }
+
+        public static void skipPromotedIndex(DataInputPlus in, Version version) throws IOException
+        {
+            int size = version.storeRows() ? (int)in.readUnsignedVInt() : in.readInt();
             if (size <= 0)
                 return;
 
@@ -174,21 +225,21 @@ public class RowIndexEntry<T> implements IMeasurableMemory
 
         public int serializedSize(RowIndexEntry<IndexHelper.IndexInfo> rie)
         {
-            int size = TypeSizes.sizeof(rie.position) + TypeSizes.sizeof(rie.promotedSize(metadata, version, header));
+            assert version.storeRows() : "We read old index files but we should never write them";
+
+            int size = TypeSizes.sizeofUnsignedVInt(rie.position) + TypeSizes.sizeofUnsignedVInt(rie.promotedSize(idxSerializer));
 
             if (rie.isIndexed())
             {
                 List<IndexHelper.IndexInfo> index = rie.columnsIndex();
 
+                size += TypeSizes.sizeofUnsignedVInt(rie.headerLength());
                 size += DeletionTime.serializer.serializedSize(rie.deletionTime());
-                size += TypeSizes.sizeof(index.size());
+                size += TypeSizes.sizeofUnsignedVInt(index.size());
 
-                IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version);
                 for (IndexHelper.IndexInfo info : index)
-                    size += idxSerializer.serializedSize(info, header);
+                    size += idxSerializer.serializedSize(info);
             }
-
-
             return size;
         }
     }
@@ -199,17 +250,21 @@ public class RowIndexEntry<T> implements IMeasurableMemory
     private static class IndexedEntry extends RowIndexEntry<IndexHelper.IndexInfo>
     {
         private final DeletionTime deletionTime;
+
+        // The length of the row header (partition key, partition deletion and static row)
+        private final long headerLength;
         private final List<IndexHelper.IndexInfo> columnsIndex;
         private static final long BASE_SIZE =
-                ObjectSizes.measure(new IndexedEntry(0, DeletionTime.LIVE, Arrays.<IndexHelper.IndexInfo>asList(null, null)))
+                ObjectSizes.measure(new IndexedEntry(0, DeletionTime.LIVE, 0, Arrays.<IndexHelper.IndexInfo>asList(null, null)))
               + ObjectSizes.measure(new ArrayList<>(1));
 
-        private IndexedEntry(long position, DeletionTime deletionTime, List<IndexHelper.IndexInfo> columnsIndex)
+        private IndexedEntry(long position, DeletionTime deletionTime, long headerLength, List<IndexHelper.IndexInfo> columnsIndex)
         {
             super(position);
             assert deletionTime != null;
             assert columnsIndex != null && columnsIndex.size() > 1;
             this.deletionTime = deletionTime;
+            this.headerLength = headerLength;
             this.columnsIndex = columnsIndex;
         }
 
@@ -220,19 +275,25 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         }
 
         @Override
+        public long headerLength()
+        {
+            return headerLength;
+        }
+
+        @Override
         public List<IndexHelper.IndexInfo> columnsIndex()
         {
             return columnsIndex;
         }
 
         @Override
-        public int promotedSize(CFMetaData metadata, Version version, SerializationHeader header)
+        protected int promotedSize(IndexHelper.IndexInfo.Serializer idxSerializer)
         {
-            long size = DeletionTime.serializer.serializedSize(deletionTime);
-            size += TypeSizes.sizeof(columnsIndex.size()); // number of entries
-            IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version);
+            long size = TypeSizes.sizeofUnsignedVInt(headerLength)
+                      + DeletionTime.serializer.serializedSize(deletionTime)
+                      + TypeSizes.sizeofUnsignedVInt(columnsIndex.size()); // number of entries
             for (IndexHelper.IndexInfo info : columnsIndex)
-                size += idxSerializer.serializedSize(info, header);
+                size += idxSerializer.serializedSize(info);
 
             return Ints.checkedCast(size);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/db/Serializers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java
index 2561bbe..9b29d89 100644
--- a/src/java/org/apache/cassandra/db/Serializers.java
+++ b/src/java/org/apache/cassandra/db/Serializers.java
@@ -43,11 +43,6 @@ public class Serializers
         this.metadata = metadata;
     }
 
-    public IndexInfo.Serializer indexSerializer(Version version)
-    {
-        return new IndexInfo.Serializer(metadata, version);
-    }
-
     // TODO: Once we drop support for old (pre-3.0) sstables, we can drop this method and inline the calls to
     // ClusteringPrefix.serializer in IndexHelper directly. At which point this whole class probably becomes
     // unecessary (since IndexInfo.Serializer won't depend on the metadata either).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 87a57c6..c075a2b 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -412,6 +412,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
 
         private final RowIndexEntry indexEntry;
         private final List<IndexHelper.IndexInfo> indexes;
+        private final long[] blockOffsets;
         private final boolean reversed;
 
         private int currentIndexIdx;
@@ -427,6 +428,14 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
             this.indexes = indexEntry.columnsIndex();
             this.reversed = reversed;
             this.currentIndexIdx = reversed ? indexEntry.columnsIndex().size() : -1;
+
+            this.blockOffsets = new long[indexes.size()];
+            long offset = indexEntry.position + indexEntry.headerLength();
+            for (int i = 0; i < blockOffsets.length; i++)
+            {
+                blockOffsets[i] = offset;
+                offset += indexes.get(i).width;
+            }
         }
 
         public boolean isDone()
@@ -438,7 +447,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
         public void setToBlock(int blockIdx) throws IOException
         {
             if (blockIdx >= 0 && blockIdx < indexes.size())
-                reader.seekToPosition(indexEntry.position + indexes.get(blockIdx).offset);
+                reader.seekToPosition(blockOffsets[blockIdx]);
 
             currentIndexIdx = blockIdx;
             reader.openMarker = blockIdx > 0 ? indexes.get(blockIdx - 1).endOpenMarker : null;
@@ -461,7 +470,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
 
                 // We have to set the mark, and we have to set it at the beginning of the block. So if we're not at the beginning of the block, this forces us to a weird seek dance.
                 // This can only happen when reading old file however.
-                long startOfBlock = indexEntry.position + indexes.get(currentIndexIdx).offset;
+                long startOfBlock = blockOffsets[currentIndexIdx];
                 long currentFilePointer = reader.file.getFilePointer();
                 if (startOfBlock == currentFilePointer)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
index 4dabe69..e95af29 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
@@ -37,34 +37,6 @@ import org.apache.cassandra.utils.*;
  */
 public class IndexHelper
 {
-    public static void skipBloomFilter(DataInput in) throws IOException
-    {
-        int size = in.readInt();
-        FileUtils.skipBytesFully(in, size);
-    }
-
-    /**
-     * Skip the index
-     * @param in the data input from which the index should be skipped
-     * @throws IOException if an I/O error occurs.
-     */
-    public static void skipIndex(DataInput in) throws IOException
-    {
-        /* read only the column index list */
-        int columnIndexSize = in.readInt();
-        /* skip the column index data */
-        if (in instanceof FileDataInput)
-        {
-            FileUtils.skipBytesFully(in, columnIndexSize);
-        }
-        else
-        {
-            // skip bytes
-            byte[] skip = new byte[columnIndexSize];
-            in.readFully(skip);
-        }
-    }
-
     /**
      * The index of the IndexInfo in which a scan starting with @name should begin.
      *
@@ -78,7 +50,7 @@ public class IndexHelper
      */
     public static int indexFor(ClusteringPrefix name, List<IndexInfo> indexList, ClusteringComparator comparator, boolean reversed, int lastIndex)
     {
-        IndexInfo target = new IndexInfo(name, name, 0, 0, null);
+        IndexInfo target = new IndexInfo(name, name, 0, null);
         /*
         Take the example from the unit test, and say your index looks like this:
         [0..5][10..15][20..25]
@@ -115,12 +87,11 @@ public class IndexHelper
 
     public static class IndexInfo
     {
-        private static final long EMPTY_SIZE = ObjectSizes.measure(new IndexInfo(null, null, 0, 0, null));
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new IndexInfo(null, null, 0, null));
 
         public final long width;
-        public final ClusteringPrefix lastName;
         public final ClusteringPrefix firstName;
-        public final long offset;
+        public final ClusteringPrefix lastName;
 
         // If at the end of the index block there is an open range tombstone marker, this marker
         // deletion infos. null otherwise.
@@ -128,73 +99,77 @@ public class IndexHelper
 
         public IndexInfo(ClusteringPrefix firstName,
                          ClusteringPrefix lastName,
-                         long offset,
                          long width,
                          DeletionTime endOpenMarker)
         {
             this.firstName = firstName;
             this.lastName = lastName;
-            this.offset = offset;
             this.width = width;
             this.endOpenMarker = endOpenMarker;
         }
 
         public static class Serializer
         {
-            private final CFMetaData metadata;
+            // This is the default index size that we use to delta-encode width when serializing so we get better vint-encoding.
+            // This is imperfect as users can change the index size and ideally we would save the index size used with each index file
+            // to use as base. However, that's a bit more involved a change than we want for now, and users very seldom change the index
+            // size, so using the default is almost surely better than using no base at all.
+            private static final long WIDTH_BASE = 64 * 1024;
+
+            // TODO: Only public for use in RowIndexEntry for backward compatibility code. Can be made private once backward compatibility is dropped.
+            public final ISerializer<ClusteringPrefix> clusteringSerializer;
             private final Version version;
 
-            public Serializer(CFMetaData metadata, Version version)
+            public Serializer(CFMetaData metadata, Version version, SerializationHeader header)
             {
-                this.metadata = metadata;
+                this.clusteringSerializer = metadata.serializers().indexEntryClusteringPrefixSerializer(version, header);
                 this.version = version;
             }
 
-            public void serialize(IndexInfo info, DataOutputPlus out, SerializationHeader header) throws IOException
+            public void serialize(IndexInfo info, DataOutputPlus out) throws IOException
             {
-                ISerializer<ClusteringPrefix> clusteringSerializer = metadata.serializers().indexEntryClusteringPrefixSerializer(version, header);
+                assert version.storeRows() : "We read old index files but we should never write them";
+
                 clusteringSerializer.serialize(info.firstName, out);
                 clusteringSerializer.serialize(info.lastName, out);
-                out.writeLong(info.offset);
-                out.writeLong(info.width);
+                out.writeVInt(info.width - WIDTH_BASE);
 
-                if (version.storeRows())
-                {
-                    out.writeBoolean(info.endOpenMarker != null);
-                    if (info.endOpenMarker != null)
-                        DeletionTime.serializer.serialize(info.endOpenMarker, out);
-                }
+                out.writeBoolean(info.endOpenMarker != null);
+                if (info.endOpenMarker != null)
+                    DeletionTime.serializer.serialize(info.endOpenMarker, out);
             }
 
-            public IndexInfo deserialize(DataInputPlus in, SerializationHeader header) throws IOException
+            public IndexInfo deserialize(DataInputPlus in) throws IOException
             {
-                ISerializer<ClusteringPrefix> clusteringSerializer = metadata.serializers().indexEntryClusteringPrefixSerializer(version, header);
-
                 ClusteringPrefix firstName = clusteringSerializer.deserialize(in);
                 ClusteringPrefix lastName = clusteringSerializer.deserialize(in);
-                long offset = in.readLong();
-                long width = in.readLong();
-                DeletionTime endOpenMarker = version.storeRows() && in.readBoolean()
-                                           ? DeletionTime.serializer.deserialize(in)
-                                           : null;
-
-                return new IndexInfo(firstName, lastName, offset, width, endOpenMarker);
+                long width;
+                DeletionTime endOpenMarker = null;
+                if (version.storeRows())
+                {
+                    width = in.readVInt() + WIDTH_BASE;
+                    if (in.readBoolean())
+                        endOpenMarker = DeletionTime.serializer.deserialize(in);
+                }
+                else
+                {
+                    in.readLong(); // skip offset
+                    width = in.readLong();
+                }
+                return new IndexInfo(firstName, lastName, width, endOpenMarker);
             }
 
-            public long serializedSize(IndexInfo info, SerializationHeader header)
+            public long serializedSize(IndexInfo info)
             {
-                ISerializer<ClusteringPrefix> clusteringSerializer = metadata.serializers().indexEntryClusteringPrefixSerializer(version, header);
+                assert version.storeRows() : "We read old index files but we should never write them";
+
                 long size = clusteringSerializer.serializedSize(info.firstName)
                           + clusteringSerializer.serializedSize(info.lastName)
-                          + TypeSizes.sizeof(info.offset)
-                          + TypeSizes.sizeof(info.width);
+                          + TypeSizes.sizeofVInt(info.width - WIDTH_BASE)
+                          + TypeSizes.sizeof(info.endOpenMarker != null);
 
-                if (version.storeRows())
-                {
-                    size += TypeSizes.sizeof(info.endOpenMarker != null);
-                    if (info.endOpenMarker != null)
-                        size += DeletionTime.serializer.serializedSize(info.endOpenMarker);
-                }
+                if (info.endOpenMarker != null)
+                    size += DeletionTime.serializer.serializedSize(info.endOpenMarker);
                 return size;
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index 6f1e2f4..f02b9d1 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.DataInput;
 import java.io.File;
 import java.io.IOException;
 
@@ -27,6 +26,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CloseableIterator;
@@ -49,7 +49,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
                 in = RandomAccessReader.open(path);
         }
 
-        public DataInput get()
+        public DataInputPlus get()
         {
             maybeInit();
             return in;
@@ -80,12 +80,14 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
         }
     }
 
+    private final Descriptor desc;
     private final In in;
     private final IPartitioner partitioner;
 
 
     public KeyIterator(Descriptor desc, CFMetaData metadata)
     {
+        this.desc = desc;
         in = new In(new File(desc.filenameFor(Component.PRIMARY_INDEX)));
         partitioner = metadata.partitioner;
     }
@@ -98,7 +100,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
                 return endOfData();
 
             DecoratedKey key = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in.get()));
-            RowIndexEntry.Serializer.skip(in.get()); // skip remainder of the entry
+            RowIndexEntry.Serializer.skip(in.get(), desc.version); // skip remainder of the entry
             return key;
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index b86d9b4..63b8f3e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -237,7 +237,7 @@ public abstract class SSTable
         while (ifile.getFilePointer() < BYTES_CAP && keys < SAMPLES_CAP)
         {
             ByteBufferUtil.skipShortLength(ifile);
-            RowIndexEntry.Serializer.skip(ifile);
+            RowIndexEntry.Serializer.skip(ifile, descriptor.version);
             keys++;
         }
         assert keys > 0 && ifile.getFilePointer() > 0 && ifile.length() > 0 : "Unexpected empty index file: " + ifile;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 5d8ab50..b958240 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -922,7 +922,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                     if (summaryEntriesChecked == Downsampling.BASE_SAMPLING_LEVEL)
                         return true;
                 }
-                RowIndexEntry.Serializer.skip(in);
+                RowIndexEntry.Serializer.skip(in, descriptor.version);
                 i++;
             }
         }
@@ -1199,7 +1199,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                 while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
                 {
                     summaryBuilder.maybeAddEntry(decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
-                    RowIndexEntry.Serializer.skip(primaryIndex);
+                    RowIndexEntry.Serializer.skip(primaryIndex, descriptor.version);
                 }
 
                 return summaryBuilder.build(getPartitioner());
@@ -1605,7 +1605,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                 if (indexDecoratedKey.compareTo(token) > 0)
                     return indexDecoratedKey;
 
-                RowIndexEntry.Serializer.skip(in);
+                RowIndexEntry.Serializer.skip(in, descriptor.version);
             }
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 4b66942..efd1057 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -239,7 +239,7 @@ public class BigTableReader extends SSTableReader
                     return indexEntry;
                 }
 
-                RowIndexEntry.Serializer.skip(in);
+                RowIndexEntry.Serializer.skip(in, descriptor.version);
             }
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index d135df0..1a4ac21 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -176,14 +176,14 @@ public class BigTableScanner implements ISSTableScanner
                 if (indexDecoratedKey.compareTo(currentRange.left) > 0 || currentRange.contains(indexDecoratedKey))
                 {
                     // Found, just read the dataPosition and seek into index and data files
-                    long dataPosition = ifile.readLong();
+                    long dataPosition = RowIndexEntry.Serializer.readPosition(ifile, sstable.descriptor.version);
                     ifile.seek(indexPosition);
                     dfile.seek(dataPosition);
                     break;
                 }
                 else
                 {
-                    RowIndexEntry.Serializer.skip(ifile);
+                    RowIndexEntry.Serializer.skip(ifile, sstable.descriptor.version);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 9213b20..a48466a 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -469,7 +470,11 @@ public class CacheService implements CacheServiceMBean
             input.readBoolean(); // backwards compatibility for "promoted indexes" boolean
             if (reader == null)
             {
-                RowIndexEntry.Serializer.skipPromotedIndex(input);
+                // The sstable doesn't exist anymore, so we can't be sure of the exact version and assume it's the current version. The only case where we'll be
+                // wrong is during upgrade, in which case we fail at deserialization. This is not a huge deal however since 1) this is unlikely enough that
+                // this won't affect many users (if any) and only once, 2) this doesn't prevent the node from starting and 3) CASSANDRA-10219 shows that this
+                // part of the code has been broken for a while without anyone noticing (it is, btw, still broken until CASSANDRA-10219 is fixed).
+                RowIndexEntry.Serializer.skipPromotedIndex(input, BigFormat.instance.getLatestVersion());
                 return null;
             }
             RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 4b7f15a..9f80023 100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@ -132,10 +132,10 @@ public class SinglePartitionSliceCommandTest
     @Test
     public void staticColumnsAreReturned() throws IOException
     {
-        DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k"));
+        DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k1"));
 
-        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s) VALUES ('k', 's')");
-        Assert.assertFalse(QueryProcessor.executeInternal("SELECT s FROM ks.tbl WHERE k='k'").isEmpty());
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s) VALUES ('k1', 's')");
+        Assert.assertFalse(QueryProcessor.executeInternal("SELECT s FROM ks.tbl WHERE k='k1'").isEmpty());
 
         ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
         ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.NONE, false);
@@ -147,11 +147,11 @@ public class SinglePartitionSliceCommandTest
                                                           key,
                                                           sliceFilter);
 
-        UnfilteredPartitionIterator pi;
-
         // check raw iterator for static cell
-        pi = cmd.executeLocally(ReadOrderGroup.emptyGroup());
-        checkForS(pi);
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
+        {
+            checkForS(pi);
+        }
 
         ReadResponse response;
         DataOutputBuffer out;
@@ -159,24 +159,33 @@ public class SinglePartitionSliceCommandTest
         ReadResponse dst;
 
         // check (de)serialized iterator for memtable static cell
-        pi = cmd.executeLocally(ReadOrderGroup.emptyGroup());
-        response = ReadResponse.createDataResponse(pi, cmd.columnFilter());
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
+        {
+            response = ReadResponse.createDataResponse(pi, cmd.columnFilter());
+        }
+
         out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
         ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30);
         in = new DataInputBuffer(out.buffer(), true);
         dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30);
-        pi = dst.makeIterator(cfm, cmd);
-        checkForS(pi);
+        try (UnfilteredPartitionIterator pi = dst.makeIterator(cfm, cmd))
+        {
+            checkForS(pi);
+        }
 
         // check (de)serialized iterator for sstable static cell
         Schema.instance.getColumnFamilyStoreInstance(cfm.cfId).forceBlockingFlush();
-        pi = cmd.executeLocally(ReadOrderGroup.emptyGroup());
-        response = ReadResponse.createDataResponse(pi, cmd.columnFilter());
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
+        {
+            response = ReadResponse.createDataResponse(pi, cmd.columnFilter());
+        }
         out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
         ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30);
         in = new DataInputBuffer(out.buffer(), true);
         dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30);
-        pi = dst.makeIterator(cfm, cmd);
-        checkForS(pi);
+        try (UnfilteredPartitionIterator pi = dst.makeIterator(cfm, cmd))
+        {
+            checkForS(pi);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
index c9f268a..2c967d0 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
@@ -51,9 +51,9 @@ public class IndexHelperTest
         DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
 
         List<IndexInfo> indexes = new ArrayList<>();
-        indexes.add(new IndexInfo(cn(0L), cn(5L), 0, 0, deletionInfo));
-        indexes.add(new IndexInfo(cn(10L), cn(15L), 0, 0, deletionInfo));
-        indexes.add(new IndexInfo(cn(20L), cn(25L), 0, 0, deletionInfo));
+        indexes.add(new IndexInfo(cn(0L), cn(5L), 0, deletionInfo));
+        indexes.add(new IndexInfo(cn(10L), cn(15L), 0,deletionInfo));
+        indexes.add(new IndexInfo(cn(20L), cn(25L), 0, deletionInfo));
 
 
         assertEquals(0, IndexHelper.indexFor(cn(-1L), indexes, comp, false, -1));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index 4eebdeb..faa9c3e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -167,7 +167,9 @@ public class SSTableLoaderTest
                                                   .withBufferSizeInMB(1)
                                                   .build();
 
-        for (int i = 0; i < 1000; i++) // make sure to write more than 1 MB
+        int NB_PARTITIONS = 5000; // Enough to write >1MB and get at least one completed sstable before we've closed the writer
+
+        for (int i = 0; i < NB_PARTITIONS; i++)
         {
             for (int j = 0; j < 100; j++)
                 writer.addRow(String.format("key%d", i), String.format("col%d", j), "100");
@@ -183,7 +185,7 @@ public class SSTableLoaderTest
 
         List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
 
-        assertTrue(partitions.size() > 0 && partitions.size() < 1000);
+        assertTrue(partitions.size() > 0 && partitions.size() < NB_PARTITIONS);
 
         // now we complete the write and the second loader should load the last sstable as well
         writer.close();
@@ -192,7 +194,7 @@ public class SSTableLoaderTest
         loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
 
         partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
-        assertEquals(1000, partitions.size());
+        assertEquals(NB_PARTITIONS, partitions.size());
 
         // The stream future is signalled when the work is complete but before releasing references. Wait for release
         // before cleanup (CASSANDRA-10118).