You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/07/06 08:13:38 UTC

[02/16] cassandra git commit: Improve digest calculation in the presence of overlapping tombstones.

Improve digest calculation in the presence of overlapping tombstones.

Patch by Branimir Lambov; reviewed by Sylvain Lebresne for
CASSANDRA-11349

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3c1653f4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3c1653f4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3c1653f4

Branch: refs/heads/cassandra-3.9
Commit: 3c1653f479faddf000fd818b7c578810c644ae02
Parents: c857919
Author: Branimir Lambov <br...@datastax.com>
Authored: Thu May 5 16:20:52 2016 +0300
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jul 5 11:24:36 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/ColumnIndex.java    | 12 +++++++++--
 .../org/apache/cassandra/db/OnDiskAtom.java     |  8 +++++++-
 .../org/apache/cassandra/db/RangeTombstone.java |  6 +++---
 .../db/compaction/LazilyCompactedRow.java       | 21 ++++++++++++++++----
 5 files changed, 38 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c1653f4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0967ce4..b1dcbe1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 2.1.16
  * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
  * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+ * Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)
 
 2.1.15
  * Account for partition deletions in tombstone histogram (CASSANDRA-12112)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c1653f4/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index f63dfe1..8f147cc 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -68,12 +68,20 @@ public class ColumnIndex
         private final ByteBuffer key;
         private final DeletionInfo deletionInfo; // only used for serializing and calculating row header size
 
-        private final OnDiskAtom.Serializer atomSerializer;
+        private final OnDiskAtom.SerializerForWriting atomSerializer;
 
         public Builder(ColumnFamily cf,
                        ByteBuffer key,
                        DataOutputPlus output)
         {
+            this(cf, key, output, cf.getComparator().onDiskAtomSerializer());
+        }
+
+        public Builder(ColumnFamily cf,
+                ByteBuffer key,
+                DataOutputPlus output,
+                OnDiskAtom.SerializerForWriting serializer)
+        {
             assert cf != null;
             assert key != null;
             assert output != null;
@@ -84,7 +92,7 @@ public class ColumnIndex
             this.result = new ColumnIndex(new ArrayList<IndexHelper.IndexInfo>());
             this.output = output;
             this.tombstoneTracker = new RangeTombstone.Tracker(cf.getComparator());
-            this.atomSerializer = cf.getComparator().onDiskAtomSerializer();
+            this.atomSerializer = serializer;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c1653f4/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index b53e43b..3e768ea 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -43,7 +43,13 @@ public interface OnDiskAtom
     public void validateFields(CFMetaData metadata) throws MarshalException;
     public void updateDigest(MessageDigest digest);
 
-    public static class Serializer implements ISSTableSerializer<OnDiskAtom>
+    public interface SerializerForWriting
+    {
+        public void serializeForSSTable(OnDiskAtom atom, DataOutputPlus out) throws IOException;
+        public long serializedSizeForSSTable(OnDiskAtom atom);
+    }
+
+    public static class Serializer implements ISSTableSerializer<OnDiskAtom>, SerializerForWriting
     {
         private final CellNameType type;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c1653f4/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 5e41792..9dc2723 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -152,7 +152,7 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements
          * @return the total serialized size of said tombstones and write them to
          * {@code out} it if isn't null.
          */
-        public long writeOpenedMarkers(Composite startPos, DataOutputPlus out, OnDiskAtom.Serializer atomSerializer) throws IOException
+        public long writeOpenedMarkers(Composite startPos, DataOutputPlus out, OnDiskAtom.SerializerForWriting atomSerializer) throws IOException
         {
             long size = 0;
 
@@ -172,7 +172,7 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements
          *
          * @return the serialized size of written tombstones
          */
-        public long writeUnwrittenTombstones(DataOutputPlus out, OnDiskAtom.Serializer atomSerializer) throws IOException
+        public long writeUnwrittenTombstones(DataOutputPlus out, OnDiskAtom.SerializerForWriting atomSerializer) throws IOException
         {
             long size = 0;
             for (RangeTombstone rt : unwrittenTombstones)
@@ -183,7 +183,7 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements
             return size;
         }
 
-        private long writeTombstone(RangeTombstone rt, DataOutputPlus out, OnDiskAtom.Serializer atomSerializer)
+        private long writeTombstone(RangeTombstone rt, DataOutputPlus out, OnDiskAtom.SerializerForWriting atomSerializer)
                 throws IOException
         {
             long size = atomSerializer.serializedSizeForSSTable(rt);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c1653f4/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index e9aecb2..f912da2 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -148,16 +148,30 @@ public class LazilyCompactedRow extends AbstractCompactedRow
         return RowIndexEntry.create(currentPosition, emptyColumnFamily.deletionInfo().getTopLevelDeletion(), columnsIndex);
     }
 
-    public void update(MessageDigest digest)
+    public void update(final MessageDigest digest)
     {
         assert !closed;
 
         // no special-case for rows.size == 1, we're actually skipping some bytes here so just
         // blindly updating everything wouldn't be correct
         DataOutputBuffer out = new DataOutputBuffer();
+        OnDiskAtom.SerializerForWriting serializer = new OnDiskAtom.SerializerForWriting()
+        {
+            @Override
+            public void serializeForSSTable(OnDiskAtom atom, DataOutputPlus out) throws IOException
+            {
+                atom.updateDigest(digest);
+            }
+
+            @Override
+            public long serializedSizeForSSTable(OnDiskAtom atom)
+            {
+                return 0;
+            }
+        };
 
         // initialize indexBuilder for the benefit of its tombstoneTracker, used by our reducing iterator
-        indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.getKey(), out);
+        indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.getKey(), out, serializer);
 
         try
         {
@@ -171,14 +185,13 @@ public class LazilyCompactedRow extends AbstractCompactedRow
             {
                 digest.update(out.getData(), 0, out.getLength());
             }
+            indexBuilder.buildForCompaction(merger);
         }
         catch (IOException e)
         {
             throw new AssertionError(e);
         }
 
-        while (merger.hasNext())
-            merger.next().updateDigest(digest);
         close();
     }