You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/07/28 13:48:53 UTC
[2/6] cassandra git commit: Exception when computing read-repair for
range tombstones
Exception when computing read-repair for range tombstones
patch by Sylvain Lebresne; reviewed by Branimir Lambov for CASSANDRA-12263
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3f9b9ffe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3f9b9ffe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3f9b9ffe
Branch: refs/heads/cassandra-3.9
Commit: 3f9b9ffe6b83987f7a1feb9a1d269e0f7c112161
Parents: a11f210
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Jul 26 17:26:05 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jul 28 15:43:05 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/rows/UnfilteredRowIterators.java | 45 ++++++-
.../apache/cassandra/service/DataResolver.java | 102 ++++++++++++++--
.../cassandra/service/DataResolverTest.java | 122 ++++++++++++++++++-
4 files changed, 255 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f9b9ffe/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b966078..caecefb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.9
+ * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
* Lost counter writes in compact table and static columns (CASSANDRA-12219)
* AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
* Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f9b9ffe/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 6af3944..3218ff2 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -46,11 +46,52 @@ public abstract class UnfilteredRowIterators
private UnfilteredRowIterators() {}
+ /**
+ * Interface for a listener interested in the result of merging multiple versions of a given row.
+ * <p>
+ * Implementors of this interface are given enough information that they can easily reconstruct the difference
+ * between the merged result and each individual input. This is used when reconciling results on replicas, for
+ * instance to figure out what to send as read-repair to each source.
+ */
public interface MergeListener
{
+ /**
+ * Called once for the merged partition.
+ *
+ * @param mergedDeletion the partition level deletion for the merged partition. Implementors can test if the
+ * merged partition actually has a partition level deletion or not by calling {@code mergedDeletion.isLive()}.
+ * @param versions the partition level deletion for the sources of the merge. Elements of the array will never
+ * be null, but can be "live".
+ **/
public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions);
+ /**
+ * Called once for every row participating in the merge.
+ * <p>
+ * Note that this is called for every clustering where at least one of the merged sources has a row. In
+ * particular, this may be called in cases where there is no row in the merged output (if a source has a row
+ * that is shadowed by another source's range tombstone or partition level deletion).
+ *
+ * @param merged the result of the merge. This cannot be {@code null} but can be empty, in which case this is a
+ * placeholder for when at least one source has a row, but that row is shadowed in the merged output.
+ * @param versions for each source, the row in that source corresponding to {@code merged}. This can be
+ * {@code null} for some sources if the source has no such row.
+ */
public void onMergedRows(Row merged, Row[] versions);
+
+ /**
+ * Called once for every range tombstone marker participating in the merge.
+ * <p>
+ * Note that this is called for every "clustering position" where at least one of the merged sources has a range
+ * tombstone marker.
+ *
+ * @param merged the marker in the merged output. This can be {@code null} if there is no such marker, which
+ * means that at least one source has a marker in {@code versions} but the merged output has nothing corresponding
+ * (this basically means the merged output has a currently open deletion that shadows whatever marker the source
+ * had).
+ * @param versions the marker for each source merged. This can be {@code null} for some sources if that source
+ * has no such marker.
+ */
public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions);
public void close();
@@ -382,7 +423,7 @@ public abstract class UnfilteredRowIterators
if (!delTime.supersedes(iterDeletion))
delTime = iterDeletion;
}
- if (listener != null && !delTime.isLive())
+ if (listener != null)
listener.onMergedPartitionLevelDeletion(delTime, versions);
return delTime;
}
@@ -499,7 +540,7 @@ public abstract class UnfilteredRowIterators
else
{
RangeTombstoneMarker merged = markerMerger.merge();
- if (merged != null && listener != null)
+ if (listener != null)
listener.onMergedRangeTombstoneMarkers(merged, markerMerger.mergedMarkers());
return merged;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f9b9ffe/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 1fe931f..4e5bfb8 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -166,8 +166,14 @@ public class DataResolver extends ResponseResolver
private final Row.Builder[] currentRows = new Row.Builder[sources.length];
private final RowDiffListener diffListener;
- private final Slice.Bound[] markerOpen = new Slice.Bound[sources.length];
- private final DeletionTime[] markerTime = new DeletionTime[sources.length];
+ // The partition level deletion for the merged partition.
+ private DeletionTime partitionLevelDeletion;
+ // When merged has a currently open marker, its time. null otherwise.
+ private DeletionTime mergedDeletionTime;
+ // For each source, the time of the current deletion as known by the source.
+ private final DeletionTime[] sourceDeletionTime = new DeletionTime[sources.length];
+ // For each source, record if there is an open range to send as repair, and from where.
+ private final Slice.Bound[] markerToRepair = new Slice.Bound[sources.length];
public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
{
@@ -223,6 +229,7 @@ public class DataResolver extends ResponseResolver
public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
{
+ this.partitionLevelDeletion = mergedDeletion;
for (int i = 0; i < versions.length; i++)
{
if (mergedDeletion.supersedes(versions[i]))
@@ -247,24 +254,99 @@ public class DataResolver extends ResponseResolver
Arrays.fill(currentRows, null);
}
+ private DeletionTime currentDeletion()
+ {
+ return mergedDeletionTime == null ? partitionLevelDeletion : mergedDeletionTime;
+ }
+
public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
{
+ // The current deletion as of dealing with this marker.
+ DeletionTime currentDeletion = currentDeletion();
+
for (int i = 0; i < versions.length; i++)
{
RangeTombstoneMarker marker = versions[i];
- // Note that boundaries are both close and open, so it's not one or the other
- if (merged.isClose(isReversed) && markerOpen[i] != null)
+
+ // Update what the source now thinks is the current deletion
+ if (marker != null)
+ sourceDeletionTime[i] = marker.isOpen(isReversed) ? marker.openDeletionTime(isReversed) : null;
+
+ // If merged == null, some of the sources are opening or closing a marker
+ if (merged == null)
{
- Slice.Bound open = markerOpen[i];
- Slice.Bound close = merged.closeBound(isReversed);
- update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), markerTime[i]));
+ // but if it's not this source, move to the next one
+ if (marker == null)
+ continue;
+
+ // We have a close and/or open marker for a source, with nothing corresponding in merged.
+ // Because merged is a superset, this implies that we have a current deletion (whether due to an
+ // early opening in merged or a partition level deletion) and that this deletion will still be
+ // active after that point. Further, whatever deletion was open or is opened by this marker on the
+ // source, that deletion cannot supersede the current one.
+ //
+ // What we want to know here is if the source deletion and merged deletion were or will be equal,
+ // because in that case we don't want to include any repair for the source, and otherwise we do.
+ //
+ // Note further that if the marker is a boundary, as both sides of that boundary will have a
+ // different deletion time, only one side might be equal to the merged deletion. This means we
+ // can only be in one of 2 cases:
+ // 1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null), and then
+ // it won't be from that point on.
+ // 2) the source wasn't up-to-date on deletion up to that point (markerToRepair[i] != null), and
+ // it may now be (if it isn't we just have nothing to do for that marker).
+ assert !currentDeletion.isLive();
+
+ if (markerToRepair[i] == null)
+ {
+ // Since there is an ongoing merged deletion, the only way we don't have an open repair for
+ // this source is that it had a range open with the same deletion as current and it's
+ // closing it. This implies we need to open a deletion for the source from that point.
+ assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed));
+ assert !marker.isOpen(isReversed) || currentDeletion.supersedes(marker.openDeletionTime(isReversed));
+ markerToRepair[i] = marker.closeBound(isReversed).invert();
+ }
+ // In case 2) above, we only have something to do if the source is up-to-date after that point
+ else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))
+ {
+ closeOpenMarker(i, marker.openBound(isReversed).invert());
+ }
}
- if (merged.isOpen(isReversed) && (marker == null || merged.openDeletionTime(isReversed).supersedes(marker.openDeletionTime(isReversed))))
+ else
{
- markerOpen[i] = merged.openBound(isReversed);
- markerTime[i] = merged.openDeletionTime(isReversed);
+ // We have a change of current deletion in merged (potentially to/from no deletion at all).
+
+ if (merged.isClose(isReversed))
+ {
+ // We're closing the merged range. If we've marked the source as needing to be repaired for
+ // that range, close and add it to the repair to be sent.
+ if (markerToRepair[i] != null)
+ closeOpenMarker(i, merged.closeBound(isReversed));
+
+ }
+
+ if (merged.isOpen(isReversed))
+ {
+ // If we're opening a new merged range (or just switching deletion), then unless the source
+ // is up to date on that deletion (note that we've updated what the source deletion is
+ // above), we'll have to send the range to the source.
+ DeletionTime newDeletion = merged.openDeletionTime(isReversed);
+ DeletionTime sourceDeletion = sourceDeletionTime[i];
+ if (!newDeletion.equals(sourceDeletion))
+ markerToRepair[i] = merged.openBound(isReversed);
+ }
}
}
+
+ if (merged != null)
+ mergedDeletionTime = merged.isOpen(isReversed) ? merged.openDeletionTime(isReversed) : null;
+ }
+
+ private void closeOpenMarker(int i, Slice.Bound close)
+ {
+ Slice.Bound open = markerToRepair[i];
+ update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion()));
+ markerToRepair[i] = null;
}
public void close()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f9b9ffe/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index 997f4e4..c9878d4 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -53,6 +53,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.apache.cassandra.db.RangeTombstone.Bound.Kind;
public class DataResolverTest
{
@@ -440,6 +441,105 @@ public class DataResolverTest
assertRepairContainsColumn(msg, "1", "two", "B", 3);
}
+ @Test
+ public void testResolveRangeTombstonesOnBoundaryRightWins() throws UnknownHostException
+ {
+ resolveRangeTombstonesOnBoundary(1, 2);
+ }
+
+ @Test
+ public void testResolveRangeTombstonesOnBoundaryLeftWins() throws UnknownHostException
+ {
+ resolveRangeTombstonesOnBoundary(2, 1);
+ }
+
+ @Test
+ public void testResolveRangeTombstonesOnBoundarySameTimestamp() throws UnknownHostException
+ {
+ resolveRangeTombstonesOnBoundary(1, 1);
+ }
+
+ /*
+ * We want responses to merge on tombstone boundary. So we'll merge 2 "streams":
+ * 1: [1, 2)(3, 4](5, 6] 2
+ * 2: [2, 3][4, 5) 1
+ * which tests all combinations of open/close boundaries (open/close, close/open, open/open, close/close).
+ *
+ * Note that, because DataResolver returns a "filtered" iterator, it should resolve into an empty iterator.
+ * However, what should be sent to each source depends on the exact timestamps of the tombstones, and we
+ * test a few combinations.
+ */
+ private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2)
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+ InetAddress peer1 = peer();
+ InetAddress peer2 = peer();
+
+ // 1st "stream"
+ RangeTombstone one_two = tombstone("1", true , "2", false, timestamp1, nowInSec);
+ RangeTombstone three_four = tombstone("3", false, "4", true , timestamp1, nowInSec);
+ RangeTombstone five_six = tombstone("5", false, "6", true , timestamp1, nowInSec);
+ UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(one_two)
+ .addRangeTombstone(three_four)
+ .addRangeTombstone(five_six)
+ .buildUpdate());
+
+ // 2nd "stream"
+ RangeTombstone two_three = tombstone("2", true, "3", true , timestamp2, nowInSec);
+ RangeTombstone four_five = tombstone("4", true, "5", false, timestamp2, nowInSec);
+ UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(two_three)
+ .addRangeTombstone(four_five)
+ .buildUpdate());
+
+ resolver.preprocess(readResponseMessage(peer1, iter1));
+ resolver.preprocess(readResponseMessage(peer2, iter2));
+
+ // No results, we've only reconciled tombstones.
+ try (PartitionIterator data = resolver.resolve())
+ {
+ assertFalse(data.hasNext());
+ }
+
+ assertEquals(2, messageRecorder.sent.size());
+
+ MessageOut msg1 = getSentMessage(peer1);
+ assertRepairMetadata(msg1);
+ assertRepairContainsNoColumns(msg1);
+
+ MessageOut msg2 = getSentMessage(peer2);
+ assertRepairMetadata(msg2);
+ assertRepairContainsNoColumns(msg2);
+
+ // Both streams are mostly complementary, so they will roughly get the ranges of the other stream. One subtlety is
+ // around the value "4" however, as it's included by both streams.
+ // So for a given stream, unless the other stream has a strictly higher timestamp, the value 4 will be excluded
+ // from whatever range it receives as repair since the stream already covers it.
+
+ // Message to peer1 contains peer2 ranges
+ assertRepairContainsDeletions(msg1, null, two_three, withExclusiveStartIf(four_five, timestamp1 >= timestamp2));
+
+ // Message to peer2 contains peer1 ranges
+ assertRepairContainsDeletions(msg2, null, one_two, withExclusiveEndIf(three_four, timestamp2 >= timestamp1), five_six);
+ }
+
+ // Forces the start to be exclusive if the condition holds
+ private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition)
+ {
+ Slice slice = rt.deletedSlice();
+ return condition
+ ? new RangeTombstone(Slice.make(slice.start().withNewKind(Kind.EXCL_START_BOUND), slice.end()), rt.deletionTime())
+ : rt;
+ }
+
+ // Forces the end to be exclusive if the condition holds
+ private static RangeTombstone withExclusiveEndIf(RangeTombstone rt, boolean condition)
+ {
+ Slice slice = rt.deletedSlice();
+ return condition
+ ? new RangeTombstone(Slice.make(slice.start(), slice.end().withNewKind(Kind.EXCL_END_BOUND)), rt.deletionTime())
+ : rt;
+ }
+
private static ByteBuffer bb(int b)
{
return ByteBufferUtil.bytes(b);
@@ -667,7 +767,10 @@ public class DataResolverTest
int i = 0;
while (ranges.hasNext())
{
- assertEquals(ranges.next(), rangeTombstones[i++]);
+ RangeTombstone expected = rangeTombstones[i++];
+ RangeTombstone actual = ranges.next();
+ String msg = String.format("Expected %s, but got %s", expected.toString(cfm.comparator), actual.toString(cfm.comparator));
+ assertEquals(msg, expected, actual);
}
}
@@ -720,8 +823,21 @@ public class DataResolverTest
private RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)
{
- return new RangeTombstone(Slice.make(cfm.comparator.make(start), cfm.comparator.make(end)),
- new DeletionTime(markedForDeleteAt, localDeletionTime));
+ return tombstone(start, true, end, true, markedForDeleteAt, localDeletionTime);
+ }
+
+ private RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime)
+ {
+ RangeTombstone.Bound.Kind startKind = inclusiveStart
+ ? Kind.INCL_START_BOUND
+ : Kind.EXCL_START_BOUND;
+ RangeTombstone.Bound.Kind endKind = inclusiveEnd
+ ? Kind.INCL_END_BOUND
+ : Kind.EXCL_END_BOUND;
+
+ RangeTombstone.Bound startBound = new RangeTombstone.Bound(startKind, cfm.comparator.make(start).getRawValues());
+ RangeTombstone.Bound endBound = new RangeTombstone.Bound(endKind, cfm.comparator.make(end).getRawValues());
+ return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime));
}
private UnfilteredPartitionIterator fullPartitionDelete(CFMetaData cfm, DecoratedKey dk, long timestamp, int nowInSec)