You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/04/22 14:34:54 UTC
[18/19] git commit: Add range tombstones to read repair digests patch
by Oleg Anastasyev; reviewed by jbellis for CASSANDRA-6863
Add range tombstones to read repair digests
patch by Oleg Anastasyev; reviewed by jbellis for CASSANDRA-6863
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2e74354
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2e74354
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2e74354
Branch: refs/heads/trunk
Commit: a2e74354ca51809a11b62dd7995c026807683b0a
Parents: be2686d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 22 07:27:20 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Apr 22 07:27:20 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamily.java | 5 ++
.../org/apache/cassandra/db/DeletionInfo.java | 34 +++++++++-
.../apache/cassandra/db/RangeTombstoneList.java | 68 ++++++++++++++++++--
.../apache/cassandra/net/MessagingService.java | 32 ++++++++-
test/unit/org/apache/cassandra/Util.java | 7 ++
.../apache/cassandra/db/ColumnFamilyTest.java | 46 ++++++++++++-
test/unit/org/apache/cassandra/db/RowTest.java | 40 ++++++++++--
8 files changed, 222 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ae7410e..495dab2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.0-beta2
+ * Add range tombstones to read repair digests (CASSANDRA-6863)
* Fix BTree.clear for large updates (CASSANDRA-6943)
* Fail write instead of logging a warning when unable to append to CL
(CASSANDRA-6764)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index da404b0..4f85610 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -313,8 +313,11 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
}
}
+ cfDiff.setDeletionInfo(deletionInfo().diff(cfComposite.deletionInfo()));
+
if (!cfDiff.isEmpty())
return cfDiff;
+
return null;
}
@@ -385,6 +388,8 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
{
for (Cell cell : this)
cell.updateDigest(digest);
+ if (MessagingService.instance().areAllNodesAtLeast21())
+ deletionInfo().updateDigest(digest);
}
public static ColumnFamily diff(ColumnFamily cf1, ColumnFamily cf2)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 8601bce..a167b85 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.IOException;
-import java.util.*;
+import java.security.MessageDigest;
+import java.util.Comparator;
+import java.util.Iterator;
import com.google.common.base.Objects;
import com.google.common.collect.Iterators;
@@ -29,6 +31,7 @@ import org.apache.cassandra.db.composites.CType;
import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
/**
@@ -168,6 +171,35 @@ public class DeletionInfo implements IMeasurableMemory
}
/**
+     * Computes what {@code superset} carries that this deletion info does not, for read repair.
+     * The top-level deletions are compared on markedForDeleteAt only.
+     *
+     * @param superset presumably the merged deletion info of all replicas (the caller passes the
+     *                 composite column family's deletion info)
+     * @return the difference between the two, or LIVE if no difference
+     */
+    public DeletionInfo diff(DeletionInfo superset)
+    {
+        RangeTombstoneList rangeDiff = null;
+        if (superset.ranges != null && !superset.ranges.isEmpty())
+            rangeDiff = (ranges == null) ? superset.ranges : ranges.diff(superset.ranges);
+
+        boolean topLevelDiffers = topLevel.markedForDeleteAt != superset.topLevel.markedForDeleteAt;
+        if (!topLevelDiffers && rangeDiff == null)
+            return DeletionInfo.live();
+
+        return new DeletionInfo(superset.topLevel, rangeDiff);
+    }
+
+
+    /**
+     * Feeds this deletion info into {@code digest} so that replicas disagreeing on deletions produce
+     * different digests, which triggers read repair. The top-level markedForDeleteAt is only digested
+     * when it is not Long.MIN_VALUE; range tombstones are delegated to the range list.
+     */
+    public void updateDigest(MessageDigest digest)
+    {
+        long topLevelMarkedAt = topLevel.markedForDeleteAt;
+        if (topLevelMarkedAt > Long.MIN_VALUE)
+            digest.update(ByteBufferUtil.bytes(topLevelMarkedAt));
+
+        if (ranges == null)
+            return;
+        ranges.updateDigest(digest);
+    }
+
+ /**
* Returns true if {@code purge} would remove the top-level tombstone or any of the range
* tombstones, false otherwise.
* @param gcBefore timestamp (in seconds) before which tombstones should be purged
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index dd0b9a6..b06c520 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -19,12 +19,16 @@ package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.db.composites.CType;
@@ -32,10 +36,7 @@ import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
-
import org.apache.cassandra.utils.ObjectSizes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Data structure holding the range tombstones of a ColumnFamily.
@@ -384,6 +385,64 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
}
};
}
+
+    /**
+     * Evaluates a diff between superset (known to be all merged tombstones) and this list for read repair.
+     * Every tombstone of {@code superset} that this list does not hold with the same start, end and
+     * markedAt is included in the result; local deletion times are deliberately not compared.
+     *
+     * @param superset the merged tombstones; walked in index order alongside this list, so it is assumed
+     *                 to be ordered like this list — TODO confirm
+     * @return the tombstones missing from this list, or null if there is no difference
+     */
+    public RangeTombstoneList diff(RangeTombstoneList superset)
+    {
+        // Nothing of our own to compare: everything in superset is missing here. Return null (not an empty
+        // list) when superset is empty as well, so the "null means no difference" contract holds.
+        if (isEmpty())
+            return superset == null || superset.isEmpty() ? null : superset;
+
+        assert size <= superset.size;
+
+        RangeTombstoneList diff = null;
+
+        int j = 0; // index of the next tombstone of our own list still to be matched
+        for (int i = 0; i < superset.size; i++)
+        {
+            boolean sameStart = j < size && starts[j].equals(superset.starts[i]);
+            // Local deletion time is ignored on purpose: it doesn't make sense for read repair.
+            // When sameStart is false the || short-circuits, so ends[j] is never read with j >= size.
+            if (!sameStart
+                || !ends[j].equals(superset.ends[i])
+                || markedAts[j] != superset.markedAts[i])
+            {
+                if (diff == null)
+                    diff = new RangeTombstoneList(comparator, Math.min(8, superset.size - i));
+                diff.add(superset.starts[i], superset.ends[i], superset.markedAts[i], superset.delTimes[i]);
+
+                // Same start but different end/markedAt: our tombstone was matched (and superseded) here.
+                if (sameStart)
+                    j++;
+            }
+            else
+            {
+                j++;
+            }
+        }
+
+        return diff;
+    }
+
+    /**
+     * Calculates the digest contribution of this list, for triggering read repair on mismatch.
+     * For every tombstone, in index order, the components of its start bound, then the components of its
+     * end bound, then its 8-byte markedAt are fed to {@code digest}. Local deletion times are not digested.
+     */
+    public void updateDigest(MessageDigest digest)
+    {
+        ByteBuffer scratch = ByteBuffer.allocate(8);
+        int idx = 0;
+        while (idx < size)
+        {
+            Composite start = starts[idx];
+            for (int c = 0; c < start.size(); c++)
+                digest.update(start.get(c).duplicate()); // duplicate() so the component's position is untouched
+
+            Composite end = ends[idx];
+            for (int c = 0; c < end.size(); c++)
+                digest.update(end.get(c).duplicate());
+
+            scratch.putLong(0, markedAts[idx]);
+            digest.update(scratch.array(), 0, 8);
+            idx++;
+        }
+    }
+
@Override
public boolean equals(Object o)
@@ -393,7 +452,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
RangeTombstoneList that = (RangeTombstoneList)o;
if (size != that.size)
return false;
-
+
for (int i = 0; i < size; i++)
{
if (!starts[i].equals(that.starts[i]))
@@ -779,4 +838,5 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
return false;
}
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 6d9a1b5..4ef57d3 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -84,6 +84,8 @@ public final class MessagingService implements MessagingServiceMBean
*/
public static final int PROTOCOL_MAGIC = 0xCA552DFA;
+    // Whether every endpoint with a recorded messaging version is at VERSION_21 or later. Starts true: setVersion
+    // clears it as soon as a pre-2.1 version is recorded, and it is recomputed when a pre-2.1 endpoint is upgraded
+    // or removed. With a false default, a cluster that only ever reports >= 2.1 versions would never enable
+    // 2.1-only behaviour such as digesting deletion info. volatile because it is written by the version-tracking
+    // methods and presumably read concurrently via areAllNodesAtLeast21() (e.g. from ColumnFamily.updateDigest).
+    private volatile boolean allNodesAtLeast21 = true;
+
/* All verb handler identifiers */
public enum Verb
{
@@ -760,20 +762,47 @@ public final class MessagingService implements MessagingServiceMBean
return packed >>> (start + 1) - count & ~(-1 << count);
}
+    /**
+     * @return the cached allNodesAtLeast21 flag maintained by setVersion/resetVersion; true is intended to mean
+     *         that every endpoint with a recorded version is at VERSION_21 or later
+     */
+    public boolean areAllNodesAtLeast21()
+    {
+        return this.allNodesAtLeast21;
+    }
+
/**
* @return the last version associated with address, or @param version if this is the first such version
*/
public int setVersion(InetAddress endpoint, int version)
{
logger.debug("Setting version {} for {}", version, endpoint);
+        // a single pre-2.1 endpoint is enough to disable 2.1-only behaviour
+        if (version < VERSION_21)
+            allNodesAtLeast21 = false;
Integer v = versions.put(endpoint, version);
+
+        // If the flag is off and this endpoint now reports 2.1 or later after being pre-2.1 (an upgrade) or unknown,
+        // see if all nodes are >= 2.1 now. Checking previously unknown endpoints too means a flag that is off without
+        // any pre-2.1 endpoint being recorded (e.g. before any version was known) gets corrected.
+        if (version >= VERSION_21 && !allNodesAtLeast21 && (v == null || v < VERSION_21))
+            refreshAllNodesAtLeast21();
+
return v == null ? version : v;
}
public void resetVersion(InetAddress endpoint)
{
logger.debug("Reseting version for {}", endpoint);
- versions.remove(endpoint);
+        Integer previous = versions.remove(endpoint);
+        // dropping an endpoint whose recorded version was <= VERSION_21 may change the flag, so recompute it
+        boolean mayAffectFlag = previous != null && previous <= VERSION_21;
+        if (mayAffectFlag)
+            refreshAllNodesAtLeast21();
+    }
+
+    /**
+     * Recomputes allNodesAtLeast21 by scanning every version currently recorded in {@code versions}:
+     * false as soon as one is below VERSION_21, true otherwise (including when none are recorded).
+     */
+    private void refreshAllNodesAtLeast21()
+    {
+        boolean everyNodeAtLeast21 = true;
+        for (int known : versions.values())
+        {
+            if (known < VERSION_21)
+            {
+                everyNodeAtLeast21 = false;
+                break;
+            }
+        }
+        allNodesAtLeast21 = everyNodeAtLeast21;
}
public int getVersion(InetAddress endpoint)
@@ -807,6 +836,7 @@ public final class MessagingService implements MessagingServiceMBean
return versions.containsKey(endpoint);
}
+
public void incrementDroppedMessages(Verb verb)
{
assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index b74f2c9..fe80009 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -368,4 +368,11 @@ public class Util
throw new RuntimeException(e);
}
}
+
+    /**
+     * Test helper: builds a RangeTombstone from {@code start} to {@code finish}, each turned into a
+     * simple dense cell name from the string's bytes.
+     *
+     * @param timestamp passed through as the tombstone's timestamp argument
+     * @param localtime passed through as the tombstone's local deletion time argument
+     */
+    public static RangeTombstone tombstone(String start, String finish, long timestamp, int localtime)
+    {
+        return new RangeTombstone(CellNames.simpleDense(ByteBufferUtil.bytes(start)),
+                                  CellNames.simpleDense(ByteBufferUtil.bytes(finish)),
+                                  timestamp,
+                                  localtime);
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
index b791b03..7f8da96 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
@@ -22,12 +22,13 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.TreeMap;
import com.google.common.collect.Iterables;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
import org.apache.cassandra.io.sstable.ColumnStats;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.MessagingService;
@@ -35,6 +36,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.Util.column;
import static org.apache.cassandra.Util.cellname;
+import static org.apache.cassandra.Util.tombstone;
import static org.junit.Assert.assertEquals;
public class ColumnFamilyTest extends SchemaLoader
@@ -105,6 +107,48 @@ public class ColumnFamilyTest extends SchemaLoader
}
@Test
+    public void testDigest()
+    {
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
+        ColumnFamily cf2 = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
+
+        ByteBuffer digest = ColumnFamily.digest(cf);
+
+        // the same cell added to both column families: the digest changes and both sides agree
+        cf.addColumn(column("col1", "", 1));
+        cf2.addColumn(column("col1", "", 1));
+
+        assertDigestChanged(digest, cf);
+
+        digest = ColumnFamily.digest(cf);
+        assertEquals(digest, ColumnFamily.digest(cf2));
+
+        cf.addColumn(column("col2", "", 2));
+        assertDigestChanged(digest, cf);
+
+        digest = ColumnFamily.digest(cf);
+        cf.addColumn(column("col1", "", 3));
+        assertDigestChanged(digest, cf);
+
+        // top-level deletion must be part of the digest
+        digest = ColumnFamily.digest(cf);
+        cf.delete(new DeletionTime(4, 4));
+        assertDigestChanged(digest, cf);
+
+        // range tombstones must be part of the digest
+        digest = ColumnFamily.digest(cf);
+        cf.delete(tombstone("col1", "col11", 5, 5));
+        assertDigestChanged(digest, cf);
+
+        // digesting the same data twice is stable
+        digest = ColumnFamily.digest(cf);
+        assertEquals(digest, ColumnFamily.digest(cf));
+
+        cf.delete(tombstone("col2", "col21", 5, 5));
+        assertDigestChanged(digest, cf);
+
+        digest = ColumnFamily.digest(cf);
+        cf.delete(tombstone("col1", "col11", 5, 5)); // this does not change the RangeTombstoneList
+        assertEquals(digest, ColumnFamily.digest(cf));
+    }
+
+    // JUnit assertion rather than the assert keyword, so the check is not silently skipped without -ea.
+    private static void assertDigestChanged(ByteBuffer before, ColumnFamily cf)
+    {
+        org.junit.Assert.assertFalse(before.equals(ColumnFamily.digest(cf)));
+    }
+
+ @Test
public void testTimestamp()
{
ColumnFamily cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index f024c44..3de7fdc 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -20,19 +20,19 @@ package org.apache.cassandra.db;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
-import org.junit.Test;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.apache.cassandra.Util.column;
+import static org.apache.cassandra.Util.tombstone;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.apache.cassandra.Util.column;
-
public class RowTest extends SchemaLoader
{
@Test
@@ -48,6 +48,38 @@ public class RowTest extends SchemaLoader
ColumnFamily cfDiff = cf1.diff(cf2);
assertFalse(cfDiff.hasColumns());
assertEquals(cfDiff.deletionInfo(), delInfo);
+
+ RangeTombstone tombstone1 = tombstone("1", "11", (long) 123, 123);
+ RangeTombstone tombstone1_2 = tombstone("111", "112", (long) 1230, 123);
+ RangeTombstone tombstone2_1 = tombstone("2", "22", (long) 123, 123);
+ RangeTombstone tombstone2_2 = tombstone("2", "24", (long) 123, 123);
+ RangeTombstone tombstone3_1 = tombstone("3", "31", (long) 123, 123);
+ RangeTombstone tombstone3_2 = tombstone("3", "31", (long) 1230, 123);
+ RangeTombstone tombstone4_1 = tombstone("4", "41", (long) 123, 123);
+ RangeTombstone tombstone4_2 = tombstone("4", "41", (long) 123, 1230);
+ RangeTombstone tombstone5_2 = tombstone("5", "51", (long) 123, 1230);
+ cf1.delete(tombstone1);
+ cf1.delete(tombstone2_1);
+ cf1.delete(tombstone3_1);
+ cf1.delete(tombstone4_1);
+
+ cf2.delete(tombstone1);
+ cf2.delete(tombstone1_2);
+ cf2.delete(tombstone2_2);
+ cf2.delete(tombstone3_2);
+ cf2.delete(tombstone4_2);
+ cf2.delete(tombstone5_2);
+
+ cfDiff = cf1.diff(cf2);
+ assertEquals(0, cfDiff.getColumnCount());
+
+ // only tmbstones which differ in superset or have more recent timestamp to be in diff
+ delInfo.add(tombstone1_2, cf1.getComparator());
+ delInfo.add(tombstone2_2, cf1.getComparator());
+ delInfo.add(tombstone3_2, cf1.getComparator());
+ delInfo.add(tombstone5_2, cf1.getComparator());
+
+ assertEquals(delInfo, cfDiff.deletionInfo());
}
@Test