You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/09/26 10:14:51 UTC

cassandra git commit: Introduce RangesAtEndpoint.unwrap; simplify StreamSession.addTransferRanges

Repository: cassandra
Updated Branches:
  refs/heads/trunk 8554d6b35 -> 914c66685


Introduce RangesAtEndpoint.unwrap; simplify StreamSession.addTransferRanges


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/914c6668
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/914c6668
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/914c6668

Branch: refs/heads/trunk
Commit: 914c66685c5bebe1624d827a9b4562b73a08c297
Parents: 8554d6b
Author: Benedict Elliott Smith <be...@apple.com>
Authored: Tue Sep 18 13:17:15 2018 +0100
Committer: Benedict Elliott Smith <be...@apple.com>
Committed: Wed Sep 26 11:12:12 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/locator/RangesAtEndpoint.java     | 31 +++++++++++++
 .../cassandra/streaming/StreamSession.java      | 11 +----
 .../locator/ReplicaCollectionTest.java          | 46 +++++++++++++++-----
 4 files changed, 69 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/914c6668/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9139822..e227c40 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Introduce RangesAtEndpoint.unwrap to simplify StreamSession.addTransferRanges (CASSANDRA-14770)
  * LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout instead of Unavailable (CASSANDRA-14735)
  * Avoid creating empty compaction tasks after truncate (CASSANDRA-14780)
  * Fail incremental repair prepare phase if it encounters sstables from un-finalized sessions (CASSANDRA-14763)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/914c6668/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
index f57c28e..8319d92 100644
--- a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
+++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
@@ -165,6 +165,37 @@ public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint
         return Collections.unmodifiableMap(byRange);
     }
 
+    /**
+     * @return if there are no wrap around ranges contained in this RangesAtEndpoint, return self;
+     * otherwise, return a RangesAtEndpoint covering the same logical portions of the ring, but with those ranges unwrapped
+     */
+    public RangesAtEndpoint unwrap()
+    {
+        int wrapAroundCount = 0;
+        for (Replica replica : this)
+        {
+            if (replica.range().isWrapAround())
+                ++wrapAroundCount;
+        }
+
+        assert wrapAroundCount <= 1;
+        if (wrapAroundCount == 0)
+            return snapshot();
+
+        RangesAtEndpoint.Builder builder = builder(endpoint, size() + wrapAroundCount);
+        for (Replica replica : this)
+        {
+            if (!replica.range().isWrapAround())
+            {
+                builder.add(replica);
+                continue;
+            }
+            for (Range<Token> range : replica.range().unwrap())
+                builder.add(replica.decorateSubrange(range));
+        }
+        return builder.build();
+    }
+
     public static Collector<Replica, Builder, RangesAtEndpoint> collector(InetAddressAndPort endpoint)
     {
         return collector(ImmutableSet.of(), () -> new Builder(endpoint));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/914c6668/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index d7d0836..80fcebb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -335,15 +335,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         //Was it safe to remove this normalize, sorting seems not to matter, merging? Maybe we should have?
         //Do we need to unwrap here also or is that just making it worse?
         //Range and if it's transient
-        RangesAtEndpoint.Builder unwrappedRanges = RangesAtEndpoint.builder(replicas.endpoint(), replicas.size());
-        for (Replica replica : replicas)
-        {
-            for (Range<Token> unwrapped : replica.range().unwrap())
-            {
-                unwrappedRanges.add(new Replica(replica.endpoint(), unwrapped, replica.isFull()));
-            }
-        }
-        List<OutgoingStream> streams = getOutgoingStreamsForRanges(unwrappedRanges.build(), stores, pendingRepair, previewKind);
+        RangesAtEndpoint unwrappedRanges = replicas.unwrap();
+        List<OutgoingStream> streams = getOutgoingStreamsForRanges(unwrappedRanges, stores, pendingRepair, previewKind);
         addTransferStreams(streams);
         Set<Range<Token>> toBeUpdated = transferredRangesPerKeyspace.get(keyspace);
         if (toBeUpdated == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/914c6668/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
index f937f96..c289d50 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
@@ -33,6 +33,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -65,7 +66,7 @@ public class ReplicaCollectionTest
             R2 = range(1, 2);
             R3 = range(2, 3);
             R4 = range(3, 4);
-            R5 = range(4, 5);
+            R5 = range(4, 0);
             BROADCAST_RANGE = range(10, 11);
             NULL_RANGE = range(10000, 10001);
         }
@@ -187,7 +188,7 @@ public class ReplicaCollectionTest
             // remove start
             // we recurse on the same subset in testSubList, so just corroborate we have the correct list here
             {
-                Predicate<Replica> removeFirst = r -> r != canonicalList.get(0);
+                Predicate<Replica> removeFirst = r -> !r.equals(canonicalList.get(0));
                 assertSubList(test.filter(removeFirst), 1, canonicalList.size());
                 assertSubList(test.filter(removeFirst, 1), 1, Math.min(canonicalList.size(), 2));
             }
@@ -199,14 +200,14 @@ public class ReplicaCollectionTest
             // we recurse on the same subset in testSubList, so just corroborate we have the correct list here
             {
                 int last = canonicalList.size() - 1;
-                Predicate<Replica> removeLast = r -> r != canonicalList.get(last);
+                Predicate<Replica> removeLast = r -> !r.equals(canonicalList.get(last));
                 assertSubList(test.filter(removeLast), 0, last);
             }
 
             if (test.size() <= 2)
                 return;
 
-            Predicate<Replica> removeMiddle = r -> r != canonicalList.get(canonicalList.size() / 2);
+            Predicate<Replica> removeMiddle = r -> !r.equals(canonicalList.get(canonicalList.size() / 2));
             TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(filter(canonicalList, removeMiddle::test)));
             filtered.testAll(subListDepth, filterDepth - 1, sortDepth);
         }
@@ -224,7 +225,7 @@ public class ReplicaCollectionTest
             for (int i = 0 ; i < canonicalList.size() ; ++i)
             {
                 Replica discount = canonicalList.get(i);
-                Assert.assertEquals(canonicalList.size() - 1, test.count(r -> r != discount));
+                Assert.assertEquals(canonicalList.size() - 1, test.count(r -> !r.equals(discount)));
             }
         }
 
@@ -245,15 +246,15 @@ public class ReplicaCollectionTest
         {
             final Comparator<Replica> comparator = (o1, o2) ->
             {
-                boolean f1 = o1 == canonicalList.get(0);
-                boolean f2 = o2 == canonicalList.get(0);
+                boolean f1 = o1.equals(canonicalList.get(0));
+                boolean f2 = o2.equals(canonicalList.get(0));
                 return f1 == f2 ? 0 : f1 ? 1 : -1;
             };
             TestCase<C> sorted = new TestCase<>(test.sorted(comparator), ImmutableList.sortedCopyOf(comparator, canonicalList));
             sorted.testAll(subListDepth, filterDepth, sortDepth - 1);
         }
 
-        private void testAll(int subListDepth, int filterDepth, int sortDepth)
+        void testAll(int subListDepth, int filterDepth, int sortDepth)
         {
             testEndpoints();
             testOrderOfIteration();
@@ -312,12 +313,35 @@ public class ReplicaCollectionTest
             Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::range)), test.ranges());
         }
 
-        @Override
-        public void testAll()
+        public void testUnwrap(int subListDepth, int filterDepth, int sortDepth)
+        {
+            List<Replica> canonUnwrap = new ArrayList<>();
+            for (Replica replica : canonicalList)
+                for (Range<Token> range : replica.range().unwrap())
+                    canonUnwrap.add(replica.decorateSubrange(range));
+            RangesAtEndpoint testUnwrap = test.unwrap();
+            if (testUnwrap == test)
+            {
+                Assert.assertEquals(canonicalList, canonUnwrap);
+            }
+            else
+            {
+                new RangesAtEndpointTestCase(testUnwrap, canonUnwrap)
+                        .testAllExceptUnwrap(subListDepth, filterDepth, sortDepth);
+            }
+        }
+
+        void testAllExceptUnwrap(int subListDepth, int filterDepth, int sortDepth)
         {
-            super.testAll();
+            super.testAll(subListDepth, filterDepth, sortDepth);
             testRanges();
         }
+
+        void testAll(int subListDepth, int filterDepth, int sortDepth)
+        {
+            testAllExceptUnwrap(subListDepth, filterDepth, sortDepth);
+            testUnwrap(subListDepth, filterDepth, sortDepth);
+        }
     }
 
     private static final ImmutableList<Replica> RANGES_AT_ENDPOINT = ImmutableList.of(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org