You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/04/22 14:34:54 UTC

[18/19] git commit: Add range tombstones to read repair digests patch by Oleg Anastasyev; reviewed by jbellis for CASSANDRA-6863

Add range tombstones to read repair digests
patch by Oleg Anastasyev; reviewed by jbellis for CASSANDRA-6863


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2e74354
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2e74354
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2e74354

Branch: refs/heads/trunk
Commit: a2e74354ca51809a11b62dd7995c026807683b0a
Parents: be2686d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 22 07:27:20 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Apr 22 07:27:20 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/ColumnFamily.java   |  5 ++
 .../org/apache/cassandra/db/DeletionInfo.java   | 34 +++++++++-
 .../apache/cassandra/db/RangeTombstoneList.java | 68 ++++++++++++++++++--
 .../apache/cassandra/net/MessagingService.java  | 32 ++++++++-
 test/unit/org/apache/cassandra/Util.java        |  7 ++
 .../apache/cassandra/db/ColumnFamilyTest.java   | 46 ++++++++++++-
 test/unit/org/apache/cassandra/db/RowTest.java  | 40 ++++++++++--
 8 files changed, 222 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ae7410e..495dab2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.0-beta2
+ * Add range tombstones to read repair digests (CASSANDRA-6863)
  * Fix BTree.clear for large updates (CASSANDRA-6943)
  * Fail write instead of logging a warning when unable to append to CL
    (CASSANDRA-6764)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index da404b0..4f85610 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -313,8 +313,11 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
             }
         }
 
+        cfDiff.setDeletionInfo(deletionInfo().diff(cfComposite.deletionInfo()));
+
         if (!cfDiff.isEmpty())
             return cfDiff;
+        
         return null;
     }
 
@@ -385,6 +388,8 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
     {
         for (Cell cell : this)
             cell.updateDigest(digest);
+        if (MessagingService.instance().areAllNodesAtLeast21())
+            deletionInfo().updateDigest(digest);
     }
 
     public static ColumnFamily diff(ColumnFamily cf1, ColumnFamily cf2)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 8601bce..a167b85 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.db;
 
 import java.io.DataInput;
 import java.io.IOException;
-import java.util.*;
+import java.security.MessageDigest;
+import java.util.Comparator;
+import java.util.Iterator;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterators;
@@ -29,6 +31,7 @@ import org.apache.cassandra.db.composites.CType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 
 /**
@@ -168,6 +171,35 @@ public class DeletionInfo implements IMeasurableMemory
     }
 
     /**
+     * Evaluates difference between this deletion info and superset for read repair
+     *
+     * @return the difference between the two, or LIVE if no difference
+     */
+    public DeletionInfo diff(DeletionInfo superset)
+    {
+        RangeTombstoneList rangeDiff = superset.ranges == null || superset.ranges.isEmpty()
+                                     ? null
+                                     : ranges == null ? superset.ranges : ranges.diff(superset.ranges);
+
+        return topLevel.markedForDeleteAt != superset.topLevel.markedForDeleteAt || rangeDiff != null
+             ? new DeletionInfo(superset.topLevel, rangeDiff)
+             : DeletionInfo.live();
+    }
+
+
+    /**
+     * Digests deletion info. Used to trigger read repair on mismatch.
+     */
+    public void updateDigest(MessageDigest digest)
+    {
+        if (topLevel.markedForDeleteAt != Long.MIN_VALUE)
+            digest.update(ByteBufferUtil.bytes(topLevel.markedForDeleteAt));
+
+        if (ranges != null)
+            ranges.updateDigest(digest);
+    }
+
+    /**
      * Returns true if {@code purge} would remove the top-level tombstone or any of the range
      * tombstones, false otherwise.
      * @param gcBefore timestamp (in seconds) before which tombstones should be purged

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index dd0b9a6..b06c520 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -19,12 +19,16 @@ package org.apache.cassandra.db;
 
 import java.io.DataInput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
 
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.composites.CType;
@@ -32,10 +36,7 @@ import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
-
 import org.apache.cassandra.utils.ObjectSizes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Data structure holding the range tombstones of a ColumnFamily.
@@ -384,6 +385,64 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
             }
         };
     }
+    
+    /**
+     * Evaluates a diff between superset (known to be all merged tombstones) and this list for read repair
+     *
+     * @return null if there is no difference
+     */
+    public RangeTombstoneList diff(RangeTombstoneList superset)
+    {
+        if (isEmpty())
+            return superset;
+
+        assert size <= superset.size;
+
+        RangeTombstoneList diff = null;
+
+        int j = 0; // index to iterate through our own list
+        for (int i = 0; i < superset.size; i++)
+        {
+            boolean sameStart = j < size && starts[j].equals(superset.starts[i]);
+            // don't care about local deletion time here; for RR it doesn't make sense
+            if (!sameStart
+                || !ends[j].equals(superset.ends[i])
+                || markedAts[j] != superset.markedAts[i])
+            {
+                if (diff == null)
+                    diff = new RangeTombstoneList(comparator, Math.min(8, superset.size - i));
+                diff.add(superset.starts[i], superset.ends[i], superset.markedAts[i], superset.delTimes[i]);
+
+                if (sameStart)
+                    j++;
+            }
+            else
+            {
+                j++;
+            }
+        }
+
+        return diff;
+    }
+    
+    /**
+     * Calculates digest for triggering read repair on mismatch
+     */
+    public void updateDigest(MessageDigest digest)
+    {
+        ByteBuffer longBuffer = ByteBuffer.allocate(8);
+        for (int i = 0; i < size; i++)
+        {
+            for (int j = 0; j < starts[i].size(); j++)
+                digest.update(starts[i].get(j).duplicate());
+            for (int j = 0; j < ends[i].size(); j++)
+                digest.update(ends[i].get(j).duplicate());
+
+            longBuffer.putLong(0, markedAts[i]);
+            digest.update(longBuffer.array(), 0, 8);
+        }
+    }
+
 
     @Override
     public boolean equals(Object o)
@@ -393,7 +452,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         RangeTombstoneList that = (RangeTombstoneList)o;
         if (size != that.size)
             return false;
-
+        
         for (int i = 0; i < size; i++)
         {
             if (!starts[i].equals(that.starts[i]))
@@ -779,4 +838,5 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
             return false;
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 6d9a1b5..4ef57d3 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -84,6 +84,8 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public static final int PROTOCOL_MAGIC = 0xCA552DFA;
 
+    private boolean allNodesAtLeast21;
+
     /* All verb handler identifiers */
     public enum Verb
     {
@@ -760,20 +762,47 @@ public final class MessagingService implements MessagingServiceMBean
         return packed >>> (start + 1) - count & ~(-1 << count);
     }
 
+    public boolean areAllNodesAtLeast21()
+    {
+        return allNodesAtLeast21;
+    }
+
     /**
      * @return the last version associated with address, or @param version if this is the first such version
      */
     public int setVersion(InetAddress endpoint, int version)
     {
         logger.debug("Setting version {} for {}", version, endpoint);
+        if (version < VERSION_21)
+            allNodesAtLeast21 = false;
         Integer v = versions.put(endpoint, version);
+
+        // if the version was increased to 2.1 or later, see if all nodes are >= 2.1 now
+        if (v != null && v < VERSION_21 && version >= VERSION_21)
+            refreshAllNodesAtLeast21();
+
         return v == null ? version : v;
     }
 
     public void resetVersion(InetAddress endpoint)
     {
         logger.debug("Reseting version for {}", endpoint);
-        versions.remove(endpoint);
+        Integer removed = versions.remove(endpoint);
+        if (removed != null && removed <= VERSION_21)
+            refreshAllNodesAtLeast21();
+    }
+
+    private void refreshAllNodesAtLeast21()
+    {
+        for (Integer version: versions.values())
+        {
+            if (version < VERSION_21)
+            {
+                allNodesAtLeast21 = false;
+                return;
+            }
+        }
+        allNodesAtLeast21 = true;
     }
 
     public int getVersion(InetAddress endpoint)
@@ -807,6 +836,7 @@ public final class MessagingService implements MessagingServiceMBean
         return versions.containsKey(endpoint);
     }
 
+
     public void incrementDroppedMessages(Verb verb)
     {
         assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index b74f2c9..fe80009 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -368,4 +368,11 @@ public class Util
             throw new RuntimeException(e);
         }
     }
+
+    public static RangeTombstone tombstone(String start, String finish, long timestamp, int localtime)
+    {
+        Composite startName = CellNames.simpleDense(ByteBufferUtil.bytes(start));
+        Composite endName = CellNames.simpleDense(ByteBufferUtil.bytes(finish));
+        return new RangeTombstone(startName, endName, timestamp , localtime);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
index b791b03..7f8da96 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
@@ -22,12 +22,13 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.TreeMap;
 
 import com.google.common.collect.Iterables;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.MessagingService;
@@ -35,6 +36,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.Util.column;
 import static org.apache.cassandra.Util.cellname;
+import static org.apache.cassandra.Util.tombstone;
 import static org.junit.Assert.assertEquals;
 
 public class ColumnFamilyTest extends SchemaLoader
@@ -105,6 +107,48 @@ public class ColumnFamilyTest extends SchemaLoader
     }
 
     @Test
+    public void testDigest()
+    {
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
+        ColumnFamily cf2 = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
+
+        ByteBuffer digest = ColumnFamily.digest(cf);
+
+        cf.addColumn(column("col1", "", 1));
+        cf2.addColumn(column("col1", "", 1));
+
+        assert !digest.equals(ColumnFamily.digest(cf));
+
+        digest = ColumnFamily.digest(cf);
+        assert digest.equals(ColumnFamily.digest(cf2));
+
+        cf.addColumn(column("col2", "", 2));
+        assert !digest.equals(ColumnFamily.digest(cf));
+
+        digest = ColumnFamily.digest(cf);
+        cf.addColumn(column("col1", "", 3));
+        assert !digest.equals(ColumnFamily.digest(cf));
+
+        digest = ColumnFamily.digest(cf);
+        cf.delete(new DeletionTime(4, 4));
+        assert !digest.equals(ColumnFamily.digest(cf));
+
+        digest = ColumnFamily.digest(cf);
+        cf.delete(tombstone("col1", "col11", 5, 5));
+        assert !digest.equals(ColumnFamily.digest(cf));
+
+        digest = ColumnFamily.digest(cf);
+        assert digest.equals(ColumnFamily.digest(cf));
+
+        cf.delete(tombstone("col2", "col21", 5, 5));
+        assert !digest.equals(ColumnFamily.digest(cf));
+
+        digest = ColumnFamily.digest(cf);
+        cf.delete(tombstone("col1", "col11", 5, 5)); // this does not change RangeTombstoneList
+        assert digest.equals(ColumnFamily.digest(cf));
+    }
+
+    @Test
     public void testTimestamp()
     {
         ColumnFamily cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index f024c44..3de7fdc 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -20,19 +20,19 @@ package org.apache.cassandra.db;
 
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
-import org.junit.Test;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.Util.column;
+import static org.apache.cassandra.Util.tombstone;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
-import static org.apache.cassandra.Util.column;
-
 public class RowTest extends SchemaLoader
 {
     @Test
@@ -48,6 +48,38 @@ public class RowTest extends SchemaLoader
         ColumnFamily cfDiff = cf1.diff(cf2);
         assertFalse(cfDiff.hasColumns());
         assertEquals(cfDiff.deletionInfo(), delInfo);
+
+        RangeTombstone tombstone1 = tombstone("1", "11", (long) 123, 123);
+        RangeTombstone tombstone1_2 = tombstone("111", "112", (long) 1230, 123);
+        RangeTombstone tombstone2_1 = tombstone("2", "22", (long) 123, 123);
+        RangeTombstone tombstone2_2 = tombstone("2", "24", (long) 123, 123);
+        RangeTombstone tombstone3_1 = tombstone("3", "31", (long) 123, 123);
+        RangeTombstone tombstone3_2 = tombstone("3", "31", (long) 1230, 123);
+        RangeTombstone tombstone4_1 = tombstone("4", "41", (long) 123, 123);
+        RangeTombstone tombstone4_2 = tombstone("4", "41", (long) 123, 1230);
+        RangeTombstone tombstone5_2 = tombstone("5", "51", (long) 123, 1230);
+        cf1.delete(tombstone1);
+        cf1.delete(tombstone2_1);
+        cf1.delete(tombstone3_1);
+        cf1.delete(tombstone4_1);
+
+        cf2.delete(tombstone1);
+        cf2.delete(tombstone1_2);
+        cf2.delete(tombstone2_2);
+        cf2.delete(tombstone3_2);
+        cf2.delete(tombstone4_2);
+        cf2.delete(tombstone5_2);
+
+        cfDiff = cf1.diff(cf2);
+        assertEquals(0, cfDiff.getColumnCount());
+
+        // only tombstones that differ in the superset or have a more recent timestamp should be in the diff
+        delInfo.add(tombstone1_2, cf1.getComparator());
+        delInfo.add(tombstone2_2, cf1.getComparator());
+        delInfo.add(tombstone3_2, cf1.getComparator());
+        delInfo.add(tombstone5_2, cf1.getComparator());
+
+        assertEquals(delInfo, cfDiff.deletionInfo());
     }
 
     @Test