You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/07/28 13:48:53 UTC

[2/6] cassandra git commit: Exception when computing read-repair for range tombstones

Exception when computing read-repair for range tombstones

patch by Sylvain Lebresne; reviewed by Branimir Lambov for CASSANDRA-12263


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3f9b9ffe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3f9b9ffe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3f9b9ffe

Branch: refs/heads/cassandra-3.9
Commit: 3f9b9ffe6b83987f7a1feb9a1d269e0f7c112161
Parents: a11f210
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Jul 26 17:26:05 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jul 28 15:43:05 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/rows/UnfilteredRowIterators.java         |  45 ++++++-
 .../apache/cassandra/service/DataResolver.java  | 102 ++++++++++++++--
 .../cassandra/service/DataResolverTest.java     | 122 ++++++++++++++++++-
 4 files changed, 255 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f9b9ffe/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b966078..caecefb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.9
+ * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
  * Lost counter writes in compact table and static columns (CASSANDRA-12219)
  * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
  * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f9b9ffe/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 6af3944..3218ff2 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -46,11 +46,52 @@ public abstract class UnfilteredRowIterators
 
     private UnfilteredRowIterators() {}
 
+    /**
+     * Interface for a listener interested in the result of merging multiple versions of a given row.
+     * <p>
+     * Implementors of this interface are given enough information that they can easily reconstruct the difference
+     * between the merged result and each individual input. This is used when reconciling results on replicas for
+     * instance to figure out what to send as read-repair to each source.
+     */
     public interface MergeListener
     {
+        /**
+         * Called once for the merged partition.
+         *
+         * @param mergedDeletion the partition level deletion for the merged partition. Implementors can test if the
+         * merged partition actually has a partition level deletion or not by calling {@code mergedDeletion.isLive()}.
+         * @param versions the partition level deletion for each source of the merge. Elements of the array will never
+         * be null, but may be "live".
+         **/
         public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions);
 
+        /**
+         * Called once for every row participating in the merge.
+         * <p>
+         * Note that this is called for every clustering where at least one of the merged sources has a row. In
+         * particular, this may be called in cases where there is no row in the merged output (if a source has a row
+         * that is shadowed by another source's range tombstone or partition level deletion).
+         *
+         * @param merged the result of the merge. This cannot be {@code null} but can be empty, in which case this is a
+         * placeholder for when at least one source has a row, but that row is shadowed in the merged output.
+         * @param versions for each source, the row in that source corresponding to {@code merged}. This can be
+         * {@code null} for some sources if the source has no such row.
+         */
         public void onMergedRows(Row merged, Row[] versions);
+
+        /**
+         * Called once for every range tombstone marker participating in the merge.
+         * <p>
+         * Note that this is called for every "clustering position" where at least one of the merged sources has a range
+         * tombstone marker.
+         *
+         * @param merged the marker in the merged output. This can be {@code null} if there is no such marker, which
+         * means that at least one source has a marker in {@code versions} but the merged output has nothing corresponding
+         * (this basically means the merged output has a currently open deletion that shadows whatever marker the source
+         * had).
+         * @param versions the marker for each merged source. This can be {@code null} for some sources if that source
+         * has no such marker.
+         */
         public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions);
 
         public void close();
@@ -382,7 +423,7 @@ public abstract class UnfilteredRowIterators
                 if (!delTime.supersedes(iterDeletion))
                     delTime = iterDeletion;
             }
-            if (listener != null && !delTime.isLive())
+            if (listener != null) // notify even for a live deletion: implementors check mergedDeletion.isLive() themselves
                 listener.onMergedPartitionLevelDeletion(delTime, versions);
             return delTime;
         }
@@ -499,7 +540,7 @@ public abstract class UnfilteredRowIterators
                 else
                 {
                     RangeTombstoneMarker merged = markerMerger.merge();
-                    if (merged != null && listener != null)
+                    if (listener != null) // merged may be null: source markers shadowed by the merged output's open deletion
                         listener.onMergedRangeTombstoneMarkers(merged, markerMerger.mergedMarkers());
                     return merged;
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f9b9ffe/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 1fe931f..4e5bfb8 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -166,8 +166,14 @@ public class DataResolver extends ResponseResolver
             private final Row.Builder[] currentRows = new Row.Builder[sources.length];
             private final RowDiffListener diffListener;
 
-            private final Slice.Bound[] markerOpen = new Slice.Bound[sources.length];
-            private final DeletionTime[] markerTime = new DeletionTime[sources.length];
+            // The partition level deletion of the merged partition.
+            private DeletionTime partitionLevelDeletion;
+            // Deletion time of the marker currently open in the merged output, or null if none is open.
+            private DeletionTime mergedDeletionTime;
+            // For each source, the deletion time of the range currently open in that source, or null if none.
+            private final DeletionTime[] sourceDeletionTime = new DeletionTime[sources.length];
+            // For each source, record if there is an open range to send as repair, and from where.
+            private final Slice.Bound[] markerToRepair = new Slice.Bound[sources.length];
 
             public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
             {
@@ -223,6 +229,7 @@ public class DataResolver extends ResponseResolver
 
             public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
             {
+                this.partitionLevelDeletion = mergedDeletion; // fallback used by currentDeletion() when no merged marker is open
                 for (int i = 0; i < versions.length; i++)
                 {
                     if (mergedDeletion.supersedes(versions[i]))
@@ -247,24 +254,99 @@ public class DataResolver extends ResponseResolver
                 Arrays.fill(currentRows, null);
             }
 
+            private DeletionTime currentDeletion() // merged's open marker deletion if any, else the partition-level one
+            {
+                return mergedDeletionTime == null ? partitionLevelDeletion : mergedDeletionTime;
+            }
+
             public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
             {
+                // The deletion in effect in the merged output just before this marker is applied.
+                DeletionTime currentDeletion = currentDeletion();
+
                 for (int i = 0; i < versions.length; i++)
                 {
                     RangeTombstoneMarker marker = versions[i];
-                    // Note that boundaries are both close and open, so it's not one or the other
-                    if (merged.isClose(isReversed) && markerOpen[i] != null)
+
+                    // Update what the source now thinks is the current deletion
+                    if (marker != null)
+                        sourceDeletionTime[i] = marker.isOpen(isReversed) ? marker.openDeletionTime(isReversed) : null;
+
+                    // If merged == null, at least one source is opening or closing a marker that merged shadows
+                    if (merged == null)
                     {
-                        Slice.Bound open = markerOpen[i];
-                        Slice.Bound close = merged.closeBound(isReversed);
-                        update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), markerTime[i]));
+                        // but if it's not this source, move to the next one
+                        if (marker == null)
+                            continue;
+
+                        // We have a close and/or open marker for a source, with nothing corresponding in merged.
+                        // Because merged is a superset, this implies that we have a current deletion (be it due to an
+                        // early opening in merged or a partition level deletion) and that this deletion will still be
+                        // active after that point. Further whatever deletion was open or is open by this marker on the
+                        // source, that deletion cannot supersede the current one.
+                        //
+                        // What we want to know here is if the source deletion and merged deletion were or will be equal,
+                        // because in that case we don't want to include any repair for the source, and otherwise we do.
+                        //
+                        // Note further that if the marker is a boundary, as both sides of that boundary will have a
+                        // different deletion time, only one side might be equal to the merged deletion. This means we
+                        // can only be in one of 2 cases:
+                        //   1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null), and then
+                        //      it won't be from that point on.
+                        //   2) the source wasn't up-to-date on deletion up to that point (markerToRepair[i] != null), and
+                        //      it may now be (if it isn't we just have nothing to do for that marker).
+                        assert !currentDeletion.isLive();
+
+                        if (markerToRepair[i] == null)
+                        {
+                            // Since there is an ongoing merged deletion, the only way we don't have an open repair for
+                            // this source is that it had a range open with the same deletion as current and it's
+                            // closing it. This implies we need to open a deletion for the source from that point.
+                            assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed));
+                            assert !marker.isOpen(isReversed) || currentDeletion.supersedes(marker.openDeletionTime(isReversed));
+                            markerToRepair[i] = marker.closeBound(isReversed).invert();
+                        }
+                        // In case 2) above, we only have something to do if the source is up-to-date after that point
+                        else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))
+                        {
+                            closeOpenMarker(i, marker.openBound(isReversed).invert());
+                        }
                     }
-                    if (merged.isOpen(isReversed) && (marker == null || merged.openDeletionTime(isReversed).supersedes(marker.openDeletionTime(isReversed))))
+                    else
                     {
-                        markerOpen[i] = merged.openBound(isReversed);
-                        markerTime[i] = merged.openDeletionTime(isReversed);
+                        // We have a change of current deletion in merged (potentially to/from no deletion at all).
+
+                        if (merged.isClose(isReversed))
+                        {
+                            // We're closing the merged range. If we've marked the source as needing to be repaired for
+                            // that range, close and add it to the repair to be sent.
+                            if (markerToRepair[i] != null)
+                                closeOpenMarker(i, merged.closeBound(isReversed));
+
+                        }
+
+                        if (merged.isOpen(isReversed))
+                        {
+                            // If we're opening a new merged range (or just switching deletion), then unless the source
+                            // is up to date on that deletion (note that we've updated what the source deletion is
+                            // above), we'll have to send the range to the source.
+                            DeletionTime newDeletion = merged.openDeletionTime(isReversed);
+                            DeletionTime sourceDeletion = sourceDeletionTime[i];
+                            if (!newDeletion.equals(sourceDeletion))
+                                markerToRepair[i] = merged.openBound(isReversed);
+                        }
                     }
                 }
+
+                if (merged != null) // only now advance merged's deletion, after the per-source repairs above used the old one
+                    mergedDeletionTime = merged.isOpen(isReversed) ? merged.openDeletionTime(isReversed) : null;
+            }
+
+            private void closeOpenMarker(int i, Slice.Bound close) // adds source i's pending range, ending at close, to its repair
+            {
+                Slice.Bound open = markerToRepair[i]; // where the pending repair range for source i started
+                update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion()));
+                markerToRepair[i] = null; // source i no longer has a pending repair range
             }
 
             public void close()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f9b9ffe/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index 997f4e4..c9878d4 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -53,6 +53,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.apache.cassandra.db.RangeTombstone.Bound.Kind;
 
 public class DataResolverTest
 {
@@ -440,6 +441,105 @@ public class DataResolverTest
         assertRepairContainsColumn(msg, "1", "two", "B", 3);
     }
 
+    @Test
+    public void testResolveRangeTombstonesOnBoundaryRightWins() throws UnknownHostException
+    {
+        resolveRangeTombstonesOnBoundary(1, 2); // peer2's tombstones carry the higher timestamp
+    }
+
+    @Test
+    public void testResolveRangeTombstonesOnBoundaryLeftWins() throws UnknownHostException
+    {
+        resolveRangeTombstonesOnBoundary(2, 1); // peer1's tombstones carry the higher timestamp
+    }
+
+    @Test
+    public void testResolveRangeTombstonesOnBoundarySameTimestamp() throws UnknownHostException
+    {
+        resolveRangeTombstonesOnBoundary(1, 1); // both peers' tombstones carry the same timestamp
+    }
+
+    /*
+     * We want responses to merge on tombstone boundary. So we'll merge 2 "streams":
+     *   1: [1, 2)(3, 4](5, 6]  2
+     *   2:    [2, 3][4, 5)     1
+     * which tests all combinations of open/close boundaries (open/close, close/open, open/open, close/close).
+     *
+     * Note that, because DataResolver returns a "filtered" iterator, it should resolve into an empty iterator.
+     * However, what should be sent to each source depends on the exact timestamps of each tombstone, and we
+     * test a few combinations.
+     */
+    private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2)
+    {
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        InetAddress peer1 = peer();
+        InetAddress peer2 = peer();
+
+        // 1st "stream"
+        RangeTombstone one_two    = tombstone("1", true , "2", false, timestamp1, nowInSec);
+        RangeTombstone three_four = tombstone("3", false, "4", true , timestamp1, nowInSec);
+        RangeTombstone five_six   = tombstone("5", false, "6", true , timestamp1, nowInSec);
+        UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(one_two)
+                                                                                            .addRangeTombstone(three_four)
+                                                                                            .addRangeTombstone(five_six)
+                                                                                            .buildUpdate());
+
+        // 2nd "stream"
+        RangeTombstone two_three = tombstone("2", true, "3", true , timestamp2, nowInSec);
+        RangeTombstone four_five = tombstone("4", true, "5", false, timestamp2, nowInSec);
+        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(two_three)
+                                                                                            .addRangeTombstone(four_five)
+                                                                                            .buildUpdate());
+
+        resolver.preprocess(readResponseMessage(peer1, iter1));
+        resolver.preprocess(readResponseMessage(peer2, iter2));
+
+        // No results, we've only reconciled tombstones.
+        try (PartitionIterator data = resolver.resolve())
+        {
+            assertFalse(data.hasNext());
+        }
+
+        assertEquals(2, messageRecorder.sent.size());
+
+        MessageOut msg1 = getSentMessage(peer1);
+        assertRepairMetadata(msg1);
+        assertRepairContainsNoColumns(msg1);
+
+        MessageOut msg2 = getSentMessage(peer2);
+        assertRepairMetadata(msg2);
+        assertRepairContainsNoColumns(msg2);
+
+        // Both streams are mostly complementary, so they will roughly get the ranges of the other stream. One subtlety is
+        // around the value "4" however, as it's included by both streams.
+        // So for a given stream, unless the other stream has a strictly higher timestamp, the value 4 will be excluded
+        // from whatever range it receives as repair since the stream already covers it.
+
+        // Message to peer1 contains peer2 ranges
+        assertRepairContainsDeletions(msg1, null, two_three, withExclusiveStartIf(four_five, timestamp1 >= timestamp2));
+
+        // Message to peer2 contains peer1 ranges
+        assertRepairContainsDeletions(msg2, null, one_two, withExclusiveEndIf(three_four, timestamp2 >= timestamp1), five_six);
+    }
+
+    // Returns rt with its start bound made exclusive when condition holds; returns rt unchanged otherwise.
+    private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition)
+    {
+        Slice slice = rt.deletedSlice();
+        return condition
+             ? new RangeTombstone(Slice.make(slice.start().withNewKind(Kind.EXCL_START_BOUND), slice.end()), rt.deletionTime())
+             : rt;
+    }
+
+    // Returns rt with its end bound made exclusive when condition holds; returns rt unchanged otherwise.
+    private static RangeTombstone withExclusiveEndIf(RangeTombstone rt, boolean condition)
+    {
+        Slice slice = rt.deletedSlice();
+        return condition
+             ? new RangeTombstone(Slice.make(slice.start(), slice.end().withNewKind(Kind.EXCL_END_BOUND)), rt.deletionTime())
+             : rt;
+    }
+
     private static ByteBuffer bb(int b)
     {
         return ByteBufferUtil.bytes(b);
@@ -667,7 +767,10 @@ public class DataResolverTest
         int i = 0;
         while (ranges.hasNext())
         {
-            assertEquals(ranges.next(), rangeTombstones[i++]);
+            RangeTombstone expected = rangeTombstones[i++];
+            RangeTombstone actual = ranges.next();
+            String msg = String.format("Expected %s, but got %s", expected.toString(cfm.comparator), actual.toString(cfm.comparator));
+            assertEquals(msg, expected, actual);
         }
     }
 
@@ -720,8 +823,21 @@ public class DataResolverTest
 
     private RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)
     {
-        return new RangeTombstone(Slice.make(cfm.comparator.make(start), cfm.comparator.make(end)),
-                                  new DeletionTime(markedForDeleteAt, localDeletionTime));
+        return tombstone(start, true, end, true, markedForDeleteAt, localDeletionTime); // delegates with both bounds inclusive
+    }
+
+    private RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime)
+    {
+        RangeTombstone.Bound.Kind startKind = inclusiveStart
+                                            ? Kind.INCL_START_BOUND
+                                            : Kind.EXCL_START_BOUND;
+        RangeTombstone.Bound.Kind endKind = inclusiveEnd
+                                          ? Kind.INCL_END_BOUND
+                                          : Kind.EXCL_END_BOUND;
+        // Build each bound from the clustering values of start/end, with the inclusiveness chosen above.
+        RangeTombstone.Bound startBound = new RangeTombstone.Bound(startKind, cfm.comparator.make(start).getRawValues());
+        RangeTombstone.Bound endBound = new RangeTombstone.Bound(endKind, cfm.comparator.make(end).getRawValues());
+        return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime));
 
     private UnfilteredPartitionIterator fullPartitionDelete(CFMetaData cfm, DecoratedKey dk, long timestamp, int nowInSec)