You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/09/23 23:39:07 UTC

[1/2] cassandra git commit: Write row size in sstable format for faster skipping

Repository: cassandra
Updated Branches:
  refs/heads/10378 [created] 525855d2f


Write row size in sstable format for faster skipping


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/424b59ad
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/424b59ad
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/424b59ad

Branch: refs/heads/10378
Commit: 424b59ad5aa72b25eab8995a2c248ab734d33177
Parents: 41731b8
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Sep 22 13:53:22 2015 -0700
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Sep 22 14:04:06 2015 -0700

----------------------------------------------------------------------
 src/java/org/apache/cassandra/db/Memtable.java  |  2 +-
 .../cassandra/db/SerializationHeader.java       | 26 ++++--
 .../rows/UnfilteredRowIteratorSerializer.java   |  8 +-
 .../cassandra/db/rows/UnfilteredSerializer.java | 90 +++++++++-----------
 .../io/sstable/AbstractSSTableSimpleWriter.java |  2 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java |  2 +-
 .../apache/cassandra/db/RowIndexEntryTest.java  |  4 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |  3 +-
 .../db/compaction/AntiCompactionTest.java       |  2 +-
 .../io/sstable/BigTableWriterTest.java          |  2 +-
 .../io/sstable/SSTableRewriterTest.java         |  4 +-
 .../cassandra/io/sstable/SSTableUtils.java      |  2 +-
 12 files changed, 78 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 7af65d1..ae982d3 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -428,7 +428,7 @@ public class Memtable implements Comparable<Memtable>
                                                                      (long)partitions.size(),
                                                                      ActiveRepairService.UNREPAIRED_SSTABLE,
                                                                      sstableMetadataCollector,
-                                                                     new SerializationHeader(cfs.metadata, columns, stats),
+                                                                     new SerializationHeader(true, cfs.metadata, columns, stats),
                                                                      txn));
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index decac49..0706d06 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -45,6 +45,8 @@ public class SerializationHeader
 {
     public static final Serializer serializer = new Serializer();
 
+    private final boolean isForSSTable;
+
     private final AbstractType<?> keyType;
     private final List<AbstractType<?>> clusteringTypes;
 
@@ -53,12 +55,14 @@ public class SerializationHeader
 
     private final Map<ByteBuffer, AbstractType<?>> typeMap;
 
-    private SerializationHeader(AbstractType<?> keyType,
+    private SerializationHeader(boolean isForSSTable,
+                                AbstractType<?> keyType,
                                 List<AbstractType<?>> clusteringTypes,
                                 PartitionColumns columns,
                                 EncodingStats stats,
                                 Map<ByteBuffer, AbstractType<?>> typeMap)
     {
+        this.isForSSTable = isForSSTable;
         this.keyType = keyType;
         this.clusteringTypes = clusteringTypes;
         this.columns = columns;
@@ -77,7 +81,8 @@ public class SerializationHeader
         List<AbstractType<?>> clusteringTypes = new ArrayList<>(size);
         for (int i = 0; i < size; i++)
             clusteringTypes.add(BytesType.instance);
-        return new SerializationHeader(BytesType.instance,
+        return new SerializationHeader(false,
+                                       BytesType.instance,
                                        clusteringTypes,
                                        PartitionColumns.NONE,
                                        EncodingStats.NO_STATS,
@@ -108,14 +113,16 @@ public class SerializationHeader
             else
                 columns.addAll(sstable.header.columns());
         }
-        return new SerializationHeader(metadata, columns.build(), stats.get());
+        return new SerializationHeader(true, metadata, columns.build(), stats.get());
     }
 
-    public SerializationHeader(CFMetaData metadata,
+    public SerializationHeader(boolean isForSSTable,
+                               CFMetaData metadata,
                                PartitionColumns columns,
                                EncodingStats stats)
     {
-        this(metadata.getKeyValidator(),
+        this(isForSSTable,
+             metadata.getKeyValidator(),
              typesOf(metadata.clusteringColumns()),
              columns,
              stats,
@@ -137,6 +144,11 @@ public class SerializationHeader
         return !columns.statics.isEmpty();
     }
 
+    public boolean isForSSTable()
+    {
+        return isForSSTable;
+    }
+
     public EncodingStats stats()
     {
         return stats;
@@ -320,7 +332,7 @@ public class SerializationHeader
                 }
                 builder.add(column);
             }
-            return new SerializationHeader(keyType, clusteringTypes, builder.build(), stats, typeMap);
+            return new SerializationHeader(true, keyType, clusteringTypes, builder.build(), stats, typeMap);
         }
 
         @Override
@@ -390,7 +402,7 @@ public class SerializationHeader
                 regulars = Columns.serializer.deserializeSubset(selection.fetchedColumns().regulars, in);
             }
 
-            return new SerializationHeader(keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null);
+            return new SerializationHeader(false, keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null);
         }
 
         public long serializedSizeForMessaging(SerializationHeader header, ColumnFilter selection, boolean hasStatic)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index df006d7..3c5cdbf 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -80,7 +80,8 @@ public class UnfilteredRowIteratorSerializer
     // Should only be used for the on-wire format.
     public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
     {
-        SerializationHeader header = new SerializationHeader(iterator.metadata(),
+        SerializationHeader header = new SerializationHeader(false,
+                                                             iterator.metadata(),
                                                              iterator.columns(),
                                                              iterator.stats());
         serialize(iterator, header, selection, out, version, rowEstimate);
@@ -134,7 +135,8 @@ public class UnfilteredRowIteratorSerializer
     // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate/ArrayBackedCachedPartition.
     public long serializedSize(UnfilteredRowIterator iterator, ColumnFilter selection, int version, int rowEstimate)
     {
-        SerializationHeader header = new SerializationHeader(iterator.metadata(),
+        SerializationHeader header = new SerializationHeader(false,
+                                                             iterator.metadata(),
                                                              iterator.columns(),
                                                              iterator.stats());
 
@@ -175,7 +177,7 @@ public class UnfilteredRowIteratorSerializer
         boolean isReversed = (flags & IS_REVERSED) != 0;
         if ((flags & IS_EMPTY) != 0)
         {
-            SerializationHeader sh = new SerializationHeader(metadata, PartitionColumns.NONE, EncodingStats.NO_STATS);
+            SerializationHeader sh = new SerializationHeader(false, metadata, PartitionColumns.NONE, EncodingStats.NO_STATS);
             return new Header(sh, key, isReversed, true, null, null, 0);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index b83ccf9..1f77529 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -35,10 +35,12 @@ import org.apache.cassandra.io.util.DataOutputPlus;
  *       flag is defined/explained below as the "Unfiltered flags" constants. One of those flags
  *       is an extension flag, and if present, trigger the rid of another byte that contains more
  *       flags. If the extension is not set, defaults are assumed for the flags of that 2nd byte.
- *   <row> is <clustering>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj> where
- *       <clustering> is the row clustering as serialized by
- *       {@code Clustering.serializer}. Note that static row are an exception and
- *       don't have this. <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion
+ *   <row> is <clustering><size>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj> where
+ *       <clustering> is the row clustering as serialized by {@code Clustering.serializer} (note
+ *       that static rows are an exception and don't have this).
+ *       <size> is the serialized size of the rest of the row, i.e. everything after <size> (it's
+ *       only written for sstables and is used to efficiently skip rows).
+ *       <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion
  *       whose presence is determined by the flags. <sci> is the simple columns of the row and <ccj> the
  *       complex ones.
  *       The columns for the row are then serialized if they differ from those in the header,
@@ -148,6 +150,9 @@ public class UnfilteredSerializer
         if (!isStatic)
             Clustering.serializer.serialize(row.clustering(), out, version, header.clusteringTypes());
 
+        if (header.isForSSTable())
+            out.writeUnsignedVInt(serializedRowBodySize(row, header, version));
+
         if ((flags & HAS_TIMESTAMP) != 0)
             header.writeTimestamp(pkLiveness.timestamp(), out);
         if ((flags & HAS_TTL) != 0)
@@ -187,6 +192,9 @@ public class UnfilteredSerializer
         out.writeByte((byte)IS_MARKER);
         RangeTombstone.Bound.serializer.serialize(marker.clustering(), out, version, header.clusteringTypes());
 
+        if (header.isForSSTable())
+            out.writeUnsignedVInt(serializedMarkerBodySize(marker, header, version));
+
         if (marker.isBoundary())
         {
             RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
@@ -210,6 +218,19 @@ public class UnfilteredSerializer
     {
         long size = 1; // flags
 
+        if (row.isStatic() || row.deletion().isShadowable())
+            size += 1; // extended flags
+
+        if (!row.isStatic())
+            size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
+
+        return size + serializedRowBodySize(row, header, version);
+    }
+
+    public long serializedRowBodySize(Row row, SerializationHeader header, int version)
+    {
+        long size = 0;
+
         boolean isStatic = row.isStatic();
         Columns headerColumns = header.columns(isStatic);
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
@@ -217,11 +238,6 @@ public class UnfilteredSerializer
         boolean hasComplexDeletion = row.hasComplexDeletion();
         boolean hasAllColumns = (row.size() == headerColumns.size());
 
-        if (isStatic || deletion.isShadowable())
-            size += 1; // extended flags
-
-        if (!isStatic)
-            size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
 
         if (!pkLiveness.isEmpty())
             size += header.timestampSerializedSize(pkLiveness.timestamp());
@@ -263,9 +279,14 @@ public class UnfilteredSerializer
 
     public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version)
     {
-        long size = 1 // flags
-                  + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes());
+        return 1 // flags
+             + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes())
+             + serializedMarkerBodySize(marker, header, version);
+    }
 
+    public long serializedMarkerBodySize(RangeTombstoneMarker marker, SerializationHeader header, int version)
+    {
+        long size = 0;
         if (marker.isBoundary())
         {
             RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
@@ -328,6 +349,9 @@ public class UnfilteredSerializer
     public RangeTombstoneMarker deserializeMarkerBody(DataInputPlus in, SerializationHeader header, RangeTombstone.Bound bound)
     throws IOException
     {
+            if (header.isForSSTable())
+                in.readUnsignedVInt(); // Skip marker size
+
         if (bound.isBoundary())
             return new RangeTombstoneBoundaryMarker(bound, header.readDeletionTime(in), header.readDeletionTime(in));
         else
@@ -344,6 +368,9 @@ public class UnfilteredSerializer
     {
         try
         {
+            if (header.isForSSTable())
+                in.readUnsignedVInt(); // Skip row size
+
             boolean isStatic = isStatic(extendedFlags);
             boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
             boolean hasTTL = (flags & HAS_TTL) != 0;
@@ -432,34 +459,8 @@ public class UnfilteredSerializer
 
     public void skipRowBody(DataInputPlus in, SerializationHeader header, int flags, int extendedFlags) throws IOException
     {
-        boolean isStatic = isStatic(extendedFlags);
-        boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
-        boolean hasTTL = (flags & HAS_TTL) != 0;
-        boolean hasDeletion = (flags & HAS_DELETION) != 0;
-        boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
-        boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
-        Columns headerColumns = header.columns(isStatic);
-
-        // Note that we don't want want to use FileUtils.skipBytesFully for anything that may not have
-        // the size we think due to VINT encoding
-        if (hasTimestamp)
-            header.skipTimestamp(in);
-        if (hasTTL)
-        {
-            header.skipLocalDeletionTime(in);
-            header.skipTTL(in);
-        }
-        if (hasDeletion)
-            header.skipDeletionTime(in);
-
-        Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
-        for (ColumnDefinition column : columns)
-        {
-            if (column.isSimple())
-                Cell.serializer.skip(in, column, header);
-            else
-                skipComplexColumn(in, column, header, hasComplexDeletion);
-        }
+        int rowSize = (int)in.readUnsignedVInt();
+        in.skipBytesFully(rowSize);
     }
 
     public void skipStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper) throws IOException
@@ -473,15 +474,8 @@ public class UnfilteredSerializer
 
     public void skipMarkerBody(DataInputPlus in, SerializationHeader header, boolean isBoundary) throws IOException
     {
-        if (isBoundary)
-        {
-            header.skipDeletionTime(in);
-            header.skipDeletionTime(in);
-        }
-        else
-        {
-            header.skipDeletionTime(in);
-        }
+        int markerSize = (int)in.readUnsignedVInt();
+        in.skipBytesFully(markerSize);
     }
 
     private void skipComplexColumn(DataInputPlus in, ColumnDefinition column, SerializationHeader header, boolean hasComplexDeletion)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index d94b219..62348ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -65,7 +65,7 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
                                        0,
                                        ActiveRepairService.UNREPAIRED_SSTABLE,
                                        0,
-                                       new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
+                                       new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS));
     }
 
     private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index f4b9adf..7dd4257 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -63,7 +63,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
     {
         super(directory, metadata, columns);
         this.bufferSize = bufferSizeInMB * 1024L * 1024L;
-        this.header = new SerializationHeader(metadata, columns, EncodingStats.NO_STATS);
+        this.header = new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS);
         diskWriter.start();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 25baa4e..62c88a0 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -60,7 +60,7 @@ public class RowIndexEntryTest extends CQLTester
 
         DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
 
-        SerializationHeader header = new SerializationHeader(cfMeta, cfMeta.partitionColumns(), EncodingStats.NO_STATS);
+        SerializationHeader header = new SerializationHeader(true, cfMeta, cfMeta.partitionColumns(), EncodingStats.NO_STATS);
         IndexHelper.IndexInfo.Serializer indexSerializer = new IndexHelper.IndexInfo.Serializer(cfMeta, BigFormat.latestVersion, header);
 
         DataOutputBuffer dob = new DataOutputBuffer();
@@ -119,7 +119,7 @@ public class RowIndexEntryTest extends CQLTester
         final RowIndexEntry simple = new RowIndexEntry(123);
 
         DataOutputBuffer buffer = new DataOutputBuffer();
-        SerializationHeader header = new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
+        SerializationHeader header = new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
         RowIndexEntry.Serializer serializer = new RowIndexEntry.Serializer(cfs.metadata, BigFormat.latestVersion, header);
 
         serializer.serialize(simple, buffer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 2fc8436..ab99750 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -325,7 +325,8 @@ public class ScrubTest
                                                                    keys.size(),
                                                                    0L,
                                                                    0,
-                                                                   new SerializationHeader(cfs.metadata,
+                                                                   new SerializationHeader(true,
+                                                                                           cfs.metadata,
                                                                                            cfs.metadata.partitionColumns(),
                                                                                            EncodingStats.NO_STATS)))
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index cd82b19..db07eb8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -166,7 +166,7 @@ public class AntiCompactionTest
         File dir = cfs.getDirectories().getDirectoryForNewSSTables();
         String filename = cfs.getSSTablePath(dir);
 
-        try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
+        try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(true, cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
         {
             for (int i = 0; i < count; i++)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index e1ab48f..78964f4 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -69,7 +69,7 @@ public class BigTableWriterTest extends AbstractTransactionalTest
 
         private TestableBTW(String file)
         {
-            this(file, SSTableTxnWriter.create(cfs, file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
+            this(file, SSTableTxnWriter.create(cfs, file, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
         }
 
         private TestableBTW(String file, SSTableTxnWriter sw)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 942c7f9..097291e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -943,7 +943,7 @@ public class SSTableRewriterTest extends SchemaLoader
             File dir = cfs.getDirectories().getDirectoryForNewSSTables();
             String filename = cfs.getSSTablePath(dir);
 
-            try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
+            try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
             {
                 int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount;
                 for ( ; i < end ; i++)
@@ -1011,7 +1011,7 @@ public class SSTableRewriterTest extends SchemaLoader
     public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
     {
         String filename = cfs.getSSTablePath(directory);
-        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
+        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
     }
 
     public static ByteBuffer random(int i, int size)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index fcd2d71..5c7ff02 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -187,7 +187,7 @@ public class SSTableUtils
             {
                 public SerializationHeader header()
                 {
-                    return new SerializationHeader(Schema.instance.getCFMetaData(ksname, cfname), builder.build(), EncodingStats.NO_STATS);
+                    return new SerializationHeader(true, Schema.instance.getCFMetaData(ksname, cfname), builder.build(), EncodingStats.NO_STATS);
                 }
 
                 @Override


[2/2] cassandra git commit: Record previous row size

Posted by sl...@apache.org.
Record previous row size


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/525855d2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/525855d2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/525855d2

Branch: refs/heads/10378
Commit: 525855d2f37b2fe9376b4ce2dab9107d0d227f6a
Parents: 424b59a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Sep 23 14:36:04 2015 -0700
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Sep 23 14:36:04 2015 -0700

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ColumnIndex.java    | 10 ++-
 .../rows/UnfilteredRowIteratorSerializer.java   |  2 +
 .../cassandra/db/rows/UnfilteredSerializer.java | 69 +++++++++++++++-----
 3 files changed, 60 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/525855d2/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index add5fa7..ede3f79 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -76,6 +76,7 @@ public class ColumnIndex
         private long startPosition = -1;
 
         private int written;
+        private long previousRowStart;
 
         private ClusteringPrefix firstClustering;
         private ClusteringPrefix lastClustering;
@@ -99,7 +100,7 @@ public class ColumnIndex
             ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
             DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
             if (header.hasStatic())
-                UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer, version);
+                UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), header, writer, version);
         }
 
         public ColumnIndex build() throws IOException
@@ -131,15 +132,18 @@ public class ColumnIndex
 
         private void add(Unfiltered unfiltered) throws IOException
         {
+            long pos = currentPosition();
+
             if (firstClustering == null)
             {
                 // Beginning of an index block. Remember the start and position
                 firstClustering = unfiltered.clustering();
-                startPosition = currentPosition();
+                startPosition = pos;
             }
 
-            UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, version);
+            UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version);
             lastClustering = unfiltered.clustering();
+            previousRowStart = pos;
             ++written;
 
             if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/525855d2/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 3c5cdbf..3a0558e 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -90,6 +90,8 @@ public class UnfilteredRowIteratorSerializer
     // Should only be used for the on-wire format.
     public void serialize(UnfilteredRowIterator iterator, SerializationHeader header, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
     {
+        assert !header.isForSSTable();
+
         ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), out);
 
         int flags = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/525855d2/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 1f77529..fac8863 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -92,17 +92,31 @@ public class UnfilteredSerializer
     public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version)
     throws IOException
     {
+        assert !header.isForSSTable();
+        serialize(unfiltered, header, out, 0, version);
+    }
+
+    public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version)
+    throws IOException
+    {
         if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
         {
-            serialize((RangeTombstoneMarker) unfiltered, header, out, version);
+            serialize((RangeTombstoneMarker) unfiltered, header, out, previousUnfilteredSize, version);
         }
         else
         {
-            serialize((Row) unfiltered, header, out, version);
+            serialize((Row) unfiltered, header, out, previousUnfilteredSize, version);
         }
     }
 
-    public void serialize(Row row, SerializationHeader header, DataOutputPlus out, int version)
+    public void serializeStaticRow(Row row, SerializationHeader header, DataOutputPlus out, int version)
+    throws IOException
+    {
+        assert row.isStatic();
+        serialize(row, header, out, 0, version);
+    }
+
+    private void serialize(Row row, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version)
     throws IOException
     {
         int flags = 0;
@@ -151,7 +165,10 @@ public class UnfilteredSerializer
             Clustering.serializer.serialize(row.clustering(), out, version, header.clusteringTypes());
 
         if (header.isForSSTable())
-            out.writeUnsignedVInt(serializedRowBodySize(row, header, version));
+        {
+            out.writeUnsignedVInt(serializedRowBodySize(row, header, previousUnfilteredSize, version));
+            out.writeUnsignedVInt(previousUnfilteredSize);
+        }
 
         if ((flags & HAS_TIMESTAMP) != 0)
             header.writeTimestamp(pkLiveness.timestamp(), out);
@@ -186,14 +203,17 @@ public class UnfilteredSerializer
             Cell.serializer.serialize(cell, out, rowLiveness, header);
     }
 
-    public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, int version)
+    private void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version)
     throws IOException
     {
         out.writeByte((byte)IS_MARKER);
         RangeTombstone.Bound.serializer.serialize(marker.clustering(), out, version, header.clusteringTypes());
 
         if (header.isForSSTable())
-            out.writeUnsignedVInt(serializedMarkerBodySize(marker, header, version));
+        {
+            out.writeUnsignedVInt(serializedMarkerBodySize(marker, header, previousUnfilteredSize, version));
+            out.writeUnsignedVInt(previousUnfilteredSize);
+        }
 
         if (marker.isBoundary())
         {
@@ -214,8 +234,9 @@ public class UnfilteredSerializer
              : serializedSize((Row) unfiltered, header, version);
     }
 
-    public long serializedSize(Row row, SerializationHeader header, int version)
+    private long serializedSize(Row row, SerializationHeader header, int version)
     {
+        assert !header.isForSSTable();
         long size = 1; // flags
 
         if (row.isStatic() || row.deletion().isShadowable())
@@ -224,13 +245,16 @@ public class UnfilteredSerializer
         if (!row.isStatic())
             size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
 
-        return size + serializedRowBodySize(row, header, version);
+        return size + serializedRowBodySize(row, header, 0, version);
     }
 
-    public long serializedRowBodySize(Row row, SerializationHeader header, int version)
+    private long serializedRowBodySize(Row row, SerializationHeader header, long previousUnfilteredSize, int version)
     {
         long size = 0;
 
+        if (header.isForSSTable())
+            size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize);
+
         boolean isStatic = row.isStatic();
         Columns headerColumns = header.columns(isStatic);
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
@@ -238,7 +262,6 @@ public class UnfilteredSerializer
         boolean hasComplexDeletion = row.hasComplexDeletion();
         boolean hasAllColumns = (row.size() == headerColumns.size());
 
-
         if (!pkLiveness.isEmpty())
             size += header.timestampSerializedSize(pkLiveness.timestamp());
         if (pkLiveness.isExpiring())
@@ -277,16 +300,20 @@ public class UnfilteredSerializer
         return size;
     }
 
-    public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version)
+    private long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version)
     {
+        assert !header.isForSSTable();
         return 1 // flags
              + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes())
-             + serializedMarkerBodySize(marker, header, version);
+             + serializedMarkerBodySize(marker, header, 0, version);
     }
 
-    public long serializedMarkerBodySize(RangeTombstoneMarker marker, SerializationHeader header, int version)
+    private long serializedMarkerBodySize(RangeTombstoneMarker marker, SerializationHeader header, long previousUnfilteredSize, int version)
     {
         long size = 0;
+        if (header.isForSSTable())
+            size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize);
+
         if (marker.isBoundary())
         {
             RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
@@ -349,8 +376,11 @@ public class UnfilteredSerializer
     public RangeTombstoneMarker deserializeMarkerBody(DataInputPlus in, SerializationHeader header, RangeTombstone.Bound bound)
     throws IOException
     {
-            if (header.isForSSTable())
-                in.readUnsignedVInt(); // Skip marker size
+        if (header.isForSSTable())
+        {
+            in.readUnsignedVInt(); // marker size
+            in.readUnsignedVInt(); // previous unfiltered size
+        }
 
         if (bound.isBoundary())
             return new RangeTombstoneBoundaryMarker(bound, header.readDeletionTime(in), header.readDeletionTime(in));
@@ -368,9 +398,6 @@ public class UnfilteredSerializer
     {
         try
         {
-            if (header.isForSSTable())
-                in.readUnsignedVInt(); // Skip row size
-
             boolean isStatic = isStatic(extendedFlags);
             boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
             boolean hasTTL = (flags & HAS_TTL) != 0;
@@ -380,6 +407,12 @@ public class UnfilteredSerializer
             boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
             Columns headerColumns = header.columns(isStatic);
 
+            if (header.isForSSTable())
+            {
+                in.readUnsignedVInt(); // Skip row size
+                in.readUnsignedVInt(); // previous unfiltered size
+            }
+
             LivenessInfo rowLiveness = LivenessInfo.EMPTY;
             if (hasTimestamp)
             {