You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/09/26 09:43:15 UTC
[1/2] cassandra git commit: Transient replication: range movement
improvements
Repository: cassandra
Updated Branches:
refs/heads/trunk 210da3dc0 -> 0379201c7
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 8ae6853..2f412ad 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -105,7 +105,6 @@ public class BootStrapperTest
InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1");
assertEquals(numOldNodes, tmd.sortedTokens().size());
- RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), false, 1);
IFailureDetector mockFailureDetector = new IFailureDetector()
{
public boolean isAlive(InetAddressAndPort ep)
@@ -120,26 +119,20 @@ public class BootStrapperTest
public void remove(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
};
- s.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(mockFailureDetector));
+ RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), mockFailureDetector, false, 1);
assertNotNull(Keyspace.open(keyspaceName));
s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint));
- Collection<Multimap<InetAddressAndPort, FetchReplica>> toFetch = s.toFetch().get(keyspaceName);
+ Multimap<InetAddressAndPort, FetchReplica> toFetch = s.toFetch().get(keyspaceName);
// Check we get get RF new ranges in total
- long rangesCount = toFetch.stream()
- .map(Multimap::values)
- .flatMap(Collection::stream)
- .map(f -> f.remote)
- .map(Replica::range)
- .count();
- assertEquals(replicationFactor, rangesCount);
+ assertEquals(replicationFactor, toFetch.size());
// there isn't any point in testing the size of these collections for any specific size. When a random partitioner
// is used, they will vary.
- assert toFetch.stream().map(Multimap::values).flatMap(Collection::stream).count() > 0;
- assert toFetch.stream().map(Multimap::keySet).map(Collection::stream).noneMatch(myEndpoint::equals);
+ assert toFetch.values().size() > 0;
+ assert toFetch.keys().stream().noneMatch(myEndpoint::equals);
return s;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
index 07d6377..cee4bb9 100644
--- a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
@@ -195,18 +195,26 @@ public class RangeFetchMapCalculatorTest
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
//Return false for all except 127.0.0.5
- final Predicate<Replica> filter = replica ->
+ final RangeStreamer.SourceFilter filter = new RangeStreamer.SourceFilter()
{
- try
+ public boolean apply(Replica replica)
{
- if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.5")))
- return false;
- else
+ try
+ {
+ if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.5")))
+ return false;
+ else
+ return true;
+ }
+ catch (UnknownHostException e)
+ {
return true;
+ }
}
- catch (UnknownHostException e)
+
+ public String message(Replica replica)
{
- return true;
+ return "Doesn't match 127.0.0.5";
}
};
@@ -230,7 +238,18 @@ public class RangeFetchMapCalculatorTest
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
- final Predicate<Replica> allDeadFilter = replica -> false;
+ final RangeStreamer.SourceFilter allDeadFilter = new RangeStreamer.SourceFilter()
+ {
+ public boolean apply(Replica replica)
+ {
+ return false;
+ }
+
+ public String message(Replica replica)
+ {
+ return "All dead";
+ }
+ };
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(allDeadFilter), "Test");
calculator.getRangeFetchMap();
@@ -263,18 +282,26 @@ public class RangeFetchMapCalculatorTest
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59");
//Reject only 127.0.0.3 and accept everyone else
- final Predicate<Replica> localHostFilter = replica ->
+ final RangeStreamer.SourceFilter localHostFilter = new RangeStreamer.SourceFilter()
{
- try
+ public boolean apply(Replica replica)
{
- if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3")))
- return false;
- else
+ try
+ {
+ if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3")))
+ return false;
+ else
+ return true;
+ }
+ catch (UnknownHostException e)
+ {
return true;
+ }
}
- catch (UnknownHostException e)
+
+ public String message(Replica replica)
{
- return true;
+ return "Not 127.0.0.3";
}
};
@@ -318,18 +345,26 @@ public class RangeFetchMapCalculatorTest
// and a trivial one:
addTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3");
- Predicate<Replica> filter = replica ->
+ RangeStreamer.SourceFilter filter = new RangeStreamer.SourceFilter()
{
- try
+ public boolean apply(Replica replica)
{
- if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3")))
- return false;
+ try
+ {
+ if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3")))
+ return false;
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return true;
}
- catch (UnknownHostException e)
+
+ public String message(Replica replica)
{
- throw new RuntimeException(e);
+ return "Not 127.0.0.3";
}
- return true;
};
RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.singleton(filter), "Test");
Multimap<InetAddressAndPort, Range<Token>> optMap = calculator.getRangeFetchMapForNonTrivialRanges();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
index 4afeb5a..23d585f 100644
--- a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.RangeRelocator;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.Pair;
@@ -368,7 +369,7 @@ public class OldNetworkTopologyStrategyTest
RangesAtEndpoint currentRanges = strategy.getAddressReplicas().get(movingNode);
RangesAtEndpoint updatedRanges = strategy.getPendingAddressRanges(tokenMetadataAfterMove, tokensAfterMove[movingNodeIdx], movingNode);
- return asRanges(StorageService.calculateStreamAndFetchRanges(currentRanges, updatedRanges));
+ return asRanges(RangeRelocator.calculateStreamAndFetchRanges(currentRanges, updatedRanges));
}
private static Map<String, String> optsWithRF(int rf)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java b/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java
index 63973ea..0ee1f81 100644
--- a/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java
+++ b/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -47,6 +48,7 @@ import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import static org.apache.cassandra.locator.Replica.fullReplica;
@@ -60,29 +62,59 @@ import static org.apache.cassandra.service.StorageServiceTest.assertMultimapEqua
*/
public class BootstrapTransientTest
{
- static InetAddressAndPort aAddress;
- static InetAddressAndPort bAddress;
- static InetAddressAndPort cAddress;
- static InetAddressAndPort dAddress;
+ static InetAddressAndPort address02;
+ static InetAddressAndPort address03;
+ static InetAddressAndPort address04;
+ static InetAddressAndPort address05;
@BeforeClass
public static void setUpClass() throws Exception
{
- aAddress = InetAddressAndPort.getByName("127.0.0.1");
- bAddress = InetAddressAndPort.getByName("127.0.0.2");
- cAddress = InetAddressAndPort.getByName("127.0.0.3");
- dAddress = InetAddressAndPort.getByName("127.0.0.4");
+ address02 = InetAddressAndPort.getByName("127.0.0.2");
+ address03 = InetAddressAndPort.getByName("127.0.0.3");
+ address04 = InetAddressAndPort.getByName("127.0.0.4");
+ address05 = InetAddressAndPort.getByName("127.0.0.5");
}
private final List<InetAddressAndPort> downNodes = new ArrayList<>();
- Predicate<Replica> alivePredicate = replica -> !downNodes.contains(replica.endpoint());
+
+ final RangeStreamer.SourceFilter alivePredicate = new RangeStreamer.SourceFilter()
+ {
+ public boolean apply(Replica replica)
+ {
+ return !downNodes.contains(replica.endpoint());
+ }
+
+ public String message(Replica replica)
+ {
+ return "Down nodes: " + downNodes;
+ }
+ };
+
+ final RangeStreamer.SourceFilter sourceFilterDownNodesPredicate = new RangeStreamer.SourceFilter()
+ {
+ public boolean apply(Replica replica)
+ {
+ return !sourceFilterDownNodes.contains(replica.endpoint());
+ }
+
+ public String message(Replica replica)
+ {
+ return "Source filter down nodes" + sourceFilterDownNodes;
+ }
+ };
private final List<InetAddressAndPort> sourceFilterDownNodes = new ArrayList<>();
- private final Collection<Predicate<Replica>> sourceFilters = Collections.singleton(replica -> !sourceFilterDownNodes.contains(replica.endpoint()));
+
+ private final Collection<RangeStreamer.SourceFilter> sourceFilters = Arrays.asList(alivePredicate,
+ sourceFilterDownNodesPredicate,
+ new RangeStreamer.ExcludeLocalNodeFilter()
+ );
@After
public void clearDownNode()
{
+ // TODO: actually use these
downNodes.clear();
sourceFilterDownNodes.clear();
}
@@ -93,27 +125,43 @@ public class BootstrapTransientTest
DatabaseDescriptor.daemonInitialization();
}
- Token tenToken = new OrderPreservingPartitioner.StringToken("00010");
+ Token tenToken = new OrderPreservingPartitioner.StringToken("00010");
Token twentyToken = new OrderPreservingPartitioner.StringToken("00020");
Token thirtyToken = new OrderPreservingPartitioner.StringToken("00030");
Token fourtyToken = new OrderPreservingPartitioner.StringToken("00040");
- Range<Token> aRange = new Range<>(thirtyToken, tenToken);
- Range<Token> bRange = new Range<>(tenToken, twentyToken);
- Range<Token> cRange = new Range<>(twentyToken, thirtyToken);
- Range<Token> dRange = new Range<>(thirtyToken, fourtyToken);
+ Range<Token> range30_10 = new Range<>(thirtyToken, tenToken);
+ Range<Token> range10_20 = new Range<>(tenToken, twentyToken);
+ Range<Token> range20_30 = new Range<>(twentyToken, thirtyToken);
+ Range<Token> range30_40 = new Range<>(thirtyToken, fourtyToken);
+
+ RangesAtEndpoint toFetch = RangesAtEndpoint.of(new Replica(address05, range30_40, true),
+ new Replica(address05, range20_30, true),
+ new Replica(address05, range10_20, false));
- RangesAtEndpoint toFetch = RangesAtEndpoint.of(new Replica(dAddress, dRange, true),
- new Replica(dAddress, cRange, true),
- new Replica(dAddress, bRange, false));
+
+ public EndpointsForRange endpoints(Replica... replicas)
+ {
+ assert replicas.length > 0;
+
+ Range<Token> range = replicas[0].range();
+ EndpointsForRange.Builder builder = EndpointsForRange.builder(range);
+ for (Replica r : replicas)
+ {
+ assert r.range().equals(range);
+ builder.add(r);
+ }
+
+ return builder.build();
+ }
@Test
public void testRangeStreamerRangesToFetch() throws Exception
{
EndpointsByReplica expectedResult = new EndpointsByReplica(ImmutableMap.of(
- fullReplica(dAddress, dRange), EndpointsForRange.builder(aRange).add(fullReplica(bAddress, aRange)).add(transientReplica(cAddress, aRange)).build(),
- fullReplica(dAddress, cRange), EndpointsForRange.builder(cRange).add(fullReplica(cAddress, cRange)).add(transientReplica(bAddress, cRange)).build(),
- transientReplica(dAddress, bRange), EndpointsForRange.builder(bRange).add(transientReplica(aAddress, bRange)).build()));
+ transientReplica(address05, range10_20), endpoints(transientReplica(address02, range10_20)),
+ fullReplica(address05, range20_30), endpoints(transientReplica(address03, range20_30), fullReplica(address04, range20_30)),
+ fullReplica(address05, range30_40), endpoints(transientReplica(address04, range30_10), fullReplica(address02, range30_10))));
invokeCalculateRangesToFetchWithPreferredEndpoints(toFetch, constructTMDs(), expectedResult);
}
@@ -121,11 +169,11 @@ public class BootstrapTransientTest
private Pair<TokenMetadata, TokenMetadata> constructTMDs()
{
TokenMetadata tmd = new TokenMetadata();
- tmd.updateNormalToken(aRange.right, aAddress);
- tmd.updateNormalToken(bRange.right, bAddress);
- tmd.updateNormalToken(cRange.right, cAddress);
+ tmd.updateNormalToken(range30_10.right, address02);
+ tmd.updateNormalToken(range10_20.right, address03);
+ tmd.updateNormalToken(range20_30.right, address04);
TokenMetadata updated = tmd.cloneOnlyTokenMap();
- updated.updateNormalToken(dRange.right, dAddress);
+ updated.updateNormalToken(range30_40.right, address05);
return Pair.create(tmd, updated);
}
@@ -137,14 +185,13 @@ public class BootstrapTransientTest
DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
EndpointsByReplica result = RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> replicas,
- simpleStrategy(tmds.left),
- toFetch,
- true,
- tmds.left,
- tmds.right,
- alivePredicate,
- "OldNetworkTopologyStrategyTest",
- sourceFilters);
+ simpleStrategy(tmds.left),
+ toFetch,
+ true,
+ tmds.left,
+ tmds.right,
+ "OldNetworkTopologyStrategyTest",
+ sourceFilters);
result.asMap().forEach((replica, list) -> System.out.printf("Replica %s, sources %s%n", replica, list));
assertMultimapEqualsIgnoreOrder(expectedResult, result);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/service/MoveTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTransientTest.java b/test/unit/org/apache/cassandra/service/MoveTransientTest.java
index 1e24735..e5a63c7 100644
--- a/test/unit/org/apache/cassandra/service/MoveTransientTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTransientTest.java
@@ -20,11 +20,12 @@ package org.apache.cassandra.service;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
-import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
import org.apache.cassandra.locator.EndpointsByReplica;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.RangesByEndpoint;
@@ -64,27 +65,56 @@ public class MoveTransientTest
{
private static final Logger logger = LoggerFactory.getLogger(MoveTransientTest.class);
- static InetAddressAndPort aAddress;
- static InetAddressAndPort bAddress;
- static InetAddressAndPort cAddress;
- static InetAddressAndPort dAddress;
- static InetAddressAndPort eAddress;
+ static InetAddressAndPort address01;
+ static InetAddressAndPort address02;
+ static InetAddressAndPort address03;
+ static InetAddressAndPort address04;
+ static InetAddressAndPort address05;
@BeforeClass
public static void setUpClass() throws Exception
{
- aAddress = InetAddressAndPort.getByName("127.0.0.1");
- bAddress = InetAddressAndPort.getByName("127.0.0.2");
- cAddress = InetAddressAndPort.getByName("127.0.0.3");
- dAddress = InetAddressAndPort.getByName("127.0.0.4");
- eAddress = InetAddressAndPort.getByName("127.0.0.5");
+ address01 = InetAddressAndPort.getByName("127.0.0.1");
+ address02 = InetAddressAndPort.getByName("127.0.0.2");
+ address03 = InetAddressAndPort.getByName("127.0.0.3");
+ address04 = InetAddressAndPort.getByName("127.0.0.4");
+ address05 = InetAddressAndPort.getByName("127.0.0.5");
}
- private final List<InetAddressAndPort> downNodes = new ArrayList();
- Predicate<Replica> alivePredicate = replica -> !downNodes.contains(replica.endpoint());
+ private final List<InetAddressAndPort> downNodes = new ArrayList<>();
+
+ final RangeStreamer.SourceFilter alivePredicate = new RangeStreamer.SourceFilter()
+ {
+ public boolean apply(Replica replica)
+ {
+ return !downNodes.contains(replica.endpoint());
+ }
+
+ public String message(Replica replica)
+ {
+ return "Down nodes: " + downNodes;
+ }
+ };
+
+ final RangeStreamer.SourceFilter sourceFilterDownNodesPredicate = new RangeStreamer.SourceFilter()
+ {
+ public boolean apply(Replica replica)
+ {
+ return !sourceFilterDownNodes.contains(replica.endpoint());
+ }
+
+ public String message(Replica replica)
+ {
+ return "Source filter down nodes: " + sourceFilterDownNodes;
+ }
+ };
private final List<InetAddressAndPort> sourceFilterDownNodes = new ArrayList<>();
- private final Collection<Predicate<Replica>> sourceFilters = Collections.singleton(replica -> !sourceFilterDownNodes.contains(replica.endpoint()));
+
+ private final Collection<RangeStreamer.SourceFilter> sourceFilters = Arrays.asList(alivePredicate,
+ sourceFilterDownNodesPredicate,
+ new RangeStreamer.ExcludeLocalNodeFilter()
+ );
@After
public void clearDownNode()
@@ -99,27 +129,36 @@ public class MoveTransientTest
DatabaseDescriptor.daemonInitialization();
}
- Token oneToken = new RandomPartitioner.BigIntegerToken("1");
- Token twoToken = new RandomPartitioner.BigIntegerToken("2");
- Token threeToken = new RandomPartitioner.BigIntegerToken("3");
- Token fourToken = new RandomPartitioner.BigIntegerToken("4");
- Token sixToken = new RandomPartitioner.BigIntegerToken("6");
- Token sevenToken = new RandomPartitioner.BigIntegerToken("7");
- Token nineToken = new RandomPartitioner.BigIntegerToken("9");
- Token elevenToken = new RandomPartitioner.BigIntegerToken("11");
- Token fourteenToken = new RandomPartitioner.BigIntegerToken("14");
+ final Token oneToken = new RandomPartitioner.BigIntegerToken("1");
+ final Token twoToken = new RandomPartitioner.BigIntegerToken("2");
+ final Token threeToken = new RandomPartitioner.BigIntegerToken("3");
+ final Token fourToken = new RandomPartitioner.BigIntegerToken("4");
+ final Token sixToken = new RandomPartitioner.BigIntegerToken("6");
+ final Token sevenToken = new RandomPartitioner.BigIntegerToken("7");
+ final Token nineToken = new RandomPartitioner.BigIntegerToken("9");
+ final Token elevenToken = new RandomPartitioner.BigIntegerToken("11");
+ final Token fourteenToken = new RandomPartitioner.BigIntegerToken("14");
- Range<Token> aRange = new Range(oneToken, threeToken);
- Range<Token> bRange = new Range(threeToken, sixToken);
- Range<Token> cRange = new Range(sixToken, nineToken);
- Range<Token> dRange = new Range(nineToken, elevenToken);
- Range<Token> eRange = new Range(elevenToken, oneToken);
+ final Range<Token> range_1_2 = new Range(oneToken, threeToken);
+ final Range<Token> range_3_6 = new Range(threeToken, sixToken);
+ final Range<Token> range_6_9 = new Range(sixToken, nineToken);
+ final Range<Token> range_9_11 = new Range(nineToken, elevenToken);
+ final Range<Token> range_11_1 = new Range(elevenToken, oneToken);
- RangesAtEndpoint current = RangesAtEndpoint.of(new Replica(aAddress, aRange, true),
- new Replica(aAddress, eRange, true),
- new Replica(aAddress, dRange, false));
+ final RangesAtEndpoint current = RangesAtEndpoint.of(new Replica(address01, range_1_2, true),
+ new Replica(address01, range_11_1, true),
+ new Replica(address01, range_9_11, false));
+
+ public Token token(String s)
+ {
+ return new RandomPartitioner.BigIntegerToken(s);
+ }
+ public Range<Token> range(String start, String end)
+ {
+ return new Range<>(token(start), token(end));
+ }
/**
* Ring with start A 1-3 B 3-6 C 6-9 D 9-1
@@ -140,14 +179,14 @@ public class MoveTransientTest
Range<Token> aPrimeRange = new Range<>(oneToken, fourToken);
RangesAtEndpoint updated = RangesAtEndpoint.of(
- new Replica(aAddress, aPrimeRange, true),
- new Replica(aAddress, eRange, true),
- new Replica(aAddress, dRange, false)
+ new Replica(address01, aPrimeRange, true),
+ new Replica(address01, range_11_1, true),
+ new Replica(address01, range_9_11, false)
);
- Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated);
+ Pair<RangesAtEndpoint, RangesAtEndpoint> result = RangeRelocator.calculateStreamAndFetchRanges(current, updated);
assertContentsIgnoreOrder(result.left);
- assertContentsIgnoreOrder(result.right, fullReplica(aAddress, threeToken, fourToken));
+ assertContentsIgnoreOrder(result.right, fullReplica(address01, threeToken, fourToken));
return result;
}
@@ -170,15 +209,15 @@ public class MoveTransientTest
Range<Token> aPrimeRange = new Range<>(elevenToken, fourteenToken);
RangesAtEndpoint updated = RangesAtEndpoint.of(
- new Replica(aAddress, aPrimeRange, true),
- new Replica(aAddress, dRange, true),
- new Replica(aAddress, cRange, false)
+ new Replica(address01, aPrimeRange, true),
+ new Replica(address01, range_9_11, true),
+ new Replica(address01, range_6_9, false)
);
- Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated);
- assertContentsIgnoreOrder(result.left, fullReplica(aAddress, oneToken, threeToken), fullReplica(aAddress, fourteenToken, oneToken));
- assertContentsIgnoreOrder(result.right, transientReplica(aAddress, sixToken, nineToken), fullReplica(aAddress, nineToken, elevenToken));
+ Pair<RangesAtEndpoint, RangesAtEndpoint> result = RangeRelocator.calculateStreamAndFetchRanges(current, updated);
+ assertContentsIgnoreOrder(result.left, fullReplica(address01, oneToken, threeToken), fullReplica(address01, fourteenToken, oneToken));
+ assertContentsIgnoreOrder(result.right, transientReplica(address01, sixToken, nineToken), fullReplica(address01, nineToken, elevenToken));
return result;
}
@@ -200,16 +239,16 @@ public class MoveTransientTest
Range<Token> aPrimeRange = new Range<>(oneToken, twoToken);
RangesAtEndpoint updated = RangesAtEndpoint.of(
- new Replica(aAddress, aPrimeRange, true),
- new Replica(aAddress, eRange, true),
- new Replica(aAddress, dRange, false)
+ new Replica(address01, aPrimeRange, true),
+ new Replica(address01, range_11_1, true),
+ new Replica(address01, range_9_11, false)
);
- Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated);
+ Pair<RangesAtEndpoint, RangesAtEndpoint> result = RangeRelocator.calculateStreamAndFetchRanges(current, updated);
//Moving backwards has no impact on any replica. We already fully replicate counter clockwise
//The transient replica does transiently replicate slightly more, but that is addressed by cleanup
- assertContentsIgnoreOrder(result.left, fullReplica(aAddress, twoToken, threeToken));
+ assertContentsIgnoreOrder(result.left, fullReplica(address01, twoToken, threeToken));
assertContentsIgnoreOrder(result.right);
return result;
@@ -226,17 +265,16 @@ public class MoveTransientTest
Range<Token> aPrimeRange = new Range<>(sixToken, sevenToken);
Range<Token> bPrimeRange = new Range<>(oneToken, sixToken);
-
RangesAtEndpoint updated = RangesAtEndpoint.of(
- new Replica(aAddress, aPrimeRange, true),
- new Replica(aAddress, bPrimeRange, true),
- new Replica(aAddress, eRange, false)
+ new Replica(address01, aPrimeRange, true),
+ new Replica(address01, bPrimeRange, true),
+ new Replica(address01, range_11_1, false)
);
- Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated);
+ Pair<RangesAtEndpoint, RangesAtEndpoint> result = RangeRelocator.calculateStreamAndFetchRanges(current, updated);
- assertContentsIgnoreOrder(result.left, fullReplica(aAddress, elevenToken, oneToken), transientReplica(aAddress, nineToken, elevenToken));
- assertContentsIgnoreOrder(result.right, fullReplica(aAddress, threeToken, sixToken), fullReplica(aAddress, sixToken, sevenToken));
+ assertContentsIgnoreOrder(result.left, fullReplica(address01, elevenToken, oneToken), transientReplica(address01, nineToken, elevenToken));
+ assertContentsIgnoreOrder(result.right, fullReplica(address01, threeToken, sixToken), fullReplica(address01, sixToken, sevenToken));
return result;
}
@@ -252,6 +290,37 @@ public class MoveTransientTest
calculateStreamAndFetchRangesMoveForwardBetween();
}
+ @Test
+ public void testResubtract()
+ {
+ Token oneToken = new RandomPartitioner.BigIntegerToken("0001");
+ Token tenToken = new RandomPartitioner.BigIntegerToken("0010");
+ Token fiveToken = new RandomPartitioner.BigIntegerToken("0005");
+
+ Range<Token> range_1_10 = new Range<>(oneToken, tenToken);
+ Range<Token> range_1_5 = new Range<>(oneToken, tenToken);
+ Range<Token> range_5_10 = new Range<>(fiveToken, tenToken);
+
+ RangesAtEndpoint singleRange = RangesAtEndpoint.of(
+ new Replica(address01, range_1_10, true)
+ );
+
+ RangesAtEndpoint splitRanges = RangesAtEndpoint.of(
+ new Replica(address01, range_1_5, true),
+ new Replica(address01, range_5_10, true)
+ );
+
+ // forward
+ Pair<RangesAtEndpoint, RangesAtEndpoint> calculated = RangeRelocator.calculateStreamAndFetchRanges(singleRange, splitRanges);
+ assertTrue(calculated.left.toString(), calculated.left.isEmpty());
+ assertTrue(calculated.right.toString(), calculated.right.isEmpty());
+
+ // backward
+ calculated = RangeRelocator.calculateStreamAndFetchRanges(splitRanges, singleRange);
+ assertTrue(calculated.left.toString(), calculated.left.isEmpty());
+ assertTrue(calculated.right.toString(), calculated.right.isEmpty());
+ }
+
/**
* Construct the ring state for calculateStreamAndFetchRangesMoveBackwardBetween
* Where are A moves from 3 to 14
@@ -260,12 +329,12 @@ public class MoveTransientTest
private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveBackwardBetween()
{
TokenMetadata tmd = new TokenMetadata();
- tmd.updateNormalToken(aRange.right, aAddress);
- tmd.updateNormalToken(bRange.right, bAddress);
- tmd.updateNormalToken(cRange.right, cAddress);
- tmd.updateNormalToken(dRange.right, dAddress);
- tmd.updateNormalToken(eRange.right, eAddress);
- tmd.addMovingEndpoint(fourteenToken, aAddress);
+ tmd.updateNormalToken(range_1_2.right, address01);
+ tmd.updateNormalToken(range_3_6.right, address02);
+ tmd.updateNormalToken(range_6_9.right, address03);
+ tmd.updateNormalToken(range_9_11.right, address04);
+ tmd.updateNormalToken(range_11_1.right, address05);
+ tmd.addMovingEndpoint(fourteenToken, address01);
TokenMetadata updated = tmd.cloneAfterAllSettled();
return Pair.create(tmd, updated);
@@ -280,12 +349,12 @@ public class MoveTransientTest
private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveForwardBetween()
{
TokenMetadata tmd = new TokenMetadata();
- tmd.updateNormalToken(aRange.right, aAddress);
- tmd.updateNormalToken(bRange.right, bAddress);
- tmd.updateNormalToken(cRange.right, cAddress);
- tmd.updateNormalToken(dRange.right, dAddress);
- tmd.updateNormalToken(eRange.right, eAddress);
- tmd.addMovingEndpoint(sevenToken, aAddress);
+ tmd.updateNormalToken(range_1_2.right, address01);
+ tmd.updateNormalToken(range_3_6.right, address02);
+ tmd.updateNormalToken(range_6_9.right, address03);
+ tmd.updateNormalToken(range_9_11.right, address04);
+ tmd.updateNormalToken(range_11_1.right, address05);
+ tmd.addMovingEndpoint(sevenToken, address01);
TokenMetadata updated = tmd.cloneAfterAllSettled();
return Pair.create(tmd, updated);
@@ -294,12 +363,12 @@ public class MoveTransientTest
private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveBackward()
{
TokenMetadata tmd = new TokenMetadata();
- tmd.updateNormalToken(aRange.right, aAddress);
- tmd.updateNormalToken(bRange.right, bAddress);
- tmd.updateNormalToken(cRange.right, cAddress);
- tmd.updateNormalToken(dRange.right, dAddress);
- tmd.updateNormalToken(eRange.right, eAddress);
- tmd.addMovingEndpoint(twoToken, aAddress);
+ tmd.updateNormalToken(range_1_2.right, address01);
+ tmd.updateNormalToken(range_3_6.right, address02);
+ tmd.updateNormalToken(range_6_9.right, address03);
+ tmd.updateNormalToken(range_9_11.right, address04);
+ tmd.updateNormalToken(range_11_1.right, address05);
+ tmd.addMovingEndpoint(twoToken, address01);
TokenMetadata updated = tmd.cloneAfterAllSettled();
return Pair.create(tmd, updated);
@@ -308,12 +377,12 @@ public class MoveTransientTest
private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveForward()
{
TokenMetadata tmd = new TokenMetadata();
- tmd.updateNormalToken(aRange.right, aAddress);
- tmd.updateNormalToken(bRange.right, bAddress);
- tmd.updateNormalToken(cRange.right, cAddress);
- tmd.updateNormalToken(dRange.right, dAddress);
- tmd.updateNormalToken(eRange.right, eAddress);
- tmd.addMovingEndpoint(fourToken, aAddress);
+ tmd.updateNormalToken(range_1_2.right, address01);
+ tmd.updateNormalToken(range_3_6.right, address02);
+ tmd.updateNormalToken(range_6_9.right, address03);
+ tmd.updateNormalToken(range_9_11.right, address04);
+ tmd.updateNormalToken(range_11_1.right, address05);
+ tmd.addMovingEndpoint(fourToken, address01);
TokenMetadata updated = tmd.cloneAfterAllSettled();
return Pair.create(tmd, updated);
@@ -325,15 +394,15 @@ public class MoveTransientTest
{
EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable();
- InetAddressAndPort cOrB = (downNodes.contains(cAddress) || sourceFilterDownNodes.contains(cAddress)) ? bAddress : cAddress;
+ InetAddressAndPort cOrB = (downNodes.contains(address03) || sourceFilterDownNodes.contains(address03)) ? address02 : address03;
//Need to pull the full replica and the transient replica that is losing the range
- expectedResult.put(fullReplica(aAddress, sixToken, sevenToken), fullReplica(dAddress, sixToken, nineToken));
- expectedResult.put(fullReplica(aAddress, sixToken, sevenToken), transientReplica(eAddress, sixToken, nineToken));
+ expectedResult.put(fullReplica(address01, sixToken, sevenToken), fullReplica(address04, sixToken, nineToken));
+ expectedResult.put(fullReplica(address01, sixToken, sevenToken), transientReplica(address05, sixToken, nineToken));
//Same need both here as well
- expectedResult.put(fullReplica(aAddress, threeToken, sixToken), fullReplica(cOrB, threeToken, sixToken));
- expectedResult.put(fullReplica(aAddress, threeToken, sixToken), transientReplica(dAddress, threeToken, sixToken));
+ expectedResult.put(fullReplica(address01, threeToken, sixToken), fullReplica(cOrB, threeToken, sixToken));
+ expectedResult.put(fullReplica(address01, threeToken, sixToken), transientReplica(address04, threeToken, sixToken));
invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForwardBetween().right,
constructTMDsMoveForwardBetween(),
@@ -343,7 +412,7 @@ public class MoveTransientTest
@Test
public void testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception
{
- for (InetAddressAndPort downNode : new InetAddressAndPort[] {dAddress, eAddress})
+ for (InetAddressAndPort downNode : new InetAddressAndPort[] { address04, address05 })
{
downNodes.clear();
downNodes.add(downNode);
@@ -356,8 +425,7 @@ public class MoveTransientTest
{
ise.printStackTrace();
assertTrue(downNode.toString(),
- ise.getMessage().startsWith("A node required to move the data consistently is down:")
- && ise.getMessage().contains(downNode.toString()));
+ ise.getMessage().contains("Down nodes: [" + downNode + "]"));
threw = true;
}
assertTrue("Didn't throw for " + downNode, threw);
@@ -365,14 +433,14 @@ public class MoveTransientTest
//Shouldn't throw because another full replica is available
downNodes.clear();
- downNodes.add(cAddress);
+ downNodes.add(address03);
testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
}
@Test
public void testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception
{
- for (InetAddressAndPort downNode : new InetAddressAndPort[] {dAddress, eAddress})
+ for (InetAddressAndPort downNode : new InetAddressAndPort[] { address04, address05 })
{
sourceFilterDownNodes.clear();
sourceFilterDownNodes.add(downNode);
@@ -394,7 +462,7 @@ public class MoveTransientTest
//Shouldn't throw because another full replica is available
sourceFilterDownNodes.clear();
- sourceFilterDownNodes.add(cAddress);
+ sourceFilterDownNodes.add(address03);
testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
}
@@ -404,8 +472,8 @@ public class MoveTransientTest
EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable();
//Need to pull the full replica and the transient replica that is losing the range
- expectedResult.put(fullReplica(aAddress, nineToken, elevenToken), fullReplica(eAddress, nineToken, elevenToken));
- expectedResult.put(transientReplica(aAddress, sixToken, nineToken), transientReplica(eAddress, sixToken, nineToken));
+ expectedResult.put(fullReplica(address01, nineToken, elevenToken), fullReplica(address05, nineToken, elevenToken));
+ expectedResult.put(transientReplica(address01, sixToken, nineToken), transientReplica(address05, sixToken, nineToken));
invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackwardBetween().right,
constructTMDsMoveBackwardBetween(),
@@ -417,7 +485,7 @@ public class MoveTransientTest
public void testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception
{
//Any replica can be the full replica so this will always fail on the transient range
- downNodes.add(eAddress);
+ downNodes.add(address05);
testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
}
@@ -425,7 +493,7 @@ public class MoveTransientTest
public void testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception
{
//Any replica can be the full replica so this will always fail on the transient range
- sourceFilterDownNodes.add(eAddress);
+ sourceFilterDownNodes.add(address05);
testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
}
@@ -448,11 +516,11 @@ public class MoveTransientTest
{
EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable();
- InetAddressAndPort cOrBAddress = (downNodes.contains(cAddress) || sourceFilterDownNodes.contains(cAddress)) ? bAddress : cAddress;
+ InetAddressAndPort cOrBAddress = (downNodes.contains(address03) || sourceFilterDownNodes.contains(address03)) ? address02 : address03;
//Need to pull the full replica and the transient replica that is losing the range
- expectedResult.put(fullReplica(aAddress, threeToken, fourToken), fullReplica(cOrBAddress, threeToken, sixToken));
- expectedResult.put(fullReplica(aAddress, threeToken, fourToken), transientReplica(dAddress, threeToken, sixToken));
+ expectedResult.put(fullReplica(address01, threeToken, fourToken), fullReplica(cOrBAddress, threeToken, sixToken));
+ expectedResult.put(fullReplica(address01, threeToken, fourToken), transientReplica(address04, threeToken, sixToken));
invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForward().right,
constructTMDsMoveForward(),
@@ -463,7 +531,7 @@ public class MoveTransientTest
@Test
public void testMoveForwardCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception
{
- downNodes.add(dAddress);
+ downNodes.add(address04);
boolean threw = false;
try
{
@@ -472,23 +540,22 @@ public class MoveTransientTest
catch (IllegalStateException ise)
{
ise.printStackTrace();
- assertTrue(dAddress.toString(),
- ise.getMessage().startsWith("A node required to move the data consistently is down:")
- && ise.getMessage().contains(dAddress.toString()));
+ assertTrue(address04.toString(),
+ ise.getMessage().contains("Down nodes: [" + address04 + "]"));
threw = true;
}
- assertTrue("Didn't throw for " + dAddress, threw);
+ assertTrue("Didn't throw for " + address04, threw);
//Shouldn't throw because another full replica is available
downNodes.clear();
- downNodes.add(cAddress);
+ downNodes.add(address03);
testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
}
@Test
public void testMoveForwardCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception
{
- sourceFilterDownNodes.add(dAddress);
+ sourceFilterDownNodes.add(address04);
boolean threw = false;
try
{
@@ -497,16 +564,16 @@ public class MoveTransientTest
catch (IllegalStateException ise)
{
ise.printStackTrace();
- assertTrue(dAddress.toString(),
+ assertTrue(address04.toString(),
ise.getMessage().startsWith("Necessary replicas for strict consistency were removed by source filters:")
- && ise.getMessage().contains(dAddress.toString()));
+ && ise.getMessage().contains(address04.toString()));
threw = true;
}
- assertTrue("Didn't throw for " + dAddress, threw);
+ assertTrue("Didn't throw for " + address04, threw);
//Shouldn't throw because another full replica is available
sourceFilterDownNodes.clear();
- sourceFilterDownNodes.add(cAddress);
+ sourceFilterDownNodes.add(address03);
testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
}
@@ -517,18 +584,16 @@ public class MoveTransientTest
DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
EndpointsByReplica result = RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> replicas.sorted((a, b) -> b.endpoint().compareTo(a.endpoint())),
- simpleStrategy(tmds.left),
- toFetch,
- true,
- tmds.left,
- tmds.right,
- alivePredicate,
- "OldNetworkTopologyStrategyTest",
- sourceFilters);
+ simpleStrategy(tmds.left),
+ toFetch,
+ true,
+ tmds.left,
+ tmds.right,
+ "OldNetworkTopologyStrategyTest",
+ sourceFilters);
logger.info("Ranges to fetch with preferred endpoints");
logger.info(result.toString());
assertMultimapEqualsIgnoreOrder(expectedResult, result);
-
}
private AbstractReplicationStrategy simpleStrategy(TokenMetadata tmd)
@@ -564,8 +629,8 @@ public class MoveTransientTest
RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable();
//Need to pull the full replica and the transient replica that is losing the range
- expectedResult.put(bAddress, transientReplica(bAddress, nineToken, elevenToken));
- expectedResult.put(bAddress, fullReplica(bAddress, elevenToken, oneToken));
+ expectedResult.put(address02, transientReplica(address02, nineToken, elevenToken));
+ expectedResult.put(address02, fullReplica(address02, elevenToken, oneToken));
invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForwardBetween().left,
constructTMDsMoveForwardBetween(),
@@ -577,12 +642,12 @@ public class MoveTransientTest
{
RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable();
- expectedResult.put(bAddress, fullReplica(bAddress, fourteenToken, oneToken));
+ expectedResult.put(address02, fullReplica(address02, fourteenToken, oneToken));
- expectedResult.put(dAddress, transientReplica(dAddress, oneToken, threeToken));
+ expectedResult.put(address04, transientReplica(address04, oneToken, threeToken));
- expectedResult.put(cAddress, fullReplica(cAddress, oneToken, threeToken));
- expectedResult.put(cAddress, transientReplica(cAddress, fourteenToken, oneToken));
+ expectedResult.put(address03, fullReplica(address03, oneToken, threeToken));
+ expectedResult.put(address03, transientReplica(address03, fourteenToken, oneToken));
invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackwardBetween().left,
constructTMDsMoveBackwardBetween(),
@@ -593,8 +658,8 @@ public class MoveTransientTest
public void testMoveBackwardCalculateRangesToStreamWithPreferredEndpoints() throws Exception
{
RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable();
- expectedResult.put(cAddress, fullReplica(cAddress, twoToken, threeToken));
- expectedResult.put(dAddress, transientReplica(dAddress, twoToken, threeToken));
+ expectedResult.put(address03, fullReplica(address03, twoToken, threeToken));
+ expectedResult.put(address04, transientReplica(address04, twoToken, threeToken));
invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackward().left,
constructTMDsMoveBackward(),
@@ -617,7 +682,7 @@ public class MoveTransientTest
RangesByEndpoint expectedResult)
{
DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
- StorageService.RangeRelocator relocator = new StorageService.RangeRelocator();
+ RangeRelocator relocator = new RangeRelocator();
RangesByEndpoint result = relocator.calculateRangesToStreamWithEndpoints(toStream,
simpleStrategy(tmds.left),
tmds.left,
@@ -631,8 +696,10 @@ public class MoveTransientTest
{
assertEquals(ranges.size(), replicas.length);
for (Replica replica : replicas)
+ {
if (!ranges.contains(replica))
- assertEquals(RangesAtEndpoint.of(replicas), ranges);
+ assertTrue(Iterables.elementsEqual(RangesAtEndpoint.of(replicas), ranges));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/service/StorageServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceTest.java b/test/unit/org/apache/cassandra/service/StorageServiceTest.java
index 9d5c324..cc7fac3 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceTest.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
public class StorageServiceTest
{
@@ -106,21 +107,32 @@ public class StorageServiceTest
public static <K, C extends ReplicaCollection<? extends C>> void assertMultimapEqualsIgnoreOrder(ReplicaMultimap<K, C> a, ReplicaMultimap<K, C> b)
{
if (!a.keySet().equals(b.keySet()))
- assertEquals(a, b);
+ fail(formatNeq(a, b));
for (K key : a.keySet())
{
C ac = a.get(key);
C bc = b.get(key);
if (ac.size() != bc.size())
- assertEquals(a, b);
+ fail(formatNeq(a, b));
for (Replica r : ac)
{
if (!bc.contains(r))
- assertEquals(a, b);
+ fail(formatNeq(a, b));
}
}
}
+ public static String formatNeq(Object v1, Object v2)
+ {
+ return "\nExpected: " + formatClassAndValue(v1) + "\n but was: " + formatClassAndValue(v2);
+ }
+
+ public static String formatClassAndValue(Object value)
+ {
+ String className = value == null ? "null" : value.getClass().getName();
+ return className + "<" + String.valueOf(value) + ">";
+ }
+
@Test
public void testGetChangedReplicasForLeaving() throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/2] cassandra git commit: Transient replication: range movement
improvements
Posted by if...@apache.org.
Transient replication: range movement improvements
Patch by Alex Petrov; reviewed by Ariel Weisberg and Benedict Elliott Smith for CASSANDRA-14756
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0379201c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0379201c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0379201c
Branch: refs/heads/trunk
Commit: 0379201c7057f6bac4abf1e0f3d81a12d90abd08
Parents: 210da3d
Author: Alex Petrov <ol...@gmail.com>
Authored: Mon Sep 17 11:51:56 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Wed Sep 26 11:42:46 2018 +0200
----------------------------------------------------------------------
.../org/apache/cassandra/db/SystemKeyspace.java | 31 +-
.../org/apache/cassandra/dht/BootStrapper.java | 3 -
.../cassandra/dht/RangeFetchMapCalculator.java | 2 +-
.../org/apache/cassandra/dht/RangeStreamer.java | 448 ++++++++++---------
.../apache/cassandra/dht/StreamStateStore.java | 12 +-
.../cassandra/locator/RangesAtEndpoint.java | 6 +
.../cassandra/service/RangeRelocator.java | 324 ++++++++++++++
.../cassandra/service/StorageService.java | 314 +------------
.../apache/cassandra/streaming/StreamPlan.java | 17 +-
.../cassandra/streaming/StreamSession.java | 8 +-
.../apache/cassandra/dht/BootStrapperTest.java | 17 +-
.../dht/RangeFetchMapCalculatorTest.java | 79 +++-
.../locator/OldNetworkTopologyStrategyTest.java | 3 +-
.../service/BootstrapTransientTest.java | 113 +++--
.../cassandra/service/MoveTransientTest.java | 321 +++++++------
.../cassandra/service/StorageServiceTest.java | 18 +-
16 files changed, 981 insertions(+), 735 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index ff070a3..0f904ce 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -32,12 +32,11 @@ import javax.management.openmbean.TabularData;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.locator.Replica;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1285,24 +1284,40 @@ public final class SystemKeyspace
keyspace);
}
- public static synchronized RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner)
+ /**
+ * List of the streamed ranges, where transientness is encoded based on the source, where range was streamed from.
+ */
+ public static synchronized AvailableRanges getAvailableRanges(String keyspace, IPartitioner partitioner)
{
String query = "SELECT * FROM system.%s WHERE keyspace_name=?";
UntypedResultSet rs = executeInternal(format(query, AVAILABLE_RANGES_V2), keyspace);
- InetAddressAndPort endpoint = InetAddressAndPort.getLocalHost();
- RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint);
+
+ ImmutableSet.Builder<Range<Token>> full = new ImmutableSet.Builder<>();
+ ImmutableSet.Builder<Range<Token>> trans = new ImmutableSet.Builder<>();
for (UntypedResultSet.Row row : rs)
{
Optional.ofNullable(row.getSet("full_ranges", BytesType.instance))
.ifPresent(full_ranges -> full_ranges.stream()
.map(buf -> byteBufferToRange(buf, partitioner))
- .forEach(range -> builder.add(fullReplica(endpoint, range))));
+ .forEach(full::add));
Optional.ofNullable(row.getSet("transient_ranges", BytesType.instance))
.ifPresent(transient_ranges -> transient_ranges.stream()
.map(buf -> byteBufferToRange(buf, partitioner))
- .forEach(range -> builder.add(transientReplica(endpoint, range))));
+ .forEach(trans::add));
+ }
+ return new AvailableRanges(full.build(), trans.build());
+ }
+
+ public static class AvailableRanges
+ {
+ public Set<Range<Token>> full;
+ public Set<Range<Token>> trans;
+
+ private AvailableRanges(Set<Range<Token>> full, Set<Range<Token>> trans)
+ {
+ this.full = full;
+ this.trans = trans;
}
- return builder.build();
}
public static void resetAvailableRanges()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 92bf8c8..cef605e 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -79,9 +79,6 @@ public class BootStrapper extends ProgressEventNotifierSupport
stateStore,
true,
DatabaseDescriptor.getStreamingConnectionsPerHost());
- streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
- streamer.addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter());
-
for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces())
{
AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
index 4b98b97..63265b7 100644
--- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
+++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
@@ -84,7 +84,7 @@ public class RangeFetchMapCalculator
private final Set<Range<Token>> trivialRanges;
public RangeFetchMapCalculator(EndpointsByRange rangesWithSources,
- Collection<Predicate<Replica>> sourceFilters,
+ Collection<RangeStreamer.SourceFilter> sourceFilters,
String keyspace)
{
this.rangesWithSources = rangesWithSources;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index e8aa5d3..f46d665 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -27,9 +27,9 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Multimap;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.EndpointsByReplica;
@@ -53,12 +53,12 @@ import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
import static com.google.common.base.Predicates.and;
import static com.google.common.base.Predicates.not;
@@ -87,8 +87,8 @@ public class RangeStreamer
private final InetAddressAndPort address;
/* streaming description */
private final String description;
- private final Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = HashMultimap.create();
- private final Set<Predicate<Replica>> sourceFilters = new HashSet<>();
+ private final Map<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = new HashMap<>();
+ private final List<SourceFilter> sourceFilters = new ArrayList<>();
private final StreamPlan streamPlan;
private final boolean useStrictConsistency;
private final IEndpointSnitch snitch;
@@ -97,6 +97,7 @@ public class RangeStreamer
public static class FetchReplica
{
public final Replica local;
+ // Source replica
public final Replica remote;
public FetchReplica(Replica local, Replica remote)
@@ -135,11 +136,17 @@ public class RangeStreamer
}
}
+ public interface SourceFilter extends Predicate<Replica>
+ {
+ public boolean apply(Replica replica);
+ public String message(Replica replica);
+ }
+
/**
* Source filter which excludes any endpoints that are not alive according to a
* failure detector.
*/
- public static class FailureDetectorSourceFilter implements Predicate<Replica>
+ public static class FailureDetectorSourceFilter implements SourceFilter
{
private final IFailureDetector fd;
@@ -148,16 +155,23 @@ public class RangeStreamer
this.fd = fd;
}
+ @Override
public boolean apply(Replica replica)
{
return fd.isAlive(replica.endpoint());
}
+
+ @Override
+ public String message(Replica replica)
+ {
+ return "Filtered " + replica + " out because it was down";
+ }
}
/**
* Source filter which excludes any endpoints that are not in a specific data center.
*/
- public static class SingleDatacenterFilter implements Predicate<Replica>
+ public static class SingleDatacenterFilter implements SourceFilter
{
private final String sourceDc;
private final IEndpointSnitch snitch;
@@ -168,27 +182,41 @@ public class RangeStreamer
this.snitch = snitch;
}
+ @Override
public boolean apply(Replica replica)
{
return snitch.getDatacenter(replica).equals(sourceDc);
}
+
+ @Override
+ public String message(Replica replica)
+ {
+ return "Filtered " + replica + " out because it does not belong to " + sourceDc + " datacenter";
+ }
}
/**
* Source filter which excludes the current node from source calculations
*/
- public static class ExcludeLocalNodeFilter implements Predicate<Replica>
+ public static class ExcludeLocalNodeFilter implements SourceFilter
{
+ @Override
public boolean apply(Replica replica)
{
return !replica.isLocal();
}
+
+ @Override
+ public String message(Replica replica)
+ {
+ return "Filtered " + replica + " out because it is local";
+ }
}
/**
* Source filter which only includes endpoints contained within a provided set.
*/
- public static class WhitelistedSourcesFilter implements Predicate<Replica>
+ public static class WhitelistedSourcesFilter implements SourceFilter
{
private final Set<InetAddressAndPort> whitelistedSources;
@@ -201,6 +229,12 @@ public class RangeStreamer
{
return whitelistedSources.contains(replica.endpoint());
}
+
+ @Override
+ public String message(Replica replica)
+ {
+ return "Filtered " + replica + " out because it was not whitelisted, whitelisted sources: " + whitelistedSources;
+ }
}
public RangeStreamer(TokenMetadata metadata,
@@ -213,6 +247,21 @@ public class RangeStreamer
boolean connectSequentially,
int connectionsPerHost)
{
+ this(metadata, tokens, address, streamOperation, useStrictConsistency, snitch, stateStore,
+ FailureDetector.instance, connectSequentially, connectionsPerHost);
+ }
+
+ RangeStreamer(TokenMetadata metadata,
+ Collection<Token> tokens,
+ InetAddressAndPort address,
+ StreamOperation streamOperation,
+ boolean useStrictConsistency,
+ IEndpointSnitch snitch,
+ StreamStateStore stateStore,
+ IFailureDetector failureDetector,
+ boolean connectSequentially,
+ int connectionsPerHost)
+ {
Preconditions.checkArgument(streamOperation == StreamOperation.BOOTSTRAP || streamOperation == StreamOperation.REBUILD, streamOperation);
this.metadata = metadata;
this.tokens = tokens;
@@ -223,13 +272,34 @@ public class RangeStreamer
this.snitch = snitch;
this.stateStore = stateStore;
streamPlan.listeners(this.stateStore);
+
+ // We're _always_ filtering out a local node and down sources
+ addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(failureDetector));
+ addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter());
}
- public void addSourceFilter(Predicate<Replica> filter)
+ public void addSourceFilter(SourceFilter filter)
{
sourceFilters.add(filter);
}
+ // Creates error message from source filters
+ private static String buildErrorMessage(Collection<SourceFilter> sourceFilters, ReplicaCollection<?> replicas)
+ {
+ StringBuilder failureMessage = new StringBuilder();
+ for (Replica r : replicas)
+ {
+ for (SourceFilter filter : sourceFilters)
+ {
+ if (!filter.apply(r))
+ {
+ failureMessage.append(filter.message(r));
+ break;
+ }
+ }
+ }
+ return failureMessage.toString();
+ }
/**
* Add ranges to be streamed for given keyspace.
*
@@ -252,7 +322,6 @@ public class RangeStreamer
for (Map.Entry<Replica, Replica> entry : fetchMap.flattenEntries())
logger.info("{}: range {} exists on {} for keyspace {}", description, entry.getKey(), entry.getValue(), keyspaceName);
-
Multimap<InetAddressAndPort, FetchReplica> workMap;
//Only use the optimized strategy if we don't care about strict sources, have a replication factor > 1, and no
//transient replicas.
@@ -265,10 +334,12 @@ public class RangeStreamer
workMap = getOptimizedWorkMap(fetchMap, sourceFilters, keyspaceName);
}
- toFetch.put(keyspaceName, workMap);
- for (Map.Entry<InetAddressAndPort, Collection<FetchReplica>> entry : workMap.asMap().entrySet())
+ if (toFetch.put(keyspaceName, workMap) != null)
+ throw new IllegalArgumentException("Keyspace is already added to fetch map");
+
+ if (logger.isTraceEnabled())
{
- if (logger.isTraceEnabled())
+ for (Map.Entry<InetAddressAndPort, Collection<FetchReplica>> entry : workMap.asMap().entrySet())
{
for (FetchReplica r : entry.getValue())
logger.trace("{}: range source {} local range {} for keyspace {}", description, r.remote, r.local, keyspaceName);
@@ -289,10 +360,6 @@ public class RangeStreamer
/**
* Wrapper method to assemble the arguments for invoking the implementation with RangeStreamer's parameters
- * @param fetchRanges
- * @param keyspace
- * @param useStrictConsistency
- * @return
*/
private EndpointsByReplica calculateRangesToFetchWithPreferredEndpoints(ReplicaCollection<?> fetchRanges, Keyspace keyspace, boolean useStrictConsistency)
{
@@ -305,7 +372,7 @@ public class RangeStreamer
if (tokens != null)
{
// Pending ranges
- tmdAfter = tmd.cloneOnlyTokenMap();
+ tmdAfter = tmd.cloneOnlyTokenMap();
tmdAfter.updateNormalTokens(tokens, address);
}
else if (useStrictConsistency)
@@ -313,15 +380,14 @@ public class RangeStreamer
throw new IllegalArgumentException("Can't ask for strict consistency and not supply tokens");
}
- return RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(snitch::sortedByProximity,
- strat,
- fetchRanges,
- useStrictConsistency,
- tmd,
- tmdAfter,
- ALIVE_PREDICATE,
- keyspace.getName(),
- sourceFilters);
+ return calculateRangesToFetchWithPreferredEndpoints(snitch::sortedByProximity,
+ strat,
+ fetchRanges,
+ useStrictConsistency,
+ tmd,
+ tmdAfter,
+ keyspace.getName(),
+ sourceFilters);
}
@@ -329,7 +395,6 @@ public class RangeStreamer
* Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
* For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
* consistency.
- *
**/
public static EndpointsByReplica
calculateRangesToFetchWithPreferredEndpoints(BiFunction<InetAddressAndPort, EndpointsForRange, EndpointsForRange> snitchGetSortedListByProximity,
@@ -338,165 +403,148 @@ public class RangeStreamer
boolean useStrictConsistency,
TokenMetadata tmdBefore,
TokenMetadata tmdAfter,
- Predicate<Replica> isAlive,
String keyspace,
- Collection<Predicate<Replica>> sourceFilters)
- {
- EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
-
- InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
- logger.debug ("Keyspace: {}", keyspace);
- logger.debug("To fetch RN: {}", fetchRanges);
- logger.debug("Fetch ranges: {}", rangeAddresses);
-
- Predicate<Replica> testSourceFilters = and(sourceFilters);
- Function<EndpointsForRange, EndpointsForRange> sorted =
- endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
-
- //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
- EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
- for (Replica toFetch : fetchRanges)
- {
- //Replica that is sufficient to provide the data we need
- //With strict consistency and transient replication we may end up with multiple types
- //so this isn't used with strict consistency
- Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull());
- Predicate<Replica> accept = r ->
- isSufficient.test(r) // is sufficient
- && !r.endpoint().equals(localAddress) // is not self
- && isAlive.test(r); // is alive
-
- logger.debug("To fetch {}", toFetch);
- for (Range<Token> range : rangeAddresses.keySet())
- {
- if (range.contains(toFetch.range()))
- {
- EndpointsForRange oldEndpoints = rangeAddresses.get(range);
-
- //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
- //It could be multiple endpoints and we must fetch from all of them if they are there
- //With transient replication and strict consistency this is to get the full data from a full replica and
- //transient data from the transient replica losing data
- EndpointsForRange sources;
- if (useStrictConsistency)
- {
- //Start with two sets of who replicates the range before and who replicates it after
- EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
- logger.debug("Old endpoints {}", oldEndpoints);
- logger.debug("New endpoints {}", newEndpoints);
-
- //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
- //So we need to be careful to only be strict when endpoints == RF
- if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
- {
- Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
- // Remove new endpoints from old endpoints based on address
- oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
-
- if (!all(oldEndpoints, isAlive))
- throw new IllegalStateException("A node required to move the data consistently is down: "
- + oldEndpoints.filter(not(isAlive)));
-
- if (oldEndpoints.size() > 1)
- throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
-
- //If we are transitioning from transient to full and and the set of replicas for the range is not changing
- //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
- //since we are already a transient replica and the existing replica remains.
- //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
- //So it's an error if we don't find what we need.
- if (oldEndpoints.isEmpty() && toFetch.isTransient())
- {
- throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
- }
-
- if (!any(oldEndpoints, isSufficient))
- {
- // need an additional replica
- EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
- // include all our filters, to ensure we include a matching node
- Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(accept, testSourceFilters)).toJavaUtil();
- if (fullReplica.isPresent())
- oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
- else
- throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange);
- }
-
- //We have to check the source filters here to see if they will remove any replicas
- //required for strict consistency
- if (!all(oldEndpoints, testSourceFilters))
- throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters)));
- }
- else
- {
- oldEndpoints = sorted.apply(oldEndpoints.filter(accept));
- }
-
- //Apply testSourceFilters that were given to us, and establish everything remaining is alive for the strict case
- sources = oldEndpoints.filter(testSourceFilters);
- }
- else
- {
- //Without strict consistency we have given up on correctness so no point in fetching from
- //a random full + transient replica since it's also likely to lose data
- //Also apply testSourceFilters that were given to us so we can safely select a single source
- sources = sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters)));
- //Limit it to just the first possible source, we don't need more than one and downstream
- //will fetch from every source we supply
- sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
- }
-
- // storing range and preferred endpoint set
- rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
- logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
- }
- }
-
- EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
- if (addressList == null)
- throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
-
- /*
- * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
- * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
- * and the other is a transient replica. So we must need fetch from two places in that case for the full range we gain.
- * For a transient range we only need to fetch from one.
- */
- if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
- throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
-
- //We must have enough stuff to fetch from
- if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty())
- {
- if (strat.getReplicationFactor().allReplicas == 1)
- {
- if (useStrictConsistency)
- {
- logger.warn("A node required to move the data consistently is down");
- throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
- "Ensure this keyspace contains replicas in the source datacenter.");
- }
- else
- logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
- "Keyspace might be missing data.", toFetch, keyspace);
-
- }
- else
- {
- if (useStrictConsistency)
- logger.warn("A node required to move the data consistently is down");
- throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
- }
- }
- }
- return rangesToFetchWithPreferredEndpoints.asImmutableView();
- }
+ Collection<SourceFilter> sourceFilters)
+ {
+ EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
+
+ InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+ logger.debug ("Keyspace: {}", keyspace);
+ logger.debug("To fetch RN: {}", fetchRanges);
+ logger.debug("Fetch ranges: {}", rangeAddresses);
+
+ Predicate<Replica> testSourceFilters = and(sourceFilters);
+ Function<EndpointsForRange, EndpointsForRange> sorted =
+ endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
+
+ //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
+ EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
+ for (Replica toFetch : fetchRanges)
+ {
+ //Replica that is sufficient to provide the data we need
+ //With strict consistency and transient replication we may end up with multiple types
+ //so this isn't used with strict consistency
+ Predicate<Replica> isSufficient = r -> toFetch.isTransient() || r.isFull();
+
+ logger.debug("To fetch {}", toFetch);
+ for (Range<Token> range : rangeAddresses.keySet())
+ {
+ if (!range.contains(toFetch.range()))
+ continue;
+
+ final EndpointsForRange oldEndpoints = sorted.apply(rangeAddresses.get(range));
+
+ //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
+ //It could be multiple endpoints and we must fetch from all of them if they are there
+ //With transient replication and strict consistency this is to get the full data from a full replica and
+ //transient data from the transient replica losing data
+ EndpointsForRange sources;
+ if (useStrictConsistency)
+ {
+ EndpointsForRange strictEndpoints;
+ //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
+ //So we need to be careful to only be strict when endpoints == RF
+ if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
+ {
+ //Start with two sets of who replicates the range before and who replicates it after
+ EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
+ logger.debug("Old endpoints {}", oldEndpoints);
+ logger.debug("New endpoints {}", newEndpoints);
+
+ // Remove new endpoints from old endpoints based on address
+ strictEndpoints = oldEndpoints.without(newEndpoints.endpoints());
+
+ if (strictEndpoints.size() > 1)
+ throw new AssertionError("Expected <= 1 endpoint but found " + strictEndpoints);
+
+ //We have to check the source filters here to see if they will remove any replicas
+ //required for strict consistency
+ if (!all(strictEndpoints, testSourceFilters))
+ throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + buildErrorMessage(sourceFilters, strictEndpoints));
+
+ //If we are transitioning from transient to full and and the set of replicas for the range is not changing
+ //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
+ //since we are already a transient replica and the existing replica remains.
+ //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
+ //So it's an error if we don't find what we need.
+ if (strictEndpoints.isEmpty() && toFetch.isTransient())
+ throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
+
+ if (!any(strictEndpoints, isSufficient))
+ {
+ // need an additional replica; include all our filters, to ensure we include a matching node
+ Optional<Replica> fullReplica = Iterables.<Replica>tryFind(oldEndpoints, and(isSufficient, testSourceFilters)).toJavaUtil();
+ if (fullReplica.isPresent())
+ strictEndpoints = Endpoints.concat(strictEndpoints, EndpointsForRange.of(fullReplica.get()));
+ else
+ throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + buildErrorMessage(sourceFilters, oldEndpoints));
+ }
+ }
+ else
+ {
+ strictEndpoints = sorted.apply(oldEndpoints.filter(and(isSufficient, testSourceFilters)));
+ }
+
+ sources = strictEndpoints;
+ }
+ else
+ {
+ //Without strict consistency we have given up on correctness so no point in fetching from
+ //a random full + transient replica since it's also likely to lose data
+ //Also apply testSourceFilters that were given to us so we can safely select a single source
+ sources = sorted.apply(oldEndpoints.filter(and(isSufficient, testSourceFilters)));
+ //Limit it to just the first possible source, we don't need more than one and downstream
+ //will fetch from every source we supply
+ sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
+ }
+
+ // storing range and preferred endpoint set
+ rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
+ logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
+ }
+
+ EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
+ if (addressList == null)
+ throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
+
+ /*
+ * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
+ * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
+ * and the other is a transient replica. So we must need fetch from two places in that case for the full range we gain.
+ * For a transient range we only need to fetch from one.
+ */
+ if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
+ throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
+
+ //We must have enough stuff to fetch from
+ if (!any(addressList, isSufficient))
+ {
+ if (strat.getReplicationFactor().allReplicas == 1)
+ {
+ if (useStrictConsistency)
+ {
+ logger.warn("A node required to move the data consistently is down");
+ throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
+ "Ensure this keyspace contains replicas in the source datacenter.");
+ }
+ else
+ logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
+ "Keyspace might be missing data.", toFetch, keyspace);
+ }
+ else
+ {
+ if (useStrictConsistency)
+ logger.warn("A node required to move the data consistently is down");
+ throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
+ }
+ }
+ }
+ return rangesToFetchWithPreferredEndpoints.asImmutableView();
+ }
/**
* The preferred endpoint list is the wrong format because it is keyed by Replica (this node) rather than the source
* endpoint we will fetch from which streaming wants.
- * @param preferredEndpoints
- * @return
*/
public static Multimap<InetAddressAndPort, FetchReplica> convertPreferredEndpointsToWorkMap(EndpointsByReplica preferredEndpoints)
{
@@ -505,7 +553,7 @@ public class RangeStreamer
{
for (Replica source : e.getValue())
{
- assert (e.getKey()).isLocal();
+ assert e.getKey().isLocal();
assert !source.isLocal();
workMap.put(source.endpoint(), new FetchReplica(e.getKey(), source));
}
@@ -518,7 +566,8 @@ public class RangeStreamer
* Optimized version that also outputs the final work map
*/
private static Multimap<InetAddressAndPort, FetchReplica> getOptimizedWorkMap(EndpointsByReplica rangesWithSources,
- Collection<Predicate<Replica>> sourceFilters, String keyspace)
+ Collection<SourceFilter> sourceFilters,
+ String keyspace)
{
//For now we just aren't going to use the optimized range fetch map with transient replication to shrink
//the surface area to test and introduce bugs.
@@ -531,10 +580,11 @@ public class RangeStreamer
unwrapped.put(entry.getKey().range(), entry.getValue());
}
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(unwrapped.asImmutableView(), sourceFilters, keyspace);
+ EndpointsByRange unwrappedView = unwrapped.asImmutableView();
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(unwrappedView, sourceFilters, keyspace);
Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap();
logger.info("Output from RangeFetchMapCalculator for keyspace {}", keyspace);
- validateRangeFetchMap(unwrapped.asImmutableView(), rangeFetchMapMap, keyspace);
+ validateRangeFetchMap(unwrappedView, rangeFetchMapMap, keyspace);
//Need to rewrap as Replicas
Multimap<InetAddressAndPort, FetchReplica> wrapped = HashMultimap.create();
@@ -562,9 +612,6 @@ public class RangeStreamer
/**
* Verify that source returned for each range is correct
- * @param rangesWithSources
- * @param rangeFetchMapMap
- * @param keyspace
*/
private static void validateRangeFetchMap(EndpointsByRange rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap, String keyspace)
{
@@ -588,7 +635,7 @@ public class RangeStreamer
// For testing purposes
@VisibleForTesting
- Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch()
+ Map<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch()
{
return toFetch;
}
@@ -600,16 +647,19 @@ public class RangeStreamer
sources.asMap().forEach((source, fetchReplicas) -> {
// filter out already streamed ranges
- RangesAtEndpoint available = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner);
+ SystemKeyspace.AvailableRanges available = stateStore.getAvailableRanges(keyspace, metadata.partitioner);
Predicate<FetchReplica> isAvailable = fetch -> {
- Replica availableRange = available.byRange().get(fetch.local.range());
- if (availableRange == null)
+ boolean isInFull = available.full.contains(fetch.local.range());
+ boolean isInTrans = available.trans.contains(fetch.local.range());
+
+ if (!isInFull && !isInTrans)
//Range is unavailable
return false;
+
if (fetch.local.isFull())
//For full, pick only replicas with matching transientness
- return availableRange.isFull() == fetch.remote.isFull();
+ return isInFull == fetch.remote.isFull();
// Any transient or full will do
return true;
@@ -617,22 +667,16 @@ public class RangeStreamer
List<FetchReplica> remaining = fetchReplicas.stream().filter(not(isAvailable)).collect(Collectors.toList());
- if (remaining.size() < available.size())
+ if (remaining.size() < available.full.size() + available.trans.size())
{
List<FetchReplica> skipped = fetchReplicas.stream().filter(isAvailable).collect(Collectors.toList());
logger.info("Some ranges of {} are already available. Skipping streaming those ranges. Skipping {}. Fully available {} Transiently available {}",
- fetchReplicas, skipped, available.filter(Replica::isFull).ranges(), available.filter(Replica::isTransient).ranges());
+ fetchReplicas, skipped, available.full, available.trans);
}
if (logger.isTraceEnabled())
logger.trace("{}ing from {} ranges {}", description, source, StringUtils.join(remaining, ", "));
- //At the other end the distinction between full and transient is ignored it just used the transient status
- //of the Replica objects we send to determine what to send. The real reason we have this split down to
- //StreamRequest is that on completion StreamRequest is used to write to the system table tracking
- //what has already been streamed. At that point since we only have the local Replica instances so we don't
- //know what we got from the remote. We preserve that here by splitting based on the remotes transient
- //status.
InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
RangesAtEndpoint full = remaining.stream()
.filter(pair -> pair.remote.isFull())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/StreamStateStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/StreamStateStore.java b/src/java/org/apache/cassandra/dht/StreamStateStore.java
index 3144e81..e62bc04 100644
--- a/src/java/org/apache/cassandra/dht/StreamStateStore.java
+++ b/src/java/org/apache/cassandra/dht/StreamStateStore.java
@@ -20,7 +20,8 @@ package org.apache.cassandra.dht;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.cassandra.locator.RangesAtEndpoint;
+import com.google.common.collect.Streams;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@ public class StreamStateStore implements StreamEventHandler
{
private static final Logger logger = LoggerFactory.getLogger(StreamStateStore.class);
- public RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner)
+ public SystemKeyspace.AvailableRanges getAvailableRanges(String keyspace, IPartitioner partitioner)
{
return SystemKeyspace.getAvailableRanges(keyspace, partitioner);
}
@@ -54,8 +55,11 @@ public class StreamStateStore implements StreamEventHandler
@VisibleForTesting
public boolean isDataAvailable(String keyspace, Token token)
{
- RangesAtEndpoint availableRanges = getAvailableRanges(keyspace, token.getPartitioner());
- return availableRanges.ranges().stream().anyMatch(range -> range.contains(token));
+ SystemKeyspace.AvailableRanges availableRanges = getAvailableRanges(keyspace, token.getPartitioner());
+
+ return Streams.concat(availableRanges.full.stream(),
+ availableRanges.trans.stream())
+ .anyMatch(range -> range.contains(token));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
index 1773173..f57c28e 100644
--- a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
+++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
@@ -37,6 +37,7 @@ import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;
+import static com.google.common.collect.Iterables.all;
import static org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict.*;
/**
@@ -302,6 +303,11 @@ public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint
.collect(collector(dummy));
}
+ public static boolean isDummyList(RangesAtEndpoint ranges)
+ {
+ return all(ranges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0"));
+ }
+
/**
* @return concatenate two DISJOINT collections together
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/service/RangeRelocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeRelocator.java b/src/java/org/apache/cassandra/service/RangeRelocator.java
new file mode 100644
index 0000000..f2af3db
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/RangeRelocator.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Multimap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.RangeStreamer;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.RangesByEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+@VisibleForTesting
+public class RangeRelocator
+{
+ private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
+
+ private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION);
+ private final InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+ private final TokenMetadata tokenMetaCloneAllSettled;
+ // clone to avoid concurrent modification in calculateNaturalReplicas
+ private final TokenMetadata tokenMetaClone;
+ private final Collection<Token> tokens;
+ private final List<String> keyspaceNames;
+
+
+ RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames, TokenMetadata tmd)
+ {
+ this.tokens = tokens;
+ this.keyspaceNames = keyspaceNames;
+ this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled();
+ // clone to avoid concurrent modification in calculateNaturalReplicas
+ this.tokenMetaClone = tmd.cloneOnlyTokenMap();
+ }
+
+ @VisibleForTesting
+ public RangeRelocator()
+ {
+ this.tokens = null;
+ this.keyspaceNames = null;
+ this.tokenMetaCloneAllSettled = null;
+ this.tokenMetaClone = null;
+ }
+
+ /**
+ * Wrapper that supplies accessors to the real implementations of the various dependencies for this method
+ */
+ private static Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> calculateRangesToFetchWithPreferredEndpoints(RangesAtEndpoint fetchRanges,
+ AbstractReplicationStrategy strategy,
+ String keyspace,
+ TokenMetadata tmdBefore,
+ TokenMetadata tmdAfter)
+ {
+ EndpointsByReplica preferredEndpoints =
+ RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(DatabaseDescriptor.getEndpointSnitch()::sortedByProximity,
+ strategy,
+ fetchRanges,
+ StorageService.useStrictConsistency,
+ tmdBefore,
+ tmdAfter,
+ keyspace,
+ Arrays.asList(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance),
+ new RangeStreamer.ExcludeLocalNodeFilter()));
+ return RangeStreamer.convertPreferredEndpointsToWorkMap(preferredEndpoints);
+ }
+
+ /**
+ * calculating endpoints to stream current ranges to if needed
+ * in some situations node will handle current ranges as part of the new ranges
+ **/
+ public static RangesByEndpoint calculateRangesToStreamWithEndpoints(RangesAtEndpoint streamRanges,
+ AbstractReplicationStrategy strat,
+ TokenMetadata tmdBefore,
+ TokenMetadata tmdAfter)
+ {
+ RangesByEndpoint.Mutable endpointRanges = new RangesByEndpoint.Mutable();
+ for (Replica toStream : streamRanges)
+ {
+ //If the range we are sending is full only send it to the new full replica
+ //There will also be a new transient replica we need to send the data to, but not
+ //the repaired data
+ EndpointsForRange oldEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdBefore);
+ EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdAfter);
+ logger.debug("Need to stream {}, current endpoints {}, new endpoints {}", toStream, oldEndpoints, newEndpoints);
+
+ for (Replica newEndpoint : newEndpoints)
+ {
+ Replica oldEndpoint = oldEndpoints.byEndpoint().get(newEndpoint.endpoint());
+
+ // Nothing to do
+ if (newEndpoint.equals(oldEndpoint))
+ continue;
+
+ // Completely new range for this endpoint
+ if (oldEndpoint == null)
+ {
+ if (toStream.isTransient() && newEndpoint.isFull())
+ throw new AssertionError(String.format("Need to stream %s, but only have %s which is transient and not full", newEndpoint, toStream));
+
+ for (Range<Token> intersection : newEndpoint.range().intersectionWith(toStream.range()))
+ {
+ endpointRanges.put(newEndpoint.endpoint(), newEndpoint.decorateSubrange(intersection));
+ }
+ }
+ else
+ {
+ Set<Range<Token>> subsToStream = Collections.singleton(toStream.range());
+
+ //First subtract what we already have
+ if (oldEndpoint.isFull() == newEndpoint.isFull() || oldEndpoint.isFull())
+ subsToStream = toStream.range().subtract(oldEndpoint.range());
+
+ //Now we only stream what is still replicated
+ subsToStream.stream()
+ .flatMap(range -> range.intersectionWith(newEndpoint.range()).stream())
+ .forEach(tokenRange -> endpointRanges.put(newEndpoint.endpoint(), newEndpoint.decorateSubrange(tokenRange)));
+ }
+ }
+ }
+ return endpointRanges.asImmutableView();
+ }
+
+ public void calculateToFromStreams()
+ {
+ logger.debug("Current tmd: {}, Updated tmd: {}", tokenMetaClone, tokenMetaCloneAllSettled);
+
+ for (String keyspace : keyspaceNames)
+ {
+ // replication strategy of the current keyspace
+ AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
+
+ logger.info("Calculating ranges to stream and request for keyspace {}", keyspace);
+ //From what I have seen we only ever call this with a single token from StorageService.move(Token)
+ for (Token newToken : tokens)
+ {
+ Collection<Token> currentTokens = tokenMetaClone.getTokens(localAddress);
+ if (currentTokens.size() > 1 || currentTokens.isEmpty())
+ {
+ throw new AssertionError("Unexpected current tokens: " + currentTokens);
+ }
+
+ // calculated parts of the ranges to request/stream from/to nodes in the ring
+ Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetchOwnRanges;
+
+ //In the single node token move there is nothing to do and Range subtraction is broken
+ //so it's easier to just identify this case up front.
+ if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()
+)).size() > 1)
+ {
+ // getting collection of the currently used ranges by this keyspace
+ RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress);
+
+ // collection of ranges which this node will serve after move to the new token
+ RangesAtEndpoint updatedReplicas = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
+
+ streamAndFetchOwnRanges = calculateStreamAndFetchRanges(currentReplicas, updatedReplicas);
+ }
+ else
+ {
+ streamAndFetchOwnRanges = Pair.create(RangesAtEndpoint.empty(localAddress), RangesAtEndpoint.empty(localAddress));
+ }
+
+ RangesByEndpoint rangesToStream = calculateRangesToStreamWithEndpoints(streamAndFetchOwnRanges.left, strategy, tokenMetaClone, tokenMetaCloneAllSettled);
+ logger.info("Endpoint ranges to stream to " + rangesToStream);
+
+ // stream ranges
+ for (InetAddressAndPort address : rangesToStream.keySet())
+ {
+ logger.debug("Will stream range {} of keyspace {} to endpoint {}", rangesToStream.get(address), keyspace, address);
+ RangesAtEndpoint ranges = rangesToStream.get(address);
+ streamPlan.transferRanges(address, keyspace, ranges);
+ }
+
+ Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> rangesToFetch = calculateRangesToFetchWithPreferredEndpoints(streamAndFetchOwnRanges.right, strategy, keyspace, tokenMetaClone, tokenMetaCloneAllSettled);
+
+ // stream requests
+ rangesToFetch.asMap().forEach((address, sourceAndOurReplicas) -> {
+ RangesAtEndpoint full = sourceAndOurReplicas.stream()
+ .filter(pair -> pair.remote.isFull())
+ .map(pair -> pair.local)
+ .collect(RangesAtEndpoint.collector(localAddress));
+ RangesAtEndpoint trans = sourceAndOurReplicas.stream()
+ .filter(pair -> pair.remote.isTransient())
+ .map(pair -> pair.local)
+ .collect(RangesAtEndpoint.collector(localAddress));
+ logger.debug("Will request range {} of keyspace {} from endpoint {}", rangesToFetch.get(address), keyspace, address);
+ streamPlan.requestRanges(address, keyspace, full, trans);
+ });
+
+ logger.debug("Keyspace {}: work map {}.", keyspace, rangesToFetch);
+ }
+ }
+ }
+
+ /**
+ * Calculate pair of ranges to stream/fetch for given two range collections
+ * (current ranges for keyspace and ranges after move to new token)
+ *
+ * With transient replication the added wrinkle is that if a range transitions from full to transient then
+ * we need to stream the range despite the fact that we are retaining it as transient. Some replica
+ * somewhere needs to transition from transient to full and we will be the source.
+ *
+ * If the range is transient and is transitioning to full then always fetch even if the range was already transient
+ * since a transiently replicated obviously needs to fetch data to become full.
+ *
+ * This why there is a continue after checking for instersection because intersection is not sufficient reason
+ * to do the subtraction since we might need to stream/fetch data anyways.
+ *
+ * @param currentRanges collection of the ranges by current token
+ * @param updatedRanges collection of the ranges after token is changed
+ * @return pair of ranges to stream/fetch for given current and updated range collections
+ */
+ public static Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRanges(RangesAtEndpoint currentRanges, RangesAtEndpoint updatedRanges)
+ {
+ RangesAtEndpoint.Builder toStream = RangesAtEndpoint.builder(currentRanges.endpoint());
+ RangesAtEndpoint.Builder toFetch = RangesAtEndpoint.builder(currentRanges.endpoint());
+ logger.debug("Calculating toStream");
+ computeRanges(currentRanges, updatedRanges, toStream);
+
+ logger.debug("Calculating toFetch");
+ computeRanges(updatedRanges, currentRanges, toFetch);
+
+ logger.debug("To stream {}", toStream);
+ logger.debug("To fetch {}", toFetch);
+ return Pair.create(toStream.build(), toFetch.build());
+ }
+
+ private static void computeRanges(RangesAtEndpoint srcRanges, RangesAtEndpoint dstRanges, RangesAtEndpoint.Builder ranges)
+ {
+ for (Replica src : srcRanges)
+ {
+ boolean intersect = false;
+ RangesAtEndpoint remainder = null;
+ for (Replica dst : dstRanges)
+ {
+ logger.debug("Comparing {} and {}", src, dst);
+ // Stream the full range if there's no intersection
+ if (!src.intersectsOnRange(dst))
+ continue;
+
+ // If we're transitioning from full to transient
+ if (src.isFull() && dst.isTransient())
+ continue;
+
+ if (remainder == null)
+ {
+ remainder = src.subtractIgnoreTransientStatus(dst.range());
+ }
+ else
+ {
+ // Re-subtract ranges to avoid overstreaming in cases when the single range is split or merged
+ RangesAtEndpoint.Builder newRemainder = new RangesAtEndpoint.Builder(remainder.endpoint());
+ for (Replica replica : remainder)
+ newRemainder.addAll(replica.subtractIgnoreTransientStatus(dst.range()));
+ remainder = newRemainder.build();
+ }
+ intersect = true;
+ }
+
+ if (!intersect)
+ {
+ assert remainder == null;
+ logger.debug(" Doesn't intersect adding {}", src);
+ ranges.add(src); // should stream whole old range
+ }
+ else
+ {
+ ranges.addAll(remainder);
+ logger.debug(" Intersects adding {}", remainder);
+ }
+ }
+ }
+
+ public Future<StreamState> stream()
+ {
+ return streamPlan.execute();
+ }
+
+ public boolean streamsNeeded()
+ {
+ return !streamPlan.isEmpty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a979f1c..391598c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -236,7 +236,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private Collection<Token> bootstrapTokens = null;
// true when keeping strict consistency while bootstrapping
- private static final boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true"));
+ public static final boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true"));
private static final boolean allowSimultaneousMoves = Boolean.parseBoolean(System.getProperty("cassandra.consistent.simultaneousmoves.allow","false"));
private static final boolean joinRing = Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"));
private boolean replacing;
@@ -1227,7 +1227,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
streamStateStore,
false,
DatabaseDescriptor.getStreamingConnectionsPerHost());
- streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
if (sourceDc != null)
streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));
@@ -4316,208 +4315,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.debug("Successfully moved to new token {}", getLocalTokens().iterator().next());
}
- @VisibleForTesting
- public static class RangeRelocator
- {
- private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION);
- private final InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
- private final TokenMetadata tokenMetaCloneAllSettled;
- // clone to avoid concurrent modification in calculateNaturalReplicas
- private final TokenMetadata tokenMetaClone;
- private final Collection<Token> tokens;
- private final List<String> keyspaceNames;
-
-
- private RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames, TokenMetadata tmd)
- {
- this.tokens = tokens;
- this.keyspaceNames = keyspaceNames;
- this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled();
- // clone to avoid concurrent modification in calculateNaturalReplicas
- this.tokenMetaClone = tmd.cloneOnlyTokenMap();
- }
-
- @VisibleForTesting
- public RangeRelocator()
- {
- this.tokens = null;
- this.keyspaceNames = null;
- this.tokenMetaCloneAllSettled = null;
- this.tokenMetaClone = null;
- }
-
- /**
- * Wrapper that supplies accessors to the real implementations of the various dependencies for this method
- */
- private Multimap<InetAddressAndPort, FetchReplica> calculateRangesToFetchWithPreferredEndpoints(AbstractReplicationStrategy strategy, RangesAtEndpoint fetchRanges, String keyspace)
- {
- EndpointsByReplica preferredEndpoints =
- RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> DatabaseDescriptor.getEndpointSnitch().sortedByProximity(address, replicas),
- strategy,
- fetchRanges,
- useStrictConsistency,
- tokenMetaClone,
- tokenMetaCloneAllSettled,
- RangeStreamer.ALIVE_PREDICATE,
- keyspace,
- Collections.emptyList());
- return RangeStreamer.convertPreferredEndpointsToWorkMap(preferredEndpoints);
- }
-
- /**
- * calculating endpoints to stream current ranges to if needed
- * in some situations node will handle current ranges as part of the new ranges
- **/
- public RangesByEndpoint calculateRangesToStreamWithEndpoints(RangesAtEndpoint streamRanges,
- AbstractReplicationStrategy strat,
- TokenMetadata tmdBefore,
- TokenMetadata tmdAfter)
- {
- RangesByEndpoint.Mutable endpointRanges = new RangesByEndpoint.Mutable();
- for (Replica toStream : streamRanges)
- {
- //If the range we are sending is full only send it to the new full replica
- //There will also be a new transient replica we need to send the data to, but not
- //the repaired data
- EndpointsForRange currentEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdBefore);
- EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdAfter);
- logger.debug("Need to stream {}, current endpoints {}, new endpoints {}", toStream, currentEndpoints, newEndpoints);
-
- for (Replica current : currentEndpoints)
- {
- for (Replica updated : newEndpoints)
- {
- if (current.endpoint().equals(updated.endpoint()))
- {
- //Nothing to do
- if (current.equals(updated))
- break;
-
- //In these two (really three) cases the existing data is sufficient and we should subtract whatever is already replicated
- if (current.isFull() == updated.isFull() || current.isFull())
- {
- //First subtract what we already have
- Set<Range<Token>> subsToStream = toStream.range().subtract(current.range());
- //Now we only stream what is still replicated
- subsToStream = subsToStream.stream().flatMap(range -> range.intersectionWith(updated.range()).stream()).collect(Collectors.toSet());
- for (Range<Token> subrange : subsToStream)
- {
- //Only stream what intersects with what is in the new world
- Set<Range<Token>> intersections = subrange.intersectionWith(updated.range());
- for (Range<Token> intersection : intersections)
- {
- endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection));
- }
- }
- }
- else
- {
- for (Range<Token> intersection : toStream.range().intersectionWith(updated.range()))
- {
- endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection));
- }
- }
- }
- }
- }
-
- for (Replica updated : newEndpoints)
- {
- if (!currentEndpoints.byEndpoint().containsKey(updated.endpoint()))
- {
- // Completely new range for this endpoint
- if (toStream.isTransient() && updated.isFull())
- {
- throw new AssertionError(String.format("Need to stream %s, but only have %s which is transient and not full", updated, toStream));
- }
- for (Range<Token> intersection : updated.range().intersectionWith(toStream.range()))
- {
- endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection));
- }
- }
- }
- }
- return endpointRanges.asImmutableView();
- }
-
- private void calculateToFromStreams()
- {
- logger.debug("Current tmd " + tokenMetaClone);
- logger.debug("Updated tmd " + tokenMetaCloneAllSettled);
- for (String keyspace : keyspaceNames)
- {
- // replication strategy of the current keyspace
- AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
- // getting collection of the currently used ranges by this keyspace
- RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress);
-
- logger.info("Calculating ranges to stream and request for keyspace {}", keyspace);
- //From what I have seen we only ever call this with a single token from StorageService.move(Token)
- for (Token newToken : tokens)
- {
- Collection<Token> currentTokens = tokenMetaClone.getTokens(localAddress);
- if (currentTokens.size() > 1 || currentTokens.isEmpty())
- {
- throw new AssertionError("Unexpected current tokens: " + currentTokens);
- }
-
- // collection of ranges which this node will serve after move to the new token
- RangesAtEndpoint updatedReplicas = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
-
- // calculated parts of the ranges to request/stream from/to nodes in the ring
- Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetchOwnRanges = Pair.create(RangesAtEndpoint.empty(localAddress), RangesAtEndpoint.empty(localAddress));
- //In the single node token move there is nothing to do and Range subtraction is broken
- //so it's easier to just identify this case up front.
- if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()
-)).size() > 1)
- {
- streamAndFetchOwnRanges = calculateStreamAndFetchRanges(currentReplicas, updatedReplicas);
- }
-
- Multimap<InetAddressAndPort, FetchReplica> workMap = calculateRangesToFetchWithPreferredEndpoints(strategy, streamAndFetchOwnRanges.right, keyspace);
-
- RangesByEndpoint endpointRanges = calculateRangesToStreamWithEndpoints(streamAndFetchOwnRanges.left, strategy, tokenMetaClone, tokenMetaCloneAllSettled);
-
- logger.info("Endpoint ranges to stream to " + endpointRanges);
-
- // stream ranges
- for (InetAddressAndPort address : endpointRanges.keySet())
- {
- logger.debug("Will stream range {} of keyspace {} to endpoint {}", endpointRanges.get(address), keyspace, address);
- RangesAtEndpoint ranges = endpointRanges.get(address);
- streamPlan.transferRanges(address, keyspace, ranges);
- }
-
- // stream requests
- workMap.asMap().forEach((address, sourceAndOurReplicas) -> {
- RangesAtEndpoint full = sourceAndOurReplicas.stream()
- .filter(pair -> pair.remote.isFull())
- .map(pair -> pair.local)
- .collect(RangesAtEndpoint.collector(localAddress));
- RangesAtEndpoint transientReplicas = sourceAndOurReplicas.stream()
- .filter(pair -> pair.remote.isTransient())
- .map(pair -> pair.local)
- .collect(RangesAtEndpoint.collector(localAddress));
- logger.debug("Will request range {} of keyspace {} from endpoint {}", workMap.get(address), keyspace, address);
- streamPlan.requestRanges(address, keyspace, full, transientReplicas);
- });
-
- logger.debug("Keyspace {}: work map {}.", keyspace, workMap);
- }
- }
- }
-
- public Future<StreamState> stream()
- {
- return streamPlan.execute();
- }
-
- public boolean streamsNeeded()
- {
- return !streamPlan.isEmpty();
- }
- }
-
public String getRemovalStatus()
{
return getRemovalStatus(false);
@@ -5271,115 +5068,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return streamPlan.execute();
}
- /**
- * Calculate pair of ranges to stream/fetch for given two range collections
- * (current ranges for keyspace and ranges after move to new token)
- *
- * With transient replication the added wrinkle is that if a range transitions from full to transient then
- * we need to stream the range despite the fact that we are retaining it as transient. Some replica
- * somewhere needs to transition from transient to full and we wll be the source.
- *
- * If the range is transient and is transitioning to full then always fetch even if the range was already transient
- * since a transiently replicated obviously needs to fetch data to become full.
- *
- * This why there is a continue after checking for instersection because intersection is not sufficient reason
- * to do the subtraction since we might need to stream/fetch data anyways.
- *
- * @param current collection of the ranges by current token
- * @param updated collection of the ranges after token is changed
- * @return pair of ranges to stream/fetch for given current and updated range collections
- */
- public static Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRanges(RangesAtEndpoint current, RangesAtEndpoint updated)
- {
- // FIXME: transient replication
- // this should always be the local node, except for tests TODO: assert this
- RangesAtEndpoint.Builder toStream = RangesAtEndpoint.builder(current.endpoint());
- RangesAtEndpoint.Builder toFetch = RangesAtEndpoint.builder(current.endpoint());
-
- logger.debug("Calculating toStream");
- for (Replica r1 : current)
- {
- boolean intersect = false;
- RangesAtEndpoint.Mutable remainder = null;
- for (Replica r2 : updated)
- {
- logger.debug("Comparing {} and {}", r1, r2);
- //If we will end up transiently replicating send the entire thing and don't subtract
- if (r1.intersectsOnRange(r2) && !(r1.isFull() && r2.isTransient()))
- {
- RangesAtEndpoint.Mutable oldRemainder = remainder;
- remainder = new RangesAtEndpoint.Mutable(current.endpoint());
- if (oldRemainder != null)
- {
- for (Replica replica : oldRemainder)
- {
- remainder.addAll(replica.subtractIgnoreTransientStatus(r2.range()));
- }
- }
- else
- {
- remainder.addAll(r1.subtractIgnoreTransientStatus(r2.range()));
- }
- logger.debug(" Intersects adding {}", remainder);
- intersect = true;
- }
- }
- if (!intersect)
- {
- logger.debug(" Doesn't intersect adding {}", r1);
- toStream.add(r1); // should stream whole old range
- }
- else
- {
- toStream.addAll(remainder);
- }
- }
-
- logger.debug("Calculating toFetch");
- for (Replica r2 : updated)
- {
- boolean intersect = false;
- RangesAtEndpoint.Mutable remainder = null;
- for (Replica r1 : current)
- {
- logger.info("Comparing {} and {}", r2, r1);
- //Transitioning from transient to full means fetch everything so intersection doesn't matter.
- if (r2.intersectsOnRange(r1) && !(r1.isTransient() && r2.isFull()))
- {
- RangesAtEndpoint.Mutable oldRemainder = remainder;
- remainder = new RangesAtEndpoint.Mutable(current.endpoint());
- if (oldRemainder != null)
- {
- for (Replica replica : oldRemainder)
- {
- remainder.addAll(replica.subtractIgnoreTransientStatus(r1.range()));
- }
- }
- else
- {
- remainder.addAll(r2.subtractIgnoreTransientStatus(r1.range()));
- }
- logger.debug(" Intersects adding {}", remainder);
- intersect = true;
- }
- }
- if (!intersect)
- {
- logger.debug(" Doesn't intersect adding {}", r2);
- toFetch.add(r2); // should fetch whole old range
- }
- else
- {
- toFetch.addAll(remainder);
- }
- }
-
- logger.debug("To stream {}", toStream);
- logger.debug("To fetch {}", toFetch);
-
- return Pair.create(toStream.build(), toFetch.build());
- }
-
public void bulkLoad(String directory)
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 2f6deb5..ea54f9d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -70,6 +70,16 @@ public class StreamPlan
/**
* Request data in {@code keyspace} and {@code ranges} from specific node.
*
+ * Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
+ * and _remote_ (source) range transientmess, which is encoded by splitting ranges into full and transient.
+ *
+ * At the other end the distinction between full and transient is ignored it just used the transient status
+ * of the Replica objects we send to determine what to send. The real reason we have this split down to
+ * StreamRequest is that on completion StreamRequest is used to write to the system table tracking
+ * what has already been streamed. At that point since we only have the local Replica instances so we don't
+ * know what we got from the remote. We preserve that here by splitting based on the remotes transient
+ * status.
+ *
* @param from endpoint address to fetch data from.
* @param keyspace name of keyspace
* @param fullRanges ranges to fetch that from provides the full version of
@@ -94,10 +104,9 @@ public class StreamPlan
public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, String... columnFamilies)
{
//It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
- assert all(fullRanges, Replica::isLocal) || all(fullRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) :
- fullRanges.toString();
- assert all(transientRanges, Replica::isLocal) || all(transientRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) :
- transientRanges.toString();
+ assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
+ assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
+
StreamSession session = coordinator.getOrCreateNextSession(from);
session.addStreamRequest(keyspace, fullRanges, transientRanges, Arrays.asList(columnFamilies));
return this;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index ec80772..d7d0836 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -300,6 +300,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
/**
* Request data fetch task to this session.
*
+ * Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
+ * and _remote_ (source) range transientmess, which is encoded by splitting ranges into full and transient.
+ *
* @param keyspace Requesting keyspace
* @param fullRanges Ranges to retrieve data that will return full data from the source
* @param transientRanges Ranges to retrieve data that will return transient data from the source
@@ -308,8 +311,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
public void addStreamRequest(String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, Collection<String> columnFamilies)
{
//It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
- assert all(fullRanges, Replica::isLocal) || all(fullRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : fullRanges.toString();
- assert all(transientRanges, Replica::isLocal) || all(transientRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : transientRanges.toString();
+ assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
+ assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
+
requests.add(new StreamRequest(keyspace, fullRanges, transientRanges, columnFamilies));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org