You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/07/28 13:48:55 UTC
[4/6] cassandra git commit: Merge commit
'3f9b9ffe6b83987f7a1feb9a1d269e0f7c112161' into cassandra-3.9
Merge commit '3f9b9ffe6b83987f7a1feb9a1d269e0f7c112161' into cassandra-3.9
* commit '3f9b9ffe6b83987f7a1feb9a1d269e0f7c112161':
Exception when computing read-repair for range tombstones
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/561b07e2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/561b07e2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/561b07e2
Branch: refs/heads/trunk
Commit: 561b07e29c8646176bde5d3eb40b4083fb079ac4
Parents: f0fd9ad 3f9b9ff
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jul 28 15:44:19 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jul 28 15:48:21 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/rows/UnfilteredRowIterators.java | 45 ++++++-
.../apache/cassandra/service/DataResolver.java | 102 +++++++++++++--
.../cassandra/service/DataResolverTest.java | 126 ++++++++++++++++++-
4 files changed, 259 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/561b07e2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5740ca7,caecefb..109411a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
-3.0.9
+3.9
+ * cqlsh: Fix handling of $$-escaped strings (CASSANDRA-12189)
+ * Fix SSL JMX requiring truststore containing server cert (CASSANDRA-12109)
+Merged from 3.0:
+ * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
* Lost counter writes in compact table and static columns (CASSANDRA-12219)
* AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
* Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/561b07e2/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/561b07e2/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index bbc0379,4e5bfb8..2c1b347
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -167,8 -166,14 +167,14 @@@ public class DataResolver extends Respo
private final Row.Builder[] currentRows = new Row.Builder[sources.length];
private final RowDiffListener diffListener;
- private final ClusteringBound[] markerOpen = new ClusteringBound[sources.length];
- private final DeletionTime[] markerTime = new DeletionTime[sources.length];
+ // The partition level deletion for the merge row.
+ private DeletionTime partitionLevelDeletion;
+ // When merged has a currently open marker, its time. null otherwise.
+ private DeletionTime mergedDeletionTime;
+ // For each source, the time of the current deletion as known by the source.
+ private final DeletionTime[] sourceDeletionTime = new DeletionTime[sources.length];
+ // For each source, record if there is an open range to send as repair, and from where.
- private final Slice.Bound[] markerToRepair = new Slice.Bound[sources.length];
++ private final ClusteringBound[] markerToRepair = new ClusteringBound[sources.length];
public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
{
@@@ -265,19 -267,86 +280,86 @@@
for (int i = 0; i < versions.length; i++)
{
RangeTombstoneMarker marker = versions[i];
- // Note that boundaries are both close and open, so it's not one or the other
- if (merged.isClose(isReversed) && markerOpen[i] != null)
+
+ // Update what the source now thinks is the current deletion
+ if (marker != null)
+ sourceDeletionTime[i] = marker.isOpen(isReversed) ? marker.openDeletionTime(isReversed) : null;
+
+ // If merged == null, one of the sources is opening or closing a marker
+ if (merged == null)
{
- ClusteringBound open = markerOpen[i];
- ClusteringBound close = merged.closeBound(isReversed);
- update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), markerTime[i]));
+ // but if it's not this source, move to the next one
+ if (marker == null)
+ continue;
+
+ // We have a close and/or open marker for a source, with nothing corresponding in merged.
+ // Because merged is a superset, this implies that we have a current deletion (be it due to an
+ // early opening in merged or a partition level deletion) and that this deletion will still be
+ // active after that point. Further, whatever deletion was open or is opened by this marker on the
+ // source, that deletion cannot supersede the current one.
+ //
+ // What we want to know here is if the source deletion and merged deletion were or will be equal,
+ // because in that case we don't want to include any repair for the source, and otherwise we do.
+ //
+ // Note further that if the marker is a boundary, as both sides of that boundary will have
+ // different deletion times, only one side might be equal to the merged deletion. This means we
+ // can only be in one of 2 cases:
+ // 1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null), and then
+ // it won't be from that point on.
+ // 2) the source wasn't up-to-date on deletion up to that point (markerToRepair[i] != null), and
+ // it may now be (if it isn't we just have nothing to do for that marker).
+ assert !currentDeletion.isLive();
+
+ if (markerToRepair[i] == null)
+ {
+ // Since there is an ongoing merged deletion, the only way we don't have an open repair for
+ // this source is that it had a range open with the same deletion as current and it's
+ // closing it. This implies we need to open a deletion for the source from that point.
+ assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed));
+ assert !marker.isOpen(isReversed) || currentDeletion.supersedes(marker.openDeletionTime(isReversed));
+ markerToRepair[i] = marker.closeBound(isReversed).invert();
+ }
+ // In case 2) above, we only have something to do if the source is up-to-date after that point
+ else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))
+ {
+ closeOpenMarker(i, marker.openBound(isReversed).invert());
+ }
}
- if (merged.isOpen(isReversed) && (marker == null || merged.openDeletionTime(isReversed).supersedes(marker.openDeletionTime(isReversed))))
+ else
{
- markerOpen[i] = merged.openBound(isReversed);
- markerTime[i] = merged.openDeletionTime(isReversed);
+ // We have a change of current deletion in merged (potentially to/from no deletion at all).
+
+ if (merged.isClose(isReversed))
+ {
- // We're closing the merged range. If we've marked the source as needing to be repaired for
- // that range, close and add it to the repair to be sent.
++ // We're closing the merged range. If we've recorded that this should be repaired for the
++ // source, close and add said range to the repair to send.
+ if (markerToRepair[i] != null)
+ closeOpenMarker(i, merged.closeBound(isReversed));
+
+ }
+
+ if (merged.isOpen(isReversed))
+ {
+ // If we're opening a new merged range (or just switching deletion), then unless the source
+ // is up to date on that deletion (note that we've updated what the source deletion is
+ // above), we'll have to send the range to the source.
+ DeletionTime newDeletion = merged.openDeletionTime(isReversed);
+ DeletionTime sourceDeletion = sourceDeletionTime[i];
+ if (!newDeletion.equals(sourceDeletion))
+ markerToRepair[i] = merged.openBound(isReversed);
+ }
}
}
+
+ if (merged != null)
+ mergedDeletionTime = merged.isOpen(isReversed) ? merged.openDeletionTime(isReversed) : null;
+ }
+
- private void closeOpenMarker(int i, Slice.Bound close)
++ private void closeOpenMarker(int i, ClusteringBound close)
+ {
- Slice.Bound open = markerToRepair[i];
++ ClusteringBound open = markerToRepair[i];
+ update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion()));
+ markerToRepair[i] = null;
}
public void close()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/561b07e2/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/DataResolverTest.java
index b20dfc0,c9878d4..fe7e211
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@@ -53,6 -53,7 +53,7 @@@ import static org.junit.Assert.assertEq
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.apache.cassandra.db.RangeTombstone.Bound.Kind;
++import static org.apache.cassandra.db.ClusteringBound.Kind;
public class DataResolverTest
{
@@@ -440,6 -441,105 +441,113 @@@
assertRepairContainsColumn(msg, "1", "two", "B", 3);
}
+ @Test
+ public void testResolveRangeTombstonesOnBoundaryRightWins() throws UnknownHostException
+ {
+ resolveRangeTombstonesOnBoundary(1, 2);
+ }
+
+ @Test
+ public void testResolveRangeTombstonesOnBoundaryLeftWins() throws UnknownHostException
+ {
+ resolveRangeTombstonesOnBoundary(2, 1);
+ }
+
+ @Test
+ public void testResolveRangeTombstonesOnBoundarySameTimestamp() throws UnknownHostException
+ {
+ resolveRangeTombstonesOnBoundary(1, 1);
+ }
+
+ /*
+ * We want responses to merge on tombstone boundary. So we'll merge 2 "streams":
+ * 1: [1, 2)(3, 4](5, 6] 2
+ * 2: [2, 3][4, 5) 1
+ * which tests all combinations of open/close boundaries (open/close, close/open, open/open, close/close).
+ *
+ * Note that, because DataResolver returns a "filtered" iterator, it should resolve into an empty iterator.
+ * However, what should be sent to each source depends on the exact timestamps of each tombstone, and we
+ * test a few combinations.
+ */
+ private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2)
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+ InetAddress peer1 = peer();
+ InetAddress peer2 = peer();
+
+ // 1st "stream"
+ RangeTombstone one_two = tombstone("1", true , "2", false, timestamp1, nowInSec);
+ RangeTombstone three_four = tombstone("3", false, "4", true , timestamp1, nowInSec);
+ RangeTombstone five_six = tombstone("5", false, "6", true , timestamp1, nowInSec);
+ UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(one_two)
+ .addRangeTombstone(three_four)
+ .addRangeTombstone(five_six)
+ .buildUpdate());
+
+ // 2nd "stream"
+ RangeTombstone two_three = tombstone("2", true, "3", true , timestamp2, nowInSec);
+ RangeTombstone four_five = tombstone("4", true, "5", false, timestamp2, nowInSec);
+ UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(two_three)
+ .addRangeTombstone(four_five)
+ .buildUpdate());
+
+ resolver.preprocess(readResponseMessage(peer1, iter1));
+ resolver.preprocess(readResponseMessage(peer2, iter2));
+
+ // No results, we've only reconciled tombstones.
+ try (PartitionIterator data = resolver.resolve())
+ {
+ assertFalse(data.hasNext());
+ }
+
+ assertEquals(2, messageRecorder.sent.size());
+
+ MessageOut msg1 = getSentMessage(peer1);
+ assertRepairMetadata(msg1);
+ assertRepairContainsNoColumns(msg1);
+
+ MessageOut msg2 = getSentMessage(peer2);
+ assertRepairMetadata(msg2);
+ assertRepairContainsNoColumns(msg2);
+
+ // Both streams are mostly complementary, so they will roughly get the ranges of the other stream. One subtlety is
+ // around the value "4" however, as it's included by both streams.
+ // So for a given stream, unless the other stream has a strictly higher timestamp, the value 4 will be excluded
+ // from whatever range it receives as repair since the stream already covers it.
+
+ // Message to peer1 contains peer2 ranges
+ assertRepairContainsDeletions(msg1, null, two_three, withExclusiveStartIf(four_five, timestamp1 >= timestamp2));
+
+ // Message to peer2 contains peer1 ranges
+ assertRepairContainsDeletions(msg2, null, one_two, withExclusiveEndIf(three_four, timestamp2 >= timestamp1), five_six);
+ }
+
+ // Forces the start to be exclusive if the condition holds
+ private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition)
+ {
++ if (!condition)
++ return rt;
++
+ Slice slice = rt.deletedSlice();
++ ClusteringBound newStart = ClusteringBound.create(Kind.EXCL_START_BOUND, slice.start().getRawValues());
+ return condition
- ? new RangeTombstone(Slice.make(slice.start().withNewKind(Kind.EXCL_START_BOUND), slice.end()), rt.deletionTime())
++ ? new RangeTombstone(Slice.make(newStart, slice.end()), rt.deletionTime())
+ : rt;
+ }
+
+ // Forces the end to be exclusive if the condition holds
+ private static RangeTombstone withExclusiveEndIf(RangeTombstone rt, boolean condition)
+ {
++ if (!condition)
++ return rt;
++
+ Slice slice = rt.deletedSlice();
++ ClusteringBound newEnd = ClusteringBound.create(Kind.EXCL_END_BOUND, slice.end().getRawValues());
+ return condition
- ? new RangeTombstone(Slice.make(slice.start(), slice.end().withNewKind(Kind.EXCL_END_BOUND)), rt.deletionTime())
++ ? new RangeTombstone(Slice.make(slice.start(), newEnd), rt.deletionTime())
+ : rt;
+ }
+
private static ByteBuffer bb(int b)
{
return ByteBufferUtil.bytes(b);
@@@ -721,8 -823,21 +832,17 @@@
private RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)
{
- return new RangeTombstone(Slice.make(cfm.comparator.make(start), cfm.comparator.make(end)),
- new DeletionTime(markedForDeleteAt, localDeletionTime));
+ return tombstone(start, true, end, true, markedForDeleteAt, localDeletionTime);
+ }
+
+ private RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime)
+ {
- RangeTombstone.Bound.Kind startKind = inclusiveStart
- ? Kind.INCL_START_BOUND
- : Kind.EXCL_START_BOUND;
- RangeTombstone.Bound.Kind endKind = inclusiveEnd
- ? Kind.INCL_END_BOUND
- : Kind.EXCL_END_BOUND;
++ Kind startKind = inclusiveStart ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND;
++ Kind endKind = inclusiveEnd ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND;
+
- RangeTombstone.Bound startBound = new RangeTombstone.Bound(startKind, cfm.comparator.make(start).getRawValues());
- RangeTombstone.Bound endBound = new RangeTombstone.Bound(endKind, cfm.comparator.make(end).getRawValues());
++ ClusteringBound startBound = ClusteringBound.create(startKind, cfm.comparator.make(start).getRawValues());
++ ClusteringBound endBound = ClusteringBound.create(endKind, cfm.comparator.make(end).getRawValues());
+ return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime));
}
private UnfilteredPartitionIterator fullPartitionDelete(CFMetaData cfm, DecoratedKey dk, long timestamp, int nowInSec)