Posted to commits@cassandra.apache.org by sl...@apache.org on 2017/08/24 09:39:52 UTC

[3/6] cassandra git commit: Potential AssertionError during ReadRepair of range tombstone and partition deletions

Potential AssertionError during ReadRepair of range tombstone and partition deletions

patch by Sylvain Lebresne; reviewed by Branimir Lambov for CASSANDRA-13719


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5e57dd14
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5e57dd14
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5e57dd14

Branch: refs/heads/trunk
Commit: 5e57dd14eb37adf06d2105227e0105d871ea6f76
Parents: 6a1b1f2
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Jul 21 16:58:53 2017 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Aug 24 11:28:57 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/ReadResponse.java   | 22 +++++
 .../db/partitions/AbstractBTreePartition.java   | 14 +--
 .../db/partitions/PartitionUpdate.java          |  9 ++
 .../apache/cassandra/service/DataResolver.java  | 70 +++++++++++++--
 .../cassandra/service/DataResolverTest.java     | 91 +++++++++++++++++++-
 6 files changed, 192 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 97dda05..2b49bc3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Potential AssertionError during ReadRepair of range tombstone and partition deletions (CASSANDRA-13719)
  * Don't let stress write warmup data if n=0 (CASSANDRA-13773)
  * Gossip thread slows down when using batch commit log (CASSANDRA-12966)
  * Randomize batchlog endpoint selection with only 1 or 2 racks (CASSANDRA-12884)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 693b52b..c59d00a 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -92,6 +92,28 @@ public abstract class ReadResponse
 
     public abstract boolean isDigestResponse();
 
+    /**
+     * Creates a string representation of the requested partition in this read response, suitable for debugging.
+     */
+    public String toDebugString(ReadCommand command, DecoratedKey key)
+    {
+        if (isDigestResponse())
+            return "Digest:0x" + ByteBufferUtil.bytesToHex(digest(command));
+
+        try (UnfilteredPartitionIterator iter = makeIterator(command))
+        {
+            while (iter.hasNext())
+            {
+                try (UnfilteredRowIterator partition = iter.next())
+                {
+                    if (partition.partitionKey().equals(key))
+                        return ImmutableBTreePartition.create(partition).toString();
+                }
+            }
+        }
+        return "<key " + key + " not found>";
+    }
+
     protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, ReadCommand command)
     {
         MessageDigest digest = FBUtilities.threadLocalMD5Digest();

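A minimal usage sketch for the new helper (hypothetical caller shown for illustration only; the
actual call site added by this patch is makeResponsesDebugString() in the DataResolver hunk below,
and the MessageIn/ReadResponse types are assumed from the 3.0 tree):

    // Assumes org.apache.cassandra.db.{DecoratedKey, ReadCommand, ReadResponse}
    // and org.apache.cassandra.net.MessageIn.
    // Sketch only: dump each replica's response for the partition being debugged.
    static String dumpResponses(Iterable<MessageIn<ReadResponse>> responses,
                                ReadCommand command, DecoratedKey key)
    {
        StringBuilder sb = new StringBuilder();
        for (MessageIn<ReadResponse> message : responses)
            sb.append(message.from)                                // replica that answered
              .append(" => ")
              .append(message.payload.toDebugString(command, key)) // digest or partition dump
              .append('\n');
        return sb.toString();
    }
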
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index 2aa622e..1f3dbd0 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -99,7 +99,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
 
     public DeletionTime partitionLevelDeletion()
     {
-        return holder().deletionInfo.getPartitionDeletion();
+        return deletionInfo().getPartitionDeletion();
     }
 
     public PartitionColumns columns()
@@ -372,17 +372,21 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
     {
         StringBuilder sb = new StringBuilder();
 
-        sb.append(String.format("[%s.%s] key=%s columns=%s",
+        sb.append(String.format("[%s.%s] key=%s partition_deletion=%s columns=%s",
                                 metadata.ksName,
                                 metadata.cfName,
                                 metadata.getKeyValidator().getString(partitionKey().getKey()),
+                                partitionLevelDeletion(),
                                 columns()));
 
         if (staticRow() != Rows.EMPTY_STATIC_ROW)
-            sb.append("\n    ").append(staticRow().toString(metadata));
+            sb.append("\n    ").append(staticRow().toString(metadata, true));
 
-        for (Row row : this)
-            sb.append("\n    ").append(row.toString(metadata));
+        try (UnfilteredRowIterator iter = unfilteredIterator())
+        {
+            while (iter.hasNext())
+                sb.append("\n    ").append(iter.next().toString(metadata, true));
+        }
 
         return sb.toString();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 2a881a3..7bd5345 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -299,6 +299,15 @@ public class PartitionUpdate extends AbstractBTreePartition
         return fromIterator(UnfilteredRowIterators.merge(asIterators, nowInSecs));
     }
 
+    // We override this because the version in the super-class calls holder(), which builds the update and thus
+    // prevents further updates. That's not necessary here, and being able to check at least the partition deletion
+    // without "locking" the update is useful (and is used in DataResolver.RepairMergeListener.MergeListener).
+    @Override
+    public DeletionInfo deletionInfo()
+    {
+        return deletionInfo;
+    }
+
     /**
      * Modify this update to set every timestamp for live data to {@code newTimestamp} and
      * every deletion timestamp to {@code newTimestamp - 1}.

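An illustrative sketch of the property this override preserves (not part of the patch; the
PartitionUpdate constructor and addPartitionDeletion() used here are assumed from the 3.0 code
base rather than shown in this diff, and cfm, dk and nowInSec stand for the test fixtures used
in DataResolverTest below):

    // Sketch only: a repair update being assembled, as in DataResolver's merge listener.
    PartitionUpdate repair = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 1);
    repair.addPartitionDeletion(new DeletionTime(10, nowInSec));

    // With the override above, this read returns the field directly and does not go through
    // holder(), so the update is not "built" and can still accumulate rows and markers.
    DeletionTime current = repair.deletionInfo().getPartitionDeletion();
    assert current.markedForDeleteAt() == 10;
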
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index c96a893..26b1b2a 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -22,6 +22,8 @@ import java.util.*;
 import java.util.concurrent.TimeoutException;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
@@ -229,6 +231,17 @@ public class DataResolver extends ResponseResolver
                 return repairs[i];
             }
 
+            /**
+             * The partition level deletion with which source {@code i} is currently repaired, or
+             * {@code DeletionTime.LIVE} if the source is not being repaired for the partition level deletion (meaning
+             * it was up to date on it). The output of this method is only valid after the call to
+             * {@link #onMergedPartitionLevelDeletion}.
+             */
+            private DeletionTime partitionLevelRepairDeletion(int i)
+            {
+                return repairs[i] == null ? DeletionTime.LIVE : repairs[i].partitionLevelDeletion();
+            }
+
             private Row.Builder currentRow(int i, Clustering clustering)
             {
                 if (currentRows[i] == null)
@@ -273,6 +286,37 @@ public class DataResolver extends ResponseResolver
 
             public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
             {
+                try
+                {
+                    // The code for merging range tombstones is a tad complex, and we have had the assertions there
+                    // trigger unexpectedly on a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get
+                    // insight when that happens without more context than what the assertion errors give us, hence
+                    // the catch here, which basically gathers as much context as is reasonable.
+                    internalOnMergedRangeTombstoneMarkers(merged, versions);
+                }
+                catch (AssertionError e)
+                {
+                    // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
+                    // rather get more info to debug with than not.
+                    CFMetaData table = command.metadata();
+                    String details = String.format("Error merging RTs on %s.%s: merged=%s, versions=%s, sources={%s}, responses:%n %s",
+                                                   table.ksName, table.cfName,
+                                                   merged == null ? "null" : merged.toString(table),
+                                                   '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
+                                                   Arrays.toString(sources),
+                                                   makeResponsesDebugString());
+                    throw new AssertionError(details, e);
+                }
+            }
+
+            private String makeResponsesDebugString()
+            {
+                return Joiner.on(",\n")
+                             .join(Iterables.transform(getMessages(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey)));
+            }
+
+            private void internalOnMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+            {
                 // The current deletion as of dealing with this marker.
                 DeletionTime currentDeletion = currentDeletion();
 
@@ -297,21 +341,27 @@ public class DataResolver extends ResponseResolver
                         // active after that point. Further, whatever deletion was open or is open by this marker on the
                         // source, that deletion cannot supersede the current one.
                         //
-                        // But while the marker deletion (before and/or after this point) cannot supersed the current
+                        // But while the marker deletion (before and/or after this point) cannot supersede the current
                         // deletion, we want to know if it's equal to it (both before and after), because in that case
                         // the source is up to date and we don't want to include repair.
                         //
                         // So in practice we have 2 possible cases:
-                        //  1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null). Then
-                        //     it won't be from that point on unless it's a boundary and the new opened deletion time
-                        //     is also equal to the current deletion (note that this implies the boundary has the same
-                        //     closing and opening deletion time, which should generally not happen, but can due to legacy
-                        //     reading code not avoiding this for a while, see CASSANDRA-13237).
-                        //   2) the source wasn't up-to-date on deletion up to that point (markerToRepair[i] != null), and
-                        //      it may now be (if it isn't we just have nothing to do for that marker).
+                        //  1) the source was up-to-date on deletion up to that point: then it won't be from that point
+                        //     on unless it's a boundary and the new opened deletion time is also equal to the current
+                        //     deletion (note that this implies the boundary has the same closing and opening deletion
+                        //     time, which should generally not happen, but can happen due to legacy reading code not avoiding
+                        //     this for a while, see CASSANDRA-13237).
+                        //  2) the source wasn't up-to-date on deletion up to that point and it may now be (if it isn't
+                        //     we just have nothing to do for that marker).
                         assert !currentDeletion.isLive() : currentDeletion.toString();
 
-                        if (markerToRepair[i] == null)
+                        // Is the source up to date on deletion? It is if it has neither an open RT repair nor an
+                        // "active" partition level deletion (where "active" means greater than or equal to the current
+                        // deletion: if the source has a repaired partition deletion lower than the current deletion,
+                        // the current deletion is due to a previously open range tombstone, and if the source isn't
+                        // currently being repaired for that RT, then it is up to date on it).
+                        DeletionTime partitionRepairDeletion = partitionLevelRepairDeletion(i);
+                        if (markerToRepair[i] == null && currentDeletion.supersedes(partitionRepairDeletion))
                         {
                             // Since there is an ongoing merged deletion, the only way we don't have an open repair for
                             // this source is that it had a range open with the same deletion as current and it's
@@ -326,6 +376,8 @@ public class DataResolver extends ResponseResolver
                                 markerToRepair[i] = marker.closeBound(isReversed).invert();
                         }
                         // In case 2) above, we only have something to do if the source is up-to-date after that point
+                        // (which, since the source isn't up-to-date before that point, means we're opening a new deletion
+                        // that is equal to the current one).
                         else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))
                         {
                             closeOpenMarker(i, marker.openBound(isReversed).invert());

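A condensed sketch of the new check, walking through the scenario exercised by the first new test
below (illustrative only; the names are those used in the hunk above):

    // Sketch only: is source i up to date on the currently open merged deletion?
    DeletionTime partitionRepairDeletion = partitionLevelRepairDeletion(i);
    boolean upToDateSoFar = markerToRepair[i] == null
                         && currentDeletion.supersedes(partitionRepairDeletion);

    // In testRepairRangeTombstoneWithPartitionDeletion, source 1 sent RT [0, 10]@5 and is
    // already being repaired with source 0's partition deletion@10, so partitionRepairDeletion
    // is that deletion@10 and currentDeletion (also @10) does not supersede it. The source is
    // therefore handled as case 2; since its marker carries deletion@5 rather than @10, nothing
    // further is repaired, and the old assertion (which expected a close bound carrying the
    // current deletion) can no longer trigger.
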
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index 2f72093..65e18ce 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -574,7 +574,7 @@ public class DataResolverTest
      * same deletion on both sides (which is useless but could be created by legacy code pre-CASSANDRA-13237 and could
      * thus still be sent).
      */
-    public void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException
+    private void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException
     {
         DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
         InetAddress peer1 = peer();
@@ -621,6 +621,95 @@ public class DataResolverTest
         assertRepairContainsDeletions(msg, null, expected);
     }
 
+    /**
+     * Test for CASSANDRA-13719: tests that having a partition deletion shadow a range tombstone on another source
+     * doesn't trigger an assertion error.
+     */
+    @Test
+    public void testRepairRangeTombstoneWithPartitionDeletion()
+    {
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        InetAddress peer1 = peer();
+        InetAddress peer2 = peer();
+
+        // 1st "stream": just a partition deletion
+        UnfilteredPartitionIterator iter1 = iter(PartitionUpdate.fullPartitionDelete(cfm, dk, 10, nowInSec));
+
+        // 2nd "stream": a range tombstone that is covered by the 1st stream
+        RangeTombstone rt = tombstone("0", true , "10", true, 5, nowInSec);
+        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
+                                                 .addRangeTombstone(rt)
+                                                 .buildUpdate());
+
+        resolver.preprocess(readResponseMessage(peer1, iter1));
+        resolver.preprocess(readResponseMessage(peer2, iter2));
+
+        // No results, we've only reconciled tombstones.
+        try (PartitionIterator data = resolver.resolve())
+        {
+            assertFalse(data.hasNext());
+            // 2nd stream should get repaired
+            assertRepairFuture(resolver, 1);
+        }
+
+        assertEquals(1, messageRecorder.sent.size());
+
+        MessageOut msg = getSentMessage(peer2);
+        assertRepairMetadata(msg);
+        assertRepairContainsNoColumns(msg);
+
+        assertRepairContainsDeletions(msg, new DeletionTime(10, nowInSec));
+    }
+
+    /**
+     * Additional test for CASSANDRA-13719: tests the case where a partition deletion doesn't shadow a range tombstone.
+     */
+    @Test
+    public void testRepairRangeTombstoneWithPartitionDeletion2()
+    {
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        InetAddress peer1 = peer();
+        InetAddress peer2 = peer();
+
+        // 1st "stream": a partition deletion and a range tombstone
+        RangeTombstone rt1 = tombstone("0", true , "9", true, 11, nowInSec);
+        PartitionUpdate upd1 = new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
+                                                 .addRangeTombstone(rt1)
+                                                 .buildUpdate();
+        ((MutableDeletionInfo)upd1.deletionInfo()).add(new DeletionTime(10, nowInSec));
+        UnfilteredPartitionIterator iter1 = iter(upd1);
+
+        // 2nd "stream": range tombstones that are covered by the 1st stream's RT
+        RangeTombstone rt2 = tombstone("2", true , "3", true, 11, nowInSec);
+        RangeTombstone rt3 = tombstone("4", true , "5", true, 10, nowInSec);
+        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
+                                                 .addRangeTombstone(rt2)
+                                                 .addRangeTombstone(rt3)
+                                                 .buildUpdate());
+
+        resolver.preprocess(readResponseMessage(peer1, iter1));
+        resolver.preprocess(readResponseMessage(peer2, iter2));
+
+        // No results, we've only reconciled tombstones.
+        try (PartitionIterator data = resolver.resolve())
+        {
+            assertFalse(data.hasNext());
+            // 2nd stream should get repaired
+            assertRepairFuture(resolver, 1);
+        }
+
+        assertEquals(1, messageRecorder.sent.size());
+
+        MessageOut msg = getSentMessage(peer2);
+        assertRepairMetadata(msg);
+        assertRepairContainsNoColumns(msg);
+
+        // 2nd stream should get both the partition deletion and the parts of the 1st stream RT that it misses
+        assertRepairContainsDeletions(msg, new DeletionTime(10, nowInSec),
+                                      tombstone("0", true, "2", false, 11, nowInSec),
+                                      tombstone("3", false, "9", true, 11, nowInSec));
+    }
+
     // Forces the start to be exclusive if the condition holds
     private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition)
     {

