You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/07/28 13:48:56 UTC

[5/6] cassandra git commit: Merge commit '3f9b9ffe6b83987f7a1feb9a1d269e0f7c112161' into cassandra-3.9

Merge commit '3f9b9ffe6b83987f7a1feb9a1d269e0f7c112161' into cassandra-3.9

* commit '3f9b9ffe6b83987f7a1feb9a1d269e0f7c112161':
  Exception when computing read-repair for range tombstones


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/561b07e2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/561b07e2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/561b07e2

Branch: refs/heads/cassandra-3.9
Commit: 561b07e29c8646176bde5d3eb40b4083fb079ac4
Parents: f0fd9ad 3f9b9ff
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jul 28 15:44:19 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jul 28 15:48:21 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/rows/UnfilteredRowIterators.java         |  45 ++++++-
 .../apache/cassandra/service/DataResolver.java  | 102 +++++++++++++--
 .../cassandra/service/DataResolverTest.java     | 126 ++++++++++++++++++-
 4 files changed, 259 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/561b07e2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5740ca7,caecefb..109411a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
 -3.0.9
 +3.9
 + * cqlsh: Fix handling of $$-escaped strings (CASSANDRA-12189)
 + * Fix SSL JMX requiring truststore containing server cert (CASSANDRA-12109)
 +Merged from 3.0:
+  * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
   * Lost counter writes in compact table and static columns (CASSANDRA-12219)
   * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
   * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/561b07e2/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/561b07e2/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index bbc0379,4e5bfb8..2c1b347
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -167,8 -166,14 +167,14 @@@ public class DataResolver extends Respo
              private final Row.Builder[] currentRows = new Row.Builder[sources.length];
              private final RowDiffListener diffListener;
  
-             private final ClusteringBound[] markerOpen = new ClusteringBound[sources.length];
-             private final DeletionTime[] markerTime = new DeletionTime[sources.length];
+             // The partition level deletion for the merge row.
+             private DeletionTime partitionLevelDeletion;
+             // When merged has a currently open marker, its time. null otherwise.
+             private DeletionTime mergedDeletionTime;
+             // For each source, the time of the current deletion as known by the source.
+             private final DeletionTime[] sourceDeletionTime = new DeletionTime[sources.length];
+             // For each source, record if there is an open range to send as repair, and from where.
 -            private final Slice.Bound[] markerToRepair = new Slice.Bound[sources.length];
++            private final ClusteringBound[] markerToRepair = new ClusteringBound[sources.length];
  
              public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
              {
@@@ -265,19 -267,86 +280,86 @@@
                  for (int i = 0; i < versions.length; i++)
                  {
                      RangeTombstoneMarker marker = versions[i];
-                     // Note that boundaries are both close and open, so it's not one or the other
-                     if (merged.isClose(isReversed) && markerOpen[i] != null)
+ 
+                     // Update what the source now thinks is the current deletion
+                     if (marker != null)
+                         sourceDeletionTime[i] = marker.isOpen(isReversed) ? marker.openDeletionTime(isReversed) : null;
+ 
+                     // If merged == null, one of the sources is opening or closing a marker
+                     if (merged == null)
                      {
-                         ClusteringBound open = markerOpen[i];
-                         ClusteringBound close = merged.closeBound(isReversed);
-                         update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), markerTime[i]));
+                         // but if it's not this source, move to the next one
+                         if (marker == null)
+                             continue;
+ 
+                         // We have a close and/or open marker for a source, with nothing corresponding in merged.
+                         // Because merged is a superset, this implies that we have a current deletion (be it due to an
+                         // early opening in merged or a partition level deletion) and that this deletion will still be
+                         // active after that point. Further, whatever deletion was open or is opened by this marker on
+                         // the source, that deletion cannot supersede the current one.
+                         //
+                         // What we want to know here is if the source deletion and merged deletion were or will be equal,
+                         // because in that case we don't want to include any repair for the source, and otherwise we do.
+                         //
+                         // Note further that if the marker is a boundary, as both sides of that boundary will have a
+                         // different deletion time, only one side might be equal to the merged deletion. This means we
+                         // can only be in one of 2 cases:
+                         //   1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null), and then
+                         //      it won't be from that point on.
+                         //   2) the source wasn't up-to-date on deletion up to that point (markerToRepair[i] != null), and
+                         //      it may now be (if it isn't we just have nothing to do for that marker).
+                         assert !currentDeletion.isLive();
+ 
+                         if (markerToRepair[i] == null)
+                         {
+                             // Since there is an ongoing merged deletion, the only way we don't have an open repair for
+                             // this source is that it had a range open with the same deletion as current and it's
+                             // closing it. This implies we need to open a deletion for the source from that point.
+                             assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed));
+                             assert !marker.isOpen(isReversed) || currentDeletion.supersedes(marker.openDeletionTime(isReversed));
+                             markerToRepair[i] = marker.closeBound(isReversed).invert();
+                         }
+                         // In case 2) above, we only have something to do if the source is up-to-date after that point
+                         else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))
+                         {
+                             closeOpenMarker(i, marker.openBound(isReversed).invert());
+                         }
                      }
-                     if (merged.isOpen(isReversed) && (marker == null || merged.openDeletionTime(isReversed).supersedes(marker.openDeletionTime(isReversed))))
+                     else
                      {
-                         markerOpen[i] = merged.openBound(isReversed);
-                         markerTime[i] = merged.openDeletionTime(isReversed);
+                         // We have a change of current deletion in merged (potentially to/from no deletion at all).
+ 
+                         if (merged.isClose(isReversed))
+                         {
 -                            // We're closing the merged range. If we've marked the source as needing to be repaired for
 -                            // that range, close and add it to the repair to be sent.
++                            // We're closing the merged range. If we've recorded that this should be repaired for the
++                            // source, close and add said range to the repair to send.
+                             if (markerToRepair[i] != null)
+                                 closeOpenMarker(i, merged.closeBound(isReversed));
+ 
+                         }
+ 
+                         if (merged.isOpen(isReversed))
+                         {
+                             // If we're opening a new merged range (or just switching deletion), then unless the source
+                             // is up to date on that deletion (note that we've updated what the source deletion is
+                             // above), we'll have to send the range to the source.
+                             DeletionTime newDeletion = merged.openDeletionTime(isReversed);
+                             DeletionTime sourceDeletion = sourceDeletionTime[i];
+                             if (!newDeletion.equals(sourceDeletion))
+                                 markerToRepair[i] = merged.openBound(isReversed);
+                         }
                      }
                  }
+ 
+                 if (merged != null)
+                     mergedDeletionTime = merged.isOpen(isReversed) ? merged.openDeletionTime(isReversed) : null;
+             }
+ 
 -            private void closeOpenMarker(int i, Slice.Bound close)
++            private void closeOpenMarker(int i, ClusteringBound close)
+             {
 -                Slice.Bound open = markerToRepair[i];
++                ClusteringBound open = markerToRepair[i];
+                 update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion()));
+                 markerToRepair[i] = null;
              }
  
              public void close()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/561b07e2/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/DataResolverTest.java
index b20dfc0,c9878d4..fe7e211
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@@ -53,6 -53,7 +53,7 @@@ import static org.junit.Assert.assertEq
  import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertNotNull;
  import static org.junit.Assert.assertTrue;
 -import static org.apache.cassandra.db.RangeTombstone.Bound.Kind;
++import static org.apache.cassandra.db.ClusteringBound.Kind;
  
  public class DataResolverTest
  {
@@@ -440,6 -441,105 +441,113 @@@
          assertRepairContainsColumn(msg, "1", "two", "B", 3);
      }
  
+     @Test
+     public void testResolveRangeTombstonesOnBoundaryRightWins() throws UnknownHostException
+     {
+         resolveRangeTombstonesOnBoundary(1, 2);
+     }
+ 
+     @Test
+     public void testResolveRangeTombstonesOnBoundaryLeftWins() throws UnknownHostException
+     {
+         resolveRangeTombstonesOnBoundary(2, 1);
+     }
+ 
+     @Test
+     public void testResolveRangeTombstonesOnBoundarySameTimestamp() throws UnknownHostException
+     {
+         resolveRangeTombstonesOnBoundary(1, 1);
+     }
+ 
+     /*
+      * We want responses to merge on tombstone boundary. So we'll merge 2 "streams":
+      *   1: [1, 2)(3, 4](5, 6]  2
+      *   2:    [2, 3][4, 5)     1
+      * which tests all combinations of open/close boundaries (open/close, close/open, open/open, close/close).
+      *
+      * Note that, because DataResolver returns a "filtered" iterator, it should resolve into an empty iterator.
+      * However, what should be sent to each source depends on the exact timestamps of each tombstone, and we
+      * test a few combinations.
+      */
+     private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2)
+     {
+         DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+         InetAddress peer1 = peer();
+         InetAddress peer2 = peer();
+ 
+         // 1st "stream"
+         RangeTombstone one_two    = tombstone("1", true , "2", false, timestamp1, nowInSec);
+         RangeTombstone three_four = tombstone("3", false, "4", true , timestamp1, nowInSec);
+         RangeTombstone five_six   = tombstone("5", false, "6", true , timestamp1, nowInSec);
+         UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(one_two)
+                                                                                             .addRangeTombstone(three_four)
+                                                                                             .addRangeTombstone(five_six)
+                                                                                             .buildUpdate());
+ 
+         // 2nd "stream"
+         RangeTombstone two_three = tombstone("2", true, "3", true , timestamp2, nowInSec);
+         RangeTombstone four_five = tombstone("4", true, "5", false, timestamp2, nowInSec);
+         UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(two_three)
+                                                                                             .addRangeTombstone(four_five)
+                                                                                             .buildUpdate());
+ 
+         resolver.preprocess(readResponseMessage(peer1, iter1));
+         resolver.preprocess(readResponseMessage(peer2, iter2));
+ 
+         // No results, we've only reconciled tombstones.
+         try (PartitionIterator data = resolver.resolve())
+         {
+             assertFalse(data.hasNext());
+         }
+ 
+         assertEquals(2, messageRecorder.sent.size());
+ 
+         MessageOut msg1 = getSentMessage(peer1);
+         assertRepairMetadata(msg1);
+         assertRepairContainsNoColumns(msg1);
+ 
+         MessageOut msg2 = getSentMessage(peer2);
+         assertRepairMetadata(msg2);
+         assertRepairContainsNoColumns(msg2);
+ 
+         // Both streams are mostly complementary, so they will roughly get the ranges of the other stream. One subtlety is
+         // around the value "4" however, as it's included by both streams.
+         // So for a given stream, unless the other stream has a strictly higher timestamp, the value 4 will be excluded
+         // from whatever range it receives as repair since the stream already covers it.
+ 
+         // Message to peer1 contains peer2 ranges
+         assertRepairContainsDeletions(msg1, null, two_three, withExclusiveStartIf(four_five, timestamp1 >= timestamp2));
+ 
+         // Message to peer2 contains peer1 ranges
+         assertRepairContainsDeletions(msg2, null, one_two, withExclusiveEndIf(three_four, timestamp2 >= timestamp1), five_six);
+     }
+ 
+     // Forces the start to be exclusive if the condition holds
+     private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition)
+     {
++        if (!condition)
++            return rt;
++
+         Slice slice = rt.deletedSlice();
++        ClusteringBound newStart = ClusteringBound.create(Kind.EXCL_START_BOUND, slice.start().getRawValues());
+         return condition
 -             ? new RangeTombstone(Slice.make(slice.start().withNewKind(Kind.EXCL_START_BOUND), slice.end()), rt.deletionTime())
++             ? new RangeTombstone(Slice.make(newStart, slice.end()), rt.deletionTime())
+              : rt;
+     }
+ 
+     // Forces the end to be exclusive if the condition holds
+     private static RangeTombstone withExclusiveEndIf(RangeTombstone rt, boolean condition)
+     {
++        if (!condition)
++            return rt;
++
+         Slice slice = rt.deletedSlice();
++        ClusteringBound newEnd = ClusteringBound.create(Kind.EXCL_END_BOUND, slice.end().getRawValues());
+         return condition
 -             ? new RangeTombstone(Slice.make(slice.start(), slice.end().withNewKind(Kind.EXCL_END_BOUND)), rt.deletionTime())
++             ? new RangeTombstone(Slice.make(slice.start(), newEnd), rt.deletionTime())
+              : rt;
+     }
+ 
      private static ByteBuffer bb(int b)
      {
          return ByteBufferUtil.bytes(b);
@@@ -721,8 -823,21 +832,17 @@@
  
      private RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)
      {
-         return new RangeTombstone(Slice.make(cfm.comparator.make(start), cfm.comparator.make(end)),
-                                   new DeletionTime(markedForDeleteAt, localDeletionTime));
+         return tombstone(start, true, end, true, markedForDeleteAt, localDeletionTime);
+     }
+ 
+     private RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime)
+     {
 -        RangeTombstone.Bound.Kind startKind = inclusiveStart
 -                                            ? Kind.INCL_START_BOUND
 -                                            : Kind.EXCL_START_BOUND;
 -        RangeTombstone.Bound.Kind endKind = inclusiveEnd
 -                                          ? Kind.INCL_END_BOUND
 -                                          : Kind.EXCL_END_BOUND;
++        Kind startKind = inclusiveStart ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND;
++        Kind endKind = inclusiveEnd ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND;
+ 
 -        RangeTombstone.Bound startBound = new RangeTombstone.Bound(startKind, cfm.comparator.make(start).getRawValues());
 -        RangeTombstone.Bound endBound = new RangeTombstone.Bound(endKind, cfm.comparator.make(end).getRawValues());
++        ClusteringBound startBound = ClusteringBound.create(startKind, cfm.comparator.make(start).getRawValues());
++        ClusteringBound endBound = ClusteringBound.create(endKind, cfm.comparator.make(end).getRawValues());
+         return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime));
      }
  
      private UnfilteredPartitionIterator fullPartitionDelete(CFMetaData cfm, DecoratedKey dk, long timestamp, int nowInSec)