You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/09/26 10:14:51 UTC
cassandra git commit: Introduce RangesAtEndpoint.unwrap;
simplify StreamSession.addTransferRanges
Repository: cassandra
Updated Branches:
refs/heads/trunk 8554d6b35 -> 914c66685
Introduce RangesAtEndpoint.unwrap; simplify StreamSession.addTransferRanges
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/914c6668
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/914c6668
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/914c6668
Branch: refs/heads/trunk
Commit: 914c66685c5bebe1624d827a9b4562b73a08c297
Parents: 8554d6b
Author: Benedict Elliott Smith <be...@apple.com>
Authored: Tue Sep 18 13:17:15 2018 +0100
Committer: Benedict Elliott Smith <be...@apple.com>
Committed: Wed Sep 26 11:12:12 2018 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/locator/RangesAtEndpoint.java | 31 +++++++++++++
.../cassandra/streaming/StreamSession.java | 11 +----
.../locator/ReplicaCollectionTest.java | 46 +++++++++++++++-----
4 files changed, 69 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/914c6668/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9139822..e227c40 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Introduce RangesAtEndpoint.unwrap to simplify StreamSession.addTransferRanges (CASSANDRA-14770)
* LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout instead of Unavailable (CASSANDRA-14735)
* Avoid creating empty compaction tasks after truncate (CASSANDRA-14780)
* Fail incremental repair prepare phase if it encounters sstables from un-finalized sessions (CASSANDRA-14763)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/914c6668/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
index f57c28e..8319d92 100644
--- a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
+++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
@@ -165,6 +165,37 @@ public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint
return Collections.unmodifiableMap(byRange);
}
+ /**
+ * @return if there are no wrap around ranges contained in this RangesAtEndpoint, return self;
+ * otherwise, return a RangesAtEndpoint covering the same logical portions of the ring, but with those ranges unwrapped
+ */
+ public RangesAtEndpoint unwrap()
+ {
+ int wrapAroundCount = 0;
+ for (Replica replica : this)
+ {
+ if (replica.range().isWrapAround())
+ ++wrapAroundCount;
+ }
+
+ assert wrapAroundCount <= 1;
+ if (wrapAroundCount == 0)
+ return snapshot();
+
+ RangesAtEndpoint.Builder builder = builder(endpoint, size() + wrapAroundCount);
+ for (Replica replica : this)
+ {
+ if (!replica.range().isWrapAround())
+ {
+ builder.add(replica);
+ continue;
+ }
+ for (Range<Token> range : replica.range().unwrap())
+ builder.add(replica.decorateSubrange(range));
+ }
+ return builder.build();
+ }
+
public static Collector<Replica, Builder, RangesAtEndpoint> collector(InetAddressAndPort endpoint)
{
return collector(ImmutableSet.of(), () -> new Builder(endpoint));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/914c6668/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index d7d0836..80fcebb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -335,15 +335,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
//Was it safe to remove this normalize, sorting seems not to matter, merging? Maybe we should have?
//Do we need to unwrap here also or is that just making it worse?
//Range and if it's transient
- RangesAtEndpoint.Builder unwrappedRanges = RangesAtEndpoint.builder(replicas.endpoint(), replicas.size());
- for (Replica replica : replicas)
- {
- for (Range<Token> unwrapped : replica.range().unwrap())
- {
- unwrappedRanges.add(new Replica(replica.endpoint(), unwrapped, replica.isFull()));
- }
- }
- List<OutgoingStream> streams = getOutgoingStreamsForRanges(unwrappedRanges.build(), stores, pendingRepair, previewKind);
+ RangesAtEndpoint unwrappedRanges = replicas.unwrap();
+ List<OutgoingStream> streams = getOutgoingStreamsForRanges(unwrappedRanges, stores, pendingRepair, previewKind);
addTransferStreams(streams);
Set<Range<Token>> toBeUpdated = transferredRangesPerKeyspace.get(keyspace);
if (toBeUpdated == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/914c6668/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
index f937f96..c289d50 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
@@ -33,6 +33,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
@@ -65,7 +66,7 @@ public class ReplicaCollectionTest
R2 = range(1, 2);
R3 = range(2, 3);
R4 = range(3, 4);
- R5 = range(4, 5);
+ R5 = range(4, 0);
BROADCAST_RANGE = range(10, 11);
NULL_RANGE = range(10000, 10001);
}
@@ -187,7 +188,7 @@ public class ReplicaCollectionTest
// remove start
// we recurse on the same subset in testSubList, so just corroborate we have the correct list here
{
- Predicate<Replica> removeFirst = r -> r != canonicalList.get(0);
+ Predicate<Replica> removeFirst = r -> !r.equals(canonicalList.get(0));
assertSubList(test.filter(removeFirst), 1, canonicalList.size());
assertSubList(test.filter(removeFirst, 1), 1, Math.min(canonicalList.size(), 2));
}
@@ -199,14 +200,14 @@ public class ReplicaCollectionTest
// we recurse on the same subset in testSubList, so just corroborate we have the correct list here
{
int last = canonicalList.size() - 1;
- Predicate<Replica> removeLast = r -> r != canonicalList.get(last);
+ Predicate<Replica> removeLast = r -> !r.equals(canonicalList.get(last));
assertSubList(test.filter(removeLast), 0, last);
}
if (test.size() <= 2)
return;
- Predicate<Replica> removeMiddle = r -> r != canonicalList.get(canonicalList.size() / 2);
+ Predicate<Replica> removeMiddle = r -> !r.equals(canonicalList.get(canonicalList.size() / 2));
TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(filter(canonicalList, removeMiddle::test)));
filtered.testAll(subListDepth, filterDepth - 1, sortDepth);
}
@@ -224,7 +225,7 @@ public class ReplicaCollectionTest
for (int i = 0 ; i < canonicalList.size() ; ++i)
{
Replica discount = canonicalList.get(i);
- Assert.assertEquals(canonicalList.size() - 1, test.count(r -> r != discount));
+ Assert.assertEquals(canonicalList.size() - 1, test.count(r -> !r.equals(discount)));
}
}
@@ -245,15 +246,15 @@ public class ReplicaCollectionTest
{
final Comparator<Replica> comparator = (o1, o2) ->
{
- boolean f1 = o1 == canonicalList.get(0);
- boolean f2 = o2 == canonicalList.get(0);
+ boolean f1 = o1.equals(canonicalList.get(0));
+ boolean f2 = o2.equals(canonicalList.get(0));
return f1 == f2 ? 0 : f1 ? 1 : -1;
};
TestCase<C> sorted = new TestCase<>(test.sorted(comparator), ImmutableList.sortedCopyOf(comparator, canonicalList));
sorted.testAll(subListDepth, filterDepth, sortDepth - 1);
}
- private void testAll(int subListDepth, int filterDepth, int sortDepth)
+ void testAll(int subListDepth, int filterDepth, int sortDepth)
{
testEndpoints();
testOrderOfIteration();
@@ -312,12 +313,35 @@ public class ReplicaCollectionTest
Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::range)), test.ranges());
}
- @Override
- public void testAll()
+ public void testUnwrap(int subListDepth, int filterDepth, int sortDepth)
+ {
+ List<Replica> canonUnwrap = new ArrayList<>();
+ for (Replica replica : canonicalList)
+ for (Range<Token> range : replica.range().unwrap())
+ canonUnwrap.add(replica.decorateSubrange(range));
+ RangesAtEndpoint testUnwrap = test.unwrap();
+ if (testUnwrap == test)
+ {
+ Assert.assertEquals(canonicalList, canonUnwrap);
+ }
+ else
+ {
+ new RangesAtEndpointTestCase(testUnwrap, canonUnwrap)
+ .testAllExceptUnwrap(subListDepth, filterDepth, sortDepth);
+ }
+ }
+
+ void testAllExceptUnwrap(int subListDepth, int filterDepth, int sortDepth)
{
- super.testAll();
+ super.testAll(subListDepth, filterDepth, sortDepth);
testRanges();
}
+
+ void testAll(int subListDepth, int filterDepth, int sortDepth)
+ {
+ testAllExceptUnwrap(subListDepth, filterDepth, sortDepth);
+ testUnwrap(subListDepth, filterDepth, sortDepth);
+ }
}
private static final ImmutableList<Replica> RANGES_AT_ENDPOINT = ImmutableList.of(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org