You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/09/23 23:39:07 UTC
[1/2] cassandra git commit: Write row size in sstable format for faster skipping
Repository: cassandra
Updated Branches:
refs/heads/10378 [created] 525855d2f
Write row size in sstable format for faster skipping
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/424b59ad
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/424b59ad
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/424b59ad
Branch: refs/heads/10378
Commit: 424b59ad5aa72b25eab8995a2c248ab734d33177
Parents: 41731b8
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Sep 22 13:53:22 2015 -0700
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Sep 22 14:04:06 2015 -0700
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/Memtable.java | 2 +-
.../cassandra/db/SerializationHeader.java | 26 ++++--
.../rows/UnfilteredRowIteratorSerializer.java | 8 +-
.../cassandra/db/rows/UnfilteredSerializer.java | 90 +++++++++-----------
.../io/sstable/AbstractSSTableSimpleWriter.java | 2 +-
.../io/sstable/SSTableSimpleUnsortedWriter.java | 2 +-
.../apache/cassandra/db/RowIndexEntryTest.java | 4 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 3 +-
.../db/compaction/AntiCompactionTest.java | 2 +-
.../io/sstable/BigTableWriterTest.java | 2 +-
.../io/sstable/SSTableRewriterTest.java | 4 +-
.../cassandra/io/sstable/SSTableUtils.java | 2 +-
12 files changed, 78 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 7af65d1..ae982d3 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -428,7 +428,7 @@ public class Memtable implements Comparable<Memtable>
(long)partitions.size(),
ActiveRepairService.UNREPAIRED_SSTABLE,
sstableMetadataCollector,
- new SerializationHeader(cfs.metadata, columns, stats),
+ new SerializationHeader(true, cfs.metadata, columns, stats),
txn));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index decac49..0706d06 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -45,6 +45,8 @@ public class SerializationHeader
{
public static final Serializer serializer = new Serializer();
+ private final boolean isForSSTable;
+
private final AbstractType<?> keyType;
private final List<AbstractType<?>> clusteringTypes;
@@ -53,12 +55,14 @@ public class SerializationHeader
private final Map<ByteBuffer, AbstractType<?>> typeMap;
- private SerializationHeader(AbstractType<?> keyType,
+ private SerializationHeader(boolean isForSSTable,
+ AbstractType<?> keyType,
List<AbstractType<?>> clusteringTypes,
PartitionColumns columns,
EncodingStats stats,
Map<ByteBuffer, AbstractType<?>> typeMap)
{
+ this.isForSSTable = isForSSTable;
this.keyType = keyType;
this.clusteringTypes = clusteringTypes;
this.columns = columns;
@@ -77,7 +81,8 @@ public class SerializationHeader
List<AbstractType<?>> clusteringTypes = new ArrayList<>(size);
for (int i = 0; i < size; i++)
clusteringTypes.add(BytesType.instance);
- return new SerializationHeader(BytesType.instance,
+ return new SerializationHeader(false,
+ BytesType.instance,
clusteringTypes,
PartitionColumns.NONE,
EncodingStats.NO_STATS,
@@ -108,14 +113,16 @@ public class SerializationHeader
else
columns.addAll(sstable.header.columns());
}
- return new SerializationHeader(metadata, columns.build(), stats.get());
+ return new SerializationHeader(true, metadata, columns.build(), stats.get());
}
- public SerializationHeader(CFMetaData metadata,
+ public SerializationHeader(boolean isForSSTable,
+ CFMetaData metadata,
PartitionColumns columns,
EncodingStats stats)
{
- this(metadata.getKeyValidator(),
+ this(isForSSTable,
+ metadata.getKeyValidator(),
typesOf(metadata.clusteringColumns()),
columns,
stats,
@@ -137,6 +144,11 @@ public class SerializationHeader
return !columns.statics.isEmpty();
}
+ public boolean isForSSTable()
+ {
+ return isForSSTable;
+ }
+
public EncodingStats stats()
{
return stats;
@@ -320,7 +332,7 @@ public class SerializationHeader
}
builder.add(column);
}
- return new SerializationHeader(keyType, clusteringTypes, builder.build(), stats, typeMap);
+ return new SerializationHeader(true, keyType, clusteringTypes, builder.build(), stats, typeMap);
}
@Override
@@ -390,7 +402,7 @@ public class SerializationHeader
regulars = Columns.serializer.deserializeSubset(selection.fetchedColumns().regulars, in);
}
- return new SerializationHeader(keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null);
+ return new SerializationHeader(false, keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null);
}
public long serializedSizeForMessaging(SerializationHeader header, ColumnFilter selection, boolean hasStatic)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index df006d7..3c5cdbf 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -80,7 +80,8 @@ public class UnfilteredRowIteratorSerializer
// Should only be used for the on-wire format.
public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
{
- SerializationHeader header = new SerializationHeader(iterator.metadata(),
+ SerializationHeader header = new SerializationHeader(false,
+ iterator.metadata(),
iterator.columns(),
iterator.stats());
serialize(iterator, header, selection, out, version, rowEstimate);
@@ -134,7 +135,8 @@ public class UnfilteredRowIteratorSerializer
// recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate/ArrayBackedCachedPartition.
public long serializedSize(UnfilteredRowIterator iterator, ColumnFilter selection, int version, int rowEstimate)
{
- SerializationHeader header = new SerializationHeader(iterator.metadata(),
+ SerializationHeader header = new SerializationHeader(false,
+ iterator.metadata(),
iterator.columns(),
iterator.stats());
@@ -175,7 +177,7 @@ public class UnfilteredRowIteratorSerializer
boolean isReversed = (flags & IS_REVERSED) != 0;
if ((flags & IS_EMPTY) != 0)
{
- SerializationHeader sh = new SerializationHeader(metadata, PartitionColumns.NONE, EncodingStats.NO_STATS);
+ SerializationHeader sh = new SerializationHeader(false, metadata, PartitionColumns.NONE, EncodingStats.NO_STATS);
return new Header(sh, key, isReversed, true, null, null, 0);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index b83ccf9..1f77529 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -35,10 +35,12 @@ import org.apache.cassandra.io.util.DataOutputPlus;
* flag is defined/explained below as the "Unfiltered flags" constants. One of those flags
* is an extension flag, and if present, trigger the rid of another byte that contains more
* flags. If the extension is not set, defaults are assumed for the flags of that 2nd byte.
- * <row> is <clustering>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj> where
- * <clustering> is the row clustering as serialized by
- * {@code Clustering.serializer}. Note that static row are an exception and
- * don't have this. <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion
+ * <row> is <clustering><size>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj> where
+ * <clustering> is the row clustering as serialized by {@code Clustering.serializer} (note
+ * that static row are an exception and don't have this).
+ * <size> is the on-disk size of the rest of the unfiltered that follows it (flags, clustering and the size itself excluded);
+ * it is only written for sstables and is used to efficiently skip rows.
+ * <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion
* whose presence is determined by the flags. <sci> is the simple columns of the row and <ccj> the
* complex ones.
* The columns for the row are then serialized if they differ from those in the header,
@@ -148,6 +150,9 @@ public class UnfilteredSerializer
if (!isStatic)
Clustering.serializer.serialize(row.clustering(), out, version, header.clusteringTypes());
+ if (header.isForSSTable())
+ out.writeUnsignedVInt(serializedRowBodySize(row, header, version));
+
if ((flags & HAS_TIMESTAMP) != 0)
header.writeTimestamp(pkLiveness.timestamp(), out);
if ((flags & HAS_TTL) != 0)
@@ -187,6 +192,9 @@ public class UnfilteredSerializer
out.writeByte((byte)IS_MARKER);
RangeTombstone.Bound.serializer.serialize(marker.clustering(), out, version, header.clusteringTypes());
+ if (header.isForSSTable())
+ out.writeUnsignedVInt(serializedMarkerBodySize(marker, header, version));
+
if (marker.isBoundary())
{
RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
@@ -210,6 +218,19 @@ public class UnfilteredSerializer
{
long size = 1; // flags
+ if (row.isStatic() || row.deletion().isShadowable())
+ size += 1; // extended flags
+
+ if (!row.isStatic())
+ size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
+
+ return size + serializedRowBodySize(row, header, version);
+ }
+
+ public long serializedRowBodySize(Row row, SerializationHeader header, int version)
+ {
+ long size = 0;
+
boolean isStatic = row.isStatic();
Columns headerColumns = header.columns(isStatic);
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
@@ -217,11 +238,6 @@ public class UnfilteredSerializer
boolean hasComplexDeletion = row.hasComplexDeletion();
boolean hasAllColumns = (row.size() == headerColumns.size());
- if (isStatic || deletion.isShadowable())
- size += 1; // extended flags
-
- if (!isStatic)
- size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
if (!pkLiveness.isEmpty())
size += header.timestampSerializedSize(pkLiveness.timestamp());
@@ -263,9 +279,14 @@ public class UnfilteredSerializer
public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version)
{
- long size = 1 // flags
- + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes());
+ return 1 // flags
+ + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes())
+ + serializedMarkerBodySize(marker, header, version);
+ }
+ public long serializedMarkerBodySize(RangeTombstoneMarker marker, SerializationHeader header, int version)
+ {
+ long size = 0;
if (marker.isBoundary())
{
RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
@@ -328,6 +349,9 @@ public class UnfilteredSerializer
public RangeTombstoneMarker deserializeMarkerBody(DataInputPlus in, SerializationHeader header, RangeTombstone.Bound bound)
throws IOException
{
+ if (header.isForSSTable())
+ in.readUnsignedVInt(); // Skip marker size
+
if (bound.isBoundary())
return new RangeTombstoneBoundaryMarker(bound, header.readDeletionTime(in), header.readDeletionTime(in));
else
@@ -344,6 +368,9 @@ public class UnfilteredSerializer
{
try
{
+ if (header.isForSSTable())
+ in.readUnsignedVInt(); // Skip row size
+
boolean isStatic = isStatic(extendedFlags);
boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
boolean hasTTL = (flags & HAS_TTL) != 0;
@@ -432,34 +459,8 @@ public class UnfilteredSerializer
public void skipRowBody(DataInputPlus in, SerializationHeader header, int flags, int extendedFlags) throws IOException
{
- boolean isStatic = isStatic(extendedFlags);
- boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
- boolean hasTTL = (flags & HAS_TTL) != 0;
- boolean hasDeletion = (flags & HAS_DELETION) != 0;
- boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
- boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
- Columns headerColumns = header.columns(isStatic);
-
- // Note that we don't want want to use FileUtils.skipBytesFully for anything that may not have
- // the size we think due to VINT encoding
- if (hasTimestamp)
- header.skipTimestamp(in);
- if (hasTTL)
- {
- header.skipLocalDeletionTime(in);
- header.skipTTL(in);
- }
- if (hasDeletion)
- header.skipDeletionTime(in);
-
- Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
- for (ColumnDefinition column : columns)
- {
- if (column.isSimple())
- Cell.serializer.skip(in, column, header);
- else
- skipComplexColumn(in, column, header, hasComplexDeletion);
- }
+ int rowSize = (int)in.readUnsignedVInt();
+ in.skipBytesFully(rowSize);
}
public void skipStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper) throws IOException
@@ -473,15 +474,8 @@ public class UnfilteredSerializer
public void skipMarkerBody(DataInputPlus in, SerializationHeader header, boolean isBoundary) throws IOException
{
- if (isBoundary)
- {
- header.skipDeletionTime(in);
- header.skipDeletionTime(in);
- }
- else
- {
- header.skipDeletionTime(in);
- }
+ int markerSize = (int)in.readUnsignedVInt();
+ in.skipBytesFully(markerSize);
}
private void skipComplexColumn(DataInputPlus in, ColumnDefinition column, SerializationHeader header, boolean hasComplexDeletion)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index d94b219..62348ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -65,7 +65,7 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
0,
ActiveRepairService.UNREPAIRED_SSTABLE,
0,
- new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
+ new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS));
}
private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index f4b9adf..7dd4257 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -63,7 +63,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
{
super(directory, metadata, columns);
this.bufferSize = bufferSizeInMB * 1024L * 1024L;
- this.header = new SerializationHeader(metadata, columns, EncodingStats.NO_STATS);
+ this.header = new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS);
diskWriter.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 25baa4e..62c88a0 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -60,7 +60,7 @@ public class RowIndexEntryTest extends CQLTester
DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
- SerializationHeader header = new SerializationHeader(cfMeta, cfMeta.partitionColumns(), EncodingStats.NO_STATS);
+ SerializationHeader header = new SerializationHeader(true, cfMeta, cfMeta.partitionColumns(), EncodingStats.NO_STATS);
IndexHelper.IndexInfo.Serializer indexSerializer = new IndexHelper.IndexInfo.Serializer(cfMeta, BigFormat.latestVersion, header);
DataOutputBuffer dob = new DataOutputBuffer();
@@ -119,7 +119,7 @@ public class RowIndexEntryTest extends CQLTester
final RowIndexEntry simple = new RowIndexEntry(123);
DataOutputBuffer buffer = new DataOutputBuffer();
- SerializationHeader header = new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
+ SerializationHeader header = new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
RowIndexEntry.Serializer serializer = new RowIndexEntry.Serializer(cfs.metadata, BigFormat.latestVersion, header);
serializer.serialize(simple, buffer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 2fc8436..ab99750 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -325,7 +325,8 @@ public class ScrubTest
keys.size(),
0L,
0,
- new SerializationHeader(cfs.metadata,
+ new SerializationHeader(true,
+ cfs.metadata,
cfs.metadata.partitionColumns(),
EncodingStats.NO_STATS)))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index cd82b19..db07eb8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -166,7 +166,7 @@ public class AntiCompactionTest
File dir = cfs.getDirectories().getDirectoryForNewSSTables();
String filename = cfs.getSSTablePath(dir);
- try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
+ try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(true, cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
{
for (int i = 0; i < count; i++)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index e1ab48f..78964f4 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -69,7 +69,7 @@ public class BigTableWriterTest extends AbstractTransactionalTest
private TestableBTW(String file)
{
- this(file, SSTableTxnWriter.create(cfs, file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
+ this(file, SSTableTxnWriter.create(cfs, file, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
}
private TestableBTW(String file, SSTableTxnWriter sw)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 942c7f9..097291e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -943,7 +943,7 @@ public class SSTableRewriterTest extends SchemaLoader
File dir = cfs.getDirectories().getDirectoryForNewSSTables();
String filename = cfs.getSSTablePath(dir);
- try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
+ try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
{
int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount;
for ( ; i < end ; i++)
@@ -1011,7 +1011,7 @@ public class SSTableRewriterTest extends SchemaLoader
public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
{
String filename = cfs.getSSTablePath(directory);
- return SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
+ return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
}
public static ByteBuffer random(int i, int size)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/424b59ad/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index fcd2d71..5c7ff02 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -187,7 +187,7 @@ public class SSTableUtils
{
public SerializationHeader header()
{
- return new SerializationHeader(Schema.instance.getCFMetaData(ksname, cfname), builder.build(), EncodingStats.NO_STATS);
+ return new SerializationHeader(true, Schema.instance.getCFMetaData(ksname, cfname), builder.build(), EncodingStats.NO_STATS);
}
@Override
[2/2] cassandra git commit: Record previous row size
Posted by sl...@apache.org.
Record previous row size
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/525855d2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/525855d2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/525855d2
Branch: refs/heads/10378
Commit: 525855d2f37b2fe9376b4ce2dab9107d0d227f6a
Parents: 424b59a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Sep 23 14:36:04 2015 -0700
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Sep 23 14:36:04 2015 -0700
----------------------------------------------------------------------
.../org/apache/cassandra/db/ColumnIndex.java | 10 ++-
.../rows/UnfilteredRowIteratorSerializer.java | 2 +
.../cassandra/db/rows/UnfilteredSerializer.java | 69 +++++++++++++++-----
3 files changed, 60 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/525855d2/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index add5fa7..ede3f79 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -76,6 +76,7 @@ public class ColumnIndex
private long startPosition = -1;
private int written;
+ private long previousRowStart;
private ClusteringPrefix firstClustering;
private ClusteringPrefix lastClustering;
@@ -99,7 +100,7 @@ public class ColumnIndex
ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
if (header.hasStatic())
- UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer, version);
+ UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), header, writer, version);
}
public ColumnIndex build() throws IOException
@@ -131,15 +132,18 @@ public class ColumnIndex
private void add(Unfiltered unfiltered) throws IOException
{
+ long pos = currentPosition();
+
if (firstClustering == null)
{
// Beginning of an index block. Remember the start and position
firstClustering = unfiltered.clustering();
- startPosition = currentPosition();
+ startPosition = pos;
}
- UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, version);
+ UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version);
lastClustering = unfiltered.clustering();
+ previousRowStart = pos;
++written;
if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/525855d2/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 3c5cdbf..3a0558e 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -90,6 +90,8 @@ public class UnfilteredRowIteratorSerializer
// Should only be used for the on-wire format.
public void serialize(UnfilteredRowIterator iterator, SerializationHeader header, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
{
+ assert !header.isForSSTable();
+
ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), out);
int flags = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/525855d2/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 1f77529..fac8863 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -92,17 +92,31 @@ public class UnfilteredSerializer
public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version)
throws IOException
{
+ assert !header.isForSSTable();
+ serialize(unfiltered, header, out, 0, version);
+ }
+
+ public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version)
+ throws IOException
+ {
if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
{
- serialize((RangeTombstoneMarker) unfiltered, header, out, version);
+ serialize((RangeTombstoneMarker) unfiltered, header, out, previousUnfilteredSize, version);
}
else
{
- serialize((Row) unfiltered, header, out, version);
+ serialize((Row) unfiltered, header, out, previousUnfilteredSize, version);
}
}
- public void serialize(Row row, SerializationHeader header, DataOutputPlus out, int version)
+ public void serializeStaticRow(Row row, SerializationHeader header, DataOutputPlus out, int version)
+ throws IOException
+ {
+ assert row.isStatic();
+ serialize(row, header, out, 0, version);
+ }
+
+ private void serialize(Row row, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version)
throws IOException
{
int flags = 0;
@@ -151,7 +165,10 @@ public class UnfilteredSerializer
Clustering.serializer.serialize(row.clustering(), out, version, header.clusteringTypes());
if (header.isForSSTable())
- out.writeUnsignedVInt(serializedRowBodySize(row, header, version));
+ {
+ out.writeUnsignedVInt(serializedRowBodySize(row, header, previousUnfilteredSize, version));
+ out.writeUnsignedVInt(previousUnfilteredSize);
+ }
if ((flags & HAS_TIMESTAMP) != 0)
header.writeTimestamp(pkLiveness.timestamp(), out);
@@ -186,14 +203,17 @@ public class UnfilteredSerializer
Cell.serializer.serialize(cell, out, rowLiveness, header);
}
- public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, int version)
+ private void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version)
throws IOException
{
out.writeByte((byte)IS_MARKER);
RangeTombstone.Bound.serializer.serialize(marker.clustering(), out, version, header.clusteringTypes());
if (header.isForSSTable())
- out.writeUnsignedVInt(serializedMarkerBodySize(marker, header, version));
+ {
+ out.writeUnsignedVInt(serializedMarkerBodySize(marker, header, previousUnfilteredSize, version));
+ out.writeUnsignedVInt(previousUnfilteredSize);
+ }
if (marker.isBoundary())
{
@@ -214,8 +234,9 @@ public class UnfilteredSerializer
: serializedSize((Row) unfiltered, header, version);
}
- public long serializedSize(Row row, SerializationHeader header, int version)
+ private long serializedSize(Row row, SerializationHeader header, int version)
{
+ assert !header.isForSSTable();
long size = 1; // flags
if (row.isStatic() || row.deletion().isShadowable())
@@ -224,13 +245,16 @@ public class UnfilteredSerializer
if (!row.isStatic())
size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
- return size + serializedRowBodySize(row, header, version);
+ return size + serializedRowBodySize(row, header, 0, version);
}
- public long serializedRowBodySize(Row row, SerializationHeader header, int version)
+ private long serializedRowBodySize(Row row, SerializationHeader header, long previousUnfilteredSize, int version)
{
long size = 0;
+ if (header.isForSSTable())
+ size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize);
+
boolean isStatic = row.isStatic();
Columns headerColumns = header.columns(isStatic);
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
@@ -238,7 +262,6 @@ public class UnfilteredSerializer
boolean hasComplexDeletion = row.hasComplexDeletion();
boolean hasAllColumns = (row.size() == headerColumns.size());
-
if (!pkLiveness.isEmpty())
size += header.timestampSerializedSize(pkLiveness.timestamp());
if (pkLiveness.isExpiring())
@@ -277,16 +300,20 @@ public class UnfilteredSerializer
return size;
}
- public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version)
+ private long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version)
{
+ assert !header.isForSSTable();
return 1 // flags
+ RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes())
- + serializedMarkerBodySize(marker, header, version);
+ + serializedMarkerBodySize(marker, header, 0, version);
}
- public long serializedMarkerBodySize(RangeTombstoneMarker marker, SerializationHeader header, int version)
+ private long serializedMarkerBodySize(RangeTombstoneMarker marker, SerializationHeader header, long previousUnfilteredSize, int version)
{
long size = 0;
+ if (header.isForSSTable())
+ size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize);
+
if (marker.isBoundary())
{
RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
@@ -349,8 +376,11 @@ public class UnfilteredSerializer
public RangeTombstoneMarker deserializeMarkerBody(DataInputPlus in, SerializationHeader header, RangeTombstone.Bound bound)
throws IOException
{
- if (header.isForSSTable())
- in.readUnsignedVInt(); // Skip marker size
+ if (header.isForSSTable())
+ {
+ in.readUnsignedVInt(); // marker size
+ in.readUnsignedVInt(); // previous unfiltered size
+ }
if (bound.isBoundary())
return new RangeTombstoneBoundaryMarker(bound, header.readDeletionTime(in), header.readDeletionTime(in));
@@ -368,9 +398,6 @@ public class UnfilteredSerializer
{
try
{
- if (header.isForSSTable())
- in.readUnsignedVInt(); // Skip row size
-
boolean isStatic = isStatic(extendedFlags);
boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
boolean hasTTL = (flags & HAS_TTL) != 0;
@@ -380,6 +407,12 @@ public class UnfilteredSerializer
boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
Columns headerColumns = header.columns(isStatic);
+ if (header.isForSSTable())
+ {
+ in.readUnsignedVInt(); // Skip row size
+ in.readUnsignedVInt(); // previous unfiltered size
+ }
+
LivenessInfo rowLiveness = LivenessInfo.EMPTY;
if (hasTimestamp)
{