You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2017/08/24 09:39:51 UTC
[2/6] cassandra git commit: Potential AssertionError during
ReadRepair of range tombstone and partition deletions
Potential AssertionError during ReadRepair of range tombstone and partition deletions
patch by Sylvain Lebresne; reviewed by Branimir Lambov for CASSANDRA-13719
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5e57dd14
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5e57dd14
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5e57dd14
Branch: refs/heads/cassandra-3.11
Commit: 5e57dd14eb37adf06d2105227e0105d871ea6f76
Parents: 6a1b1f2
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Jul 21 16:58:53 2017 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Aug 24 11:28:57 2017 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ReadResponse.java | 22 +++++
.../db/partitions/AbstractBTreePartition.java | 14 +--
.../db/partitions/PartitionUpdate.java | 9 ++
.../apache/cassandra/service/DataResolver.java | 70 +++++++++++++--
.../cassandra/service/DataResolverTest.java | 91 +++++++++++++++++++-
6 files changed, 192 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 97dda05..2b49bc3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.15
+ * Potential AssertionError during ReadRepair of range tombstone and partition deletions (CASSANDRA-13719)
* Don't let stress write warmup data if n=0 (CASSANDRA-13773)
* Gossip thread slows down when using batch commit log (CASSANDRA-12966)
* Randomize batchlog endpoint selection with only 1 or 2 racks (CASSANDRA-12884)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 693b52b..c59d00a 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -92,6 +92,28 @@ public abstract class ReadResponse
public abstract boolean isDigestResponse();
+ /**
+ * Creates a string of the requested partition in this read response suitable for debugging.
+ */
+ public String toDebugString(ReadCommand command, DecoratedKey key)
+ {
+ if (isDigestResponse())
+ return "Digest:0x" + ByteBufferUtil.bytesToHex(digest(command));
+
+ try (UnfilteredPartitionIterator iter = makeIterator(command))
+ {
+ while (iter.hasNext())
+ {
+ try (UnfilteredRowIterator partition = iter.next())
+ {
+ if (partition.partitionKey().equals(key))
+ return ImmutableBTreePartition.create(partition).toString();
+ }
+ }
+ }
+ return "<key " + key + " not found>";
+ }
+
protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, ReadCommand command)
{
MessageDigest digest = FBUtilities.threadLocalMD5Digest();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index 2aa622e..1f3dbd0 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -99,7 +99,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
public DeletionTime partitionLevelDeletion()
{
- return holder().deletionInfo.getPartitionDeletion();
+ return deletionInfo().getPartitionDeletion();
}
public PartitionColumns columns()
@@ -372,17 +372,21 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
{
StringBuilder sb = new StringBuilder();
- sb.append(String.format("[%s.%s] key=%s columns=%s",
+ sb.append(String.format("[%s.%s] key=%s partition_deletion=%s columns=%s",
metadata.ksName,
metadata.cfName,
metadata.getKeyValidator().getString(partitionKey().getKey()),
+ partitionLevelDeletion(),
columns()));
if (staticRow() != Rows.EMPTY_STATIC_ROW)
- sb.append("\n ").append(staticRow().toString(metadata));
+ sb.append("\n ").append(staticRow().toString(metadata, true));
- for (Row row : this)
- sb.append("\n ").append(row.toString(metadata));
+ try (UnfilteredRowIterator iter = unfilteredIterator())
+ {
+ while (iter.hasNext())
+ sb.append("\n ").append(iter.next().toString(metadata, true));
+ }
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 2a881a3..7bd5345 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -299,6 +299,15 @@ public class PartitionUpdate extends AbstractBTreePartition
return fromIterator(UnfilteredRowIterators.merge(asIterators, nowInSecs));
}
+ // We override this because the version in the super-class calls holder(), which builds the update, preventing
+ // further updates. That's not necessary here, and being able to check at least the partition deletion without
+ // "locking" the update is nice (and used in DataResolver.RepairMergeListener.MergeListener).
+ @Override
+ public DeletionInfo deletionInfo()
+ {
+ return deletionInfo;
+ }
+
/**
* Modify this update to set every timestamp for live data to {@code newTimestamp} and
* every deletion timestamp to {@code newTimestamp - 1}.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index c96a893..26b1b2a 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -22,6 +22,8 @@ import java.util.*;
import java.util.concurrent.TimeoutException;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
@@ -229,6 +231,17 @@ public class DataResolver extends ResponseResolver
return repairs[i];
}
+ /**
+ * The partition level deletion with which source {@code i} is currently repaired, or
+ * {@code DeletionTime.LIVE} if the source is not repaired on the partition level deletion (meaning it was
+ * up to date on it). The output of this method is only valid after the call to
+ * {@link #onMergedPartitionLevelDeletion}.
+ */
+ private DeletionTime partitionLevelRepairDeletion(int i)
+ {
+ return repairs[i] == null ? DeletionTime.LIVE : repairs[i].partitionLevelDeletion();
+ }
+
private Row.Builder currentRow(int i, Clustering clustering)
{
if (currentRows[i] == null)
@@ -273,6 +286,37 @@ public class DataResolver extends ResponseResolver
public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
{
+ try
+ {
+ // The code for merging range tombstones is a tad complex and we have had the assertions there triggered
+ // unexpectedly on a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights
+ // when that happens without more context than what the assertion errors give us, however, hence the
+ // catch here, which basically gathers as much context as is reasonable.
+ internalOnMergedRangeTombstoneMarkers(merged, versions);
+ }
+ catch (AssertionError e)
+ {
+ // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
+ // rather get more info to debug than not.
+ CFMetaData table = command.metadata();
+ String details = String.format("Error merging RTs on %s.%s: merged=%s, versions=%s, sources={%s}, responses:%n %s",
+ table.ksName, table.cfName,
+ merged == null ? "null" : merged.toString(table),
+ '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
+ Arrays.toString(sources),
+ makeResponsesDebugString());
+ throw new AssertionError(details, e);
+ }
+ }
+
+ private String makeResponsesDebugString()
+ {
+ return Joiner.on(",\n")
+ .join(Iterables.transform(getMessages(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey)));
+ }
+
+ private void internalOnMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+ {
// The current deletion as of dealing with this marker.
DeletionTime currentDeletion = currentDeletion();
@@ -297,21 +341,27 @@ public class DataResolver extends ResponseResolver
// active after that point. Further whatever deletion was open or is open by this marker on the
// source, that deletion cannot supersedes the current one.
//
- // But while the marker deletion (before and/or after this point) cannot supersed the current
+ // But while the marker deletion (before and/or after this point) cannot supersede the current
// deletion, we want to know if it's equal to it (both before and after), because in that case
// the source is up to date and we don't want to include repair.
//
// So in practice we have 2 possible case:
- // 1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null). Then
- // it won't be from that point on unless it's a boundary and the new opened deletion time
- // is also equal to the current deletion (note that this implies the boundary has the same
- // closing and opening deletion time, which should generally not happen, but can due to legacy
- // reading code not avoiding this for a while, see CASSANDRA-13237).
- // 2) the source wasn't up-to-date on deletion up to that point (markerToRepair[i] != null), and
- // it may now be (if it isn't we just have nothing to do for that marker).
+ // 1) the source was up-to-date on deletion up to that point: then it won't be from that point
+ // on unless it's a boundary and the new opened deletion time is also equal to the current
+ // deletion (note that this implies the boundary has the same closing and opening deletion
+ // time, which should generally not happen, but can due to legacy reading code not avoiding
+ // this for a while, see CASSANDRA-13237).
+ // 2) the source wasn't up-to-date on deletion up to that point and it may now be (if it isn't
+ // we just have nothing to do for that marker).
assert !currentDeletion.isLive() : currentDeletion.toString();
- if (markerToRepair[i] == null)
+ // Is the source up to date on deletion? It's up to date if it doesn't have an open RT repair
+ // nor an "active" partition level deletion (where "active" means that it's greater or equal
+ // to the current deletion: if the source has a repaired partition deletion lower than the
+ // current deletion, this means the current deletion is due to a previously open range tombstone,
+ // and if the source isn't currently repaired for that RT, then it means it's up to date on it).
+ DeletionTime partitionRepairDeletion = partitionLevelRepairDeletion(i);
+ if (markerToRepair[i] == null && currentDeletion.supersedes(partitionRepairDeletion))
{
// Since there is an ongoing merged deletion, the only way we don't have an open repair for
// this source is that it had a range open with the same deletion as current and it's
@@ -326,6 +376,8 @@ public class DataResolver extends ResponseResolver
markerToRepair[i] = marker.closeBound(isReversed).invert();
}
// In case 2) above, we only have something to do if the source is up-to-date after that point
+ // (which, since the source isn't up-to-date before that point, means we're opening a new deletion
+ // that is equal to the current one).
else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))
{
closeOpenMarker(i, marker.openBound(isReversed).invert());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index 2f72093..65e18ce 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -574,7 +574,7 @@ public class DataResolverTest
* same deletion on both side (while is useless but could be created by legacy code pre-CASSANDRA-13237 and could
* thus still be sent).
*/
- public void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException
+ private void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException
{
DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
InetAddress peer1 = peer();
@@ -621,6 +621,95 @@ public class DataResolverTest
assertRepairContainsDeletions(msg, null, expected);
}
+ /**
+ * Test for CASSANDRA-13719: tests that having a partition deletion shadow a range tombstone on another source
+ * doesn't trigger an assertion error.
+ */
+ @Test
+ public void testRepairRangeTombstoneWithPartitionDeletion()
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+ InetAddress peer1 = peer();
+ InetAddress peer2 = peer();
+
+ // 1st "stream": just a partition deletion
+ UnfilteredPartitionIterator iter1 = iter(PartitionUpdate.fullPartitionDelete(cfm, dk, 10, nowInSec));
+
+ // 2nd "stream": a range tombstone that is covered by the 1st stream
+ RangeTombstone rt = tombstone("0", true , "10", true, 5, nowInSec);
+ UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
+ .addRangeTombstone(rt)
+ .buildUpdate());
+
+ resolver.preprocess(readResponseMessage(peer1, iter1));
+ resolver.preprocess(readResponseMessage(peer2, iter2));
+
+ // No results, we've only reconciled tombstones.
+ try (PartitionIterator data = resolver.resolve())
+ {
+ assertFalse(data.hasNext());
+ // 2nd stream should get repaired
+ assertRepairFuture(resolver, 1);
+ }
+
+ assertEquals(1, messageRecorder.sent.size());
+
+ MessageOut msg = getSentMessage(peer2);
+ assertRepairMetadata(msg);
+ assertRepairContainsNoColumns(msg);
+
+ assertRepairContainsDeletions(msg, new DeletionTime(10, nowInSec));
+ }
+
+ /**
+ * Additional test for CASSANDRA-13719: tests the case where a partition deletion doesn't shadow a range tombstone.
+ */
+ @Test
+ public void testRepairRangeTombstoneWithPartitionDeletion2()
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+ InetAddress peer1 = peer();
+ InetAddress peer2 = peer();
+
+ // 1st "stream": a partition deletion and a range tombstone
+ RangeTombstone rt1 = tombstone("0", true , "9", true, 11, nowInSec);
+ PartitionUpdate upd1 = new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
+ .addRangeTombstone(rt1)
+ .buildUpdate();
+ ((MutableDeletionInfo)upd1.deletionInfo()).add(new DeletionTime(10, nowInSec));
+ UnfilteredPartitionIterator iter1 = iter(upd1);
+
+ // 2nd "stream": range tombstones that are covered by the other stream's RT
+ RangeTombstone rt2 = tombstone("2", true , "3", true, 11, nowInSec);
+ RangeTombstone rt3 = tombstone("4", true , "5", true, 10, nowInSec);
+ UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
+ .addRangeTombstone(rt2)
+ .addRangeTombstone(rt3)
+ .buildUpdate());
+
+ resolver.preprocess(readResponseMessage(peer1, iter1));
+ resolver.preprocess(readResponseMessage(peer2, iter2));
+
+ // No results, we've only reconciled tombstones.
+ try (PartitionIterator data = resolver.resolve())
+ {
+ assertFalse(data.hasNext());
+ // 2nd stream should get repaired
+ assertRepairFuture(resolver, 1);
+ }
+
+ assertEquals(1, messageRecorder.sent.size());
+
+ MessageOut msg = getSentMessage(peer2);
+ assertRepairMetadata(msg);
+ assertRepairContainsNoColumns(msg);
+
+ // 2nd stream should get both the partition deletion, as well as the part of the 1st stream RT that it misses
+ assertRepairContainsDeletions(msg, new DeletionTime(10, nowInSec),
+ tombstone("0", true, "2", false, 11, nowInSec),
+ tombstone("3", false, "9", true, 11, nowInSec));
+ }
+
// Forces the start to be exclusive if the condition holds
private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org