You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/09/27 16:04:46 UTC
cassandra git commit: Consolidate batch write code
Repository: cassandra
Updated Branches:
refs/heads/trunk 914c66685 -> 29f83b888
Consolidate batch write code
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-14742
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29f83b88
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29f83b88
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29f83b88
Branch: refs/heads/trunk
Commit: 29f83b88821c4792087df19d829ac87b5c06e9e6
Parents: 914c666
Author: Alex Petrov <ol...@gmail.com>
Authored: Mon Sep 17 15:13:05 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Thu Sep 27 18:03:48 2018 +0200
----------------------------------------------------------------------
.../cassandra/batchlog/BatchlogManager.java | 89 -------------
.../cassandra/config/DatabaseDescriptor.java | 2 +-
.../db/CounterMutationVerbHandler.java | 2 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 4 +-
.../org/apache/cassandra/db/view/ViewUtils.java | 6 +-
.../org/apache/cassandra/dht/Datacenters.java | 2 +-
.../cassandra/dht/RangeFetchMapCalculator.java | 2 +-
.../org/apache/cassandra/dht/RangeStreamer.java | 8 +-
.../cassandra/locator/EndpointSnitchInfo.java | 4 +-
.../cassandra/locator/IEndpointSnitch.java | 22 ++-
.../apache/cassandra/locator/InOurDcTester.java | 2 +-
.../org/apache/cassandra/locator/Replica.java | 2 +-
.../apache/cassandra/locator/ReplicaPlans.java | 133 ++++++++++++++++++-
.../cassandra/locator/SystemReplicas.java | 5 +-
.../apache/cassandra/net/MessagingService.java | 6 +-
.../cassandra/service/RangeRelocator.java | 3 +-
.../apache/cassandra/service/StartupChecks.java | 4 +-
.../apache/cassandra/service/StorageProxy.java | 85 +++++-------
.../cassandra/service/StorageService.java | 8 +-
.../service/reads/AbstractReadExecutor.java | 2 +-
.../reads/ShortReadPartitionsProtection.java | 2 +-
.../apache/cassandra/streaming/StreamPlan.java | 4 +-
.../cassandra/streaming/StreamSession.java | 4 +-
.../batchlog/BatchlogEndpointFilterTest.java | 65 ++++-----
24 files changed, 240 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 77f725c..91129ed 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -527,93 +527,4 @@ public class BatchlogManager implements BatchlogManagerMBean
}
}
}
-
- public static class EndpointFilter
- {
- private final String localRack;
- private final Multimap<String, InetAddressAndPort> endpoints;
-
- public EndpointFilter(String localRack, Multimap<String, InetAddressAndPort> endpoints)
- {
- this.localRack = localRack;
- this.endpoints = endpoints;
- }
-
- /**
- * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
- */
- public Collection<InetAddressAndPort> filter()
- {
- // special case for single-node data centers
- if (endpoints.values().size() == 1)
- return endpoints.values();
-
- // strip out dead endpoints and localhost
- ListMultimap<String, InetAddressAndPort> validated = ArrayListMultimap.create();
- for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries())
- if (isValid(entry.getValue()))
- validated.put(entry.getKey(), entry.getValue());
-
- if (validated.size() <= 2)
- return validated.values();
-
- if (validated.size() - validated.get(localRack).size() >= 2)
- {
- // we have enough endpoints in other racks
- validated.removeAll(localRack);
- }
-
- if (validated.keySet().size() == 1)
- {
- /*
- * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack)
- * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack
- * because of the preceding if block.
- */
- List<InetAddressAndPort> otherRack = Lists.newArrayList(validated.values());
- shuffle(otherRack);
- return otherRack.subList(0, 2);
- }
-
- // randomize which racks we pick from if more than 2 remaining
- Collection<String> racks;
- if (validated.keySet().size() == 2)
- {
- racks = validated.keySet();
- }
- else
- {
- racks = Lists.newArrayList(validated.keySet());
- shuffle((List<String>) racks);
- }
-
- // grab a random member of up to two racks
- List<InetAddressAndPort> result = new ArrayList<>(2);
- for (String rack : Iterables.limit(racks, 2))
- {
- List<InetAddressAndPort> rackMembers = validated.get(rack);
- result.add(rackMembers.get(getRandomInt(rackMembers.size())));
- }
-
- return result;
- }
-
- @VisibleForTesting
- protected boolean isValid(InetAddressAndPort input)
- {
- return !input.equals(FBUtilities.getBroadcastAddressAndPort()) && FailureDetector.instance.isAlive(input);
- }
-
- @VisibleForTesting
- protected int getRandomInt(int bound)
- {
- return ThreadLocalRandom.current().nextInt(bound);
- }
-
- @VisibleForTesting
- protected void shuffle(List<?> list)
- {
- Collections.shuffle(list);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 2ad9b18..dc76431 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -991,7 +991,7 @@ public class DatabaseDescriptor
snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch);
EndpointSnitchInfo.create();
- localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+ localDC = snitch.getLocalDatacenter();
localComparator = (replica1, replica2) -> {
boolean local1 = localDC.equals(snitch.getDatacenter(replica1));
boolean local2 = localDC.equals(snitch.getDatacenter(replica2));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index 95d7916..c946ea5 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -37,7 +37,7 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
final CounterMutation cm = message.payload;
logger.trace("Applying forwarded {}", cm);
- String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+ String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
// We should not wait for the result of the write in this thread,
// otherwise we could have a distributed deadlock between replicas
// running this VerbHandler (see #4578).
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 0f904ce..1b3b2a6 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -460,8 +460,8 @@ public final class SystemKeyspace
FBUtilities.getReleaseVersionString(),
QueryProcessor.CQL_VERSION.toString(),
String.valueOf(ProtocolVersion.CURRENT.asInt()),
- snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()),
- snitch.getRack(FBUtilities.getBroadcastAddressAndPort()),
+ snitch.getLocalDatacenter(),
+ snitch.getLocalRack(),
DatabaseDescriptor.getPartitioner().getClass().getName(),
DatabaseDescriptor.getRpcAddress(),
DatabaseDescriptor.getNativeTransportPort(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/db/view/ViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java
index ad10d9d..e824732 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUtils.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -63,11 +63,11 @@ public final class ViewUtils
{
AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
- String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+ String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
EndpointsForToken naturalBaseReplicas = replicationStrategy.getNaturalReplicasForToken(baseToken);
EndpointsForToken naturalViewReplicas = replicationStrategy.getNaturalReplicasForToken(viewToken);
- Optional<Replica> localReplica = Iterables.tryFind(naturalViewReplicas, Replica::isLocal).toJavaUtil();
+ Optional<Replica> localReplica = Iterables.tryFind(naturalViewReplicas, Replica::isSelf).toJavaUtil();
if (localReplica.isPresent())
return localReplica;
@@ -93,7 +93,7 @@ public final class ViewUtils
int baseIdx = -1;
for (int i=0; i<baseReplicas.size(); i++)
{
- if (baseReplicas.get(i).isLocal())
+ if (baseReplicas.get(i).isSelf())
{
baseIdx = i;
break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/dht/Datacenters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Datacenters.java b/src/java/org/apache/cassandra/dht/Datacenters.java
index 26ae2e6..9695a09 100644
--- a/src/java/org/apache/cassandra/dht/Datacenters.java
+++ b/src/java/org/apache/cassandra/dht/Datacenters.java
@@ -32,7 +32,7 @@ public class Datacenters
private static class DCHandle
{
- private static final String thisDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+ private static final String thisDc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
}
public static String thisDatacenter()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
index 63265b7..2a2de01 100644
--- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
+++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
@@ -357,7 +357,7 @@ public class RangeFetchMapCalculator
{
sourceFound = true;
// if we pass filters, it means that we don't filter away localhost and we can count it as a source:
- if (replica.isLocal())
+ if (replica.isSelf())
continue; // but don't add localhost to the graph to avoid streaming locally
final Vertex endpointVertex = new EndpointVertex(replica.endpoint());
capacityGraph.insertVertex(rangeVertex);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index f46d665..b50a4e2 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -104,7 +104,7 @@ public class RangeStreamer
{
Preconditions.checkNotNull(local);
Preconditions.checkNotNull(remote);
- assert local.isLocal() && !remote.isLocal();
+ assert local.isSelf() && !remote.isSelf();
this.local = local;
this.remote = remote;
}
@@ -203,7 +203,7 @@ public class RangeStreamer
@Override
public boolean apply(Replica replica)
{
- return !replica.isLocal();
+ return !replica.isSelf();
}
@Override
@@ -553,8 +553,8 @@ public class RangeStreamer
{
for (Replica source : e.getValue())
{
- assert e.getKey().isLocal();
- assert !source.isLocal();
+ assert (e.getKey()).isSelf();
+ assert !source.isSelf();
workMap.put(source.endpoint(), new FetchReplica(e.getKey(), source));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
index c06d765..da90a79 100644
--- a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
+++ b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
@@ -53,12 +53,12 @@ public class EndpointSnitchInfo implements EndpointSnitchInfoMBean
public String getDatacenter()
{
- return DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+ return DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
}
public String getRack()
{
- return DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+ return DatabaseDescriptor.getEndpointSnitch().getLocalRack();
}
public String getSnitchName()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
index b7797b0..381a642 100644
--- a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.locator;
import java.util.Set;
+import org.apache.cassandra.utils.FBUtilities;
+
/**
* This interface helps determine location of node in the datacenter relative to another node.
* Given a node A and another node B it can tell if A and B are on the same rack or in the same
@@ -28,15 +30,31 @@ import java.util.Set;
public interface IEndpointSnitch
{
/**
- * returns a String representing the rack this endpoint belongs to
+ * returns a String representing the rack the given endpoint belongs to
*/
public String getRack(InetAddressAndPort endpoint);
/**
- * returns a String representing the datacenter this endpoint belongs to
+ * returns a String representing the rack the current endpoint belongs to
+ */
+ default public String getLocalRack()
+ {
+ return getRack(FBUtilities.getBroadcastAddressAndPort());
+ }
+
+ /**
+ * returns a String representing the datacenter the given endpoint belongs to
*/
public String getDatacenter(InetAddressAndPort endpoint);
+ /**
+ * returns a String representing the datacenter the current endpoint belongs to
+ */
+ default public String getLocalDatacenter()
+ {
+ return getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+ }
+
default public String getDatacenter(Replica replica)
{
return getDatacenter(replica.endpoint());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/InOurDcTester.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/InOurDcTester.java b/src/java/org/apache/cassandra/locator/InOurDcTester.java
index 23a8c13..514c7ef 100644
--- a/src/java/org/apache/cassandra/locator/InOurDcTester.java
+++ b/src/java/org/apache/cassandra/locator/InOurDcTester.java
@@ -43,7 +43,7 @@ public class InOurDcTester
// this final clause checks if somehow the snitch/localDc have got out of whack;
// presently, this is possible but very unlikely, but this check will also help
// resolve races on these global fields as well
- || !dc.equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()));
+ || !dc.equals(snitch.getLocalDatacenter());
}
private static final class ReplicaTester extends InOurDcTester implements Predicate<Replica>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/Replica.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Replica.java b/src/java/org/apache/cassandra/locator/Replica.java
index 37b6050..c884f13 100644
--- a/src/java/org/apache/cassandra/locator/Replica.java
+++ b/src/java/org/apache/cassandra/locator/Replica.java
@@ -100,7 +100,7 @@ public final class Replica implements Comparable<Replica>
return endpoint;
}
- public boolean isLocal()
+ public boolean isSelf()
{
return endpoint.equals(FBUtilities.getBroadcastAddressAndPort());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/ReplicaPlans.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index 3d56a73..87f3c09 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -21,7 +21,13 @@ package org.apache.cassandra.locator;
import com.carrotsearch.hppc.ObjectIntOpenHashMap;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
@@ -31,12 +37,23 @@ import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+import org.apache.cassandra.utils.FBUtilities;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Predicate;
import static com.google.common.collect.Iterables.any;
@@ -173,26 +190,132 @@ public class ReplicaPlans
return forSingleReplicaWrite(keyspace, token, replica);
}
+ public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+ {
+ Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+ Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+ Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+ ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+ EndpointsForToken.of(token, localSystemReplica),
+ EndpointsForToken.empty(token)
+ );
+
+ return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+ }
+
/**
* Requires that the provided endpoints are alive. Converts them to their relevant system replicas.
* Note that the liveAndDown collection and live are equal to the provided endpoints.
*
- * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
- * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
+ * @param isAny if batch consistency level is ANY, in which case a local node will be picked
*/
- public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+ public static ReplicaPlan.ForTokenWrite forBatchlogWrite(boolean isAny) throws UnavailableException
{
// The one case where we write neither for a range nor for a token, but multiple mutations to many tokens
Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+ TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks()
+ .get(snitch.getLocalDatacenter()));
+ // Replicas are picked manually:
+ // - replicas should be alive according to the failure detector
+ // - replicas should be in the local datacenter
+ // - choose min(2, number of qualifying candidates above)
+ // - allow the local node to be the only replica only if it's a single-node DC
+ Collection<InetAddressAndPort> chosenEndpoints = filterBatchlogEndpoints(snitch.getLocalRack(), localEndpoints);
+
+ if (chosenEndpoints.isEmpty() && isAny)
+ chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
+
ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
- SystemReplicas.getSystemReplicas(endpoints).forToken(token),
+ SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token),
EndpointsForToken.empty(token)
);
+
+ // Batchlog is hosted by either one node or two nodes from different racks.
ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+ Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+
// assume that we have already been given live endpoints, and skip applying the failure detector
- return forWrite(keyspace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
+ return forWrite(systemKeypsace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
+ }
+
+ private static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack,
+ Multimap<String, InetAddressAndPort> endpoints)
+ {
+ return filterBatchlogEndpoints(localRack,
+ endpoints,
+ Collections::shuffle,
+ FailureDetector.isEndpointAlive,
+ ThreadLocalRandom.current()::nextInt);
+ }
+
+ // Collect a list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
+ @VisibleForTesting
+ public static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack,
+ Multimap<String, InetAddressAndPort> endpoints,
+ Consumer<List<?>> shuffle,
+ Predicate<InetAddressAndPort> isAlive,
+ Function<Integer, Integer> indexPicker)
+ {
+ // special case for single-node data centers
+ if (endpoints.values().size() == 1)
+ return endpoints.values();
+
+ // strip out dead endpoints and localhost
+ ListMultimap<String, InetAddressAndPort> validated = ArrayListMultimap.create();
+ for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries())
+ {
+ InetAddressAndPort addr = entry.getValue();
+ if (!addr.equals(FBUtilities.getBroadcastAddressAndPort()) && isAlive.test(addr))
+ validated.put(entry.getKey(), entry.getValue());
+ }
+
+ if (validated.size() <= 2)
+ return validated.values();
+
+ if (validated.size() - validated.get(localRack).size() >= 2)
+ {
+ // we have enough endpoints in other racks
+ validated.removeAll(localRack);
+ }
+
+ if (validated.keySet().size() == 1)
+ {
+ /*
+ * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack)
+ * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack
+ * because of the preceding if block.
+ */
+ List<InetAddressAndPort> otherRack = Lists.newArrayList(validated.values());
+ shuffle.accept(otherRack);
+ return otherRack.subList(0, 2);
+ }
+
+ // randomize which racks we pick from if more than 2 remaining
+ Collection<String> racks;
+ if (validated.keySet().size() == 2)
+ {
+ racks = validated.keySet();
+ }
+ else
+ {
+ racks = Lists.newArrayList(validated.keySet());
+ shuffle.accept((List<?>) racks);
+ }
+
+ // grab a random member of up to two racks
+ List<InetAddressAndPort> result = new ArrayList<>(2);
+ for (String rack : Iterables.limit(racks, 2))
+ {
+ List<InetAddressAndPort> rackMembers = validated.get(rack);
+ result.add(rackMembers.get(indexPicker.apply(rackMembers.size())));
+ }
+
+ return result;
}
public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/SystemReplicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SystemReplicas.java b/src/java/org/apache/cassandra/locator/SystemReplicas.java
index 0d1fc8d..456bae5 100644
--- a/src/java/org/apache/cassandra/locator/SystemReplicas.java
+++ b/src/java/org/apache/cassandra/locator/SystemReplicas.java
@@ -41,8 +41,6 @@ public class SystemReplicas
/**
* There are a few places where a system function borrows write path functionality, but doesn't otherwise
* fit into normal replication strategies (ie: hints and batchlog). So here we provide a replica instance
- * @param endpoint
- * @return
*/
public static Replica getSystemReplica(InetAddressAndPort endpoint)
{
@@ -51,6 +49,9 @@ public class SystemReplicas
public static EndpointsForRange getSystemReplicas(Collection<InetAddressAndPort> endpoints)
{
+ if (endpoints.isEmpty())
+ return EndpointsForRange.empty(FULL_RANGE);
+
return EndpointsForRange.copyOf(Collections2.transform(endpoints, SystemReplicas::getSystemReplica));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index bd290a1..c6e8496 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -1696,13 +1696,13 @@ public final class MessagingService implements MessagingServiceMBean
case all:
break;
case dc:
- if (snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort())))
+ if (snitch.getDatacenter(address).equals(snitch.getLocalDatacenter()))
return false;
break;
case rack:
// for rack then check if the DC's are the same.
- if (snitch.getRack(address).equals(snitch.getRack(FBUtilities.getBroadcastAddressAndPort()))
- && snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort())))
+ if (snitch.getRack(address).equals(snitch.getLocalRack())
+ && snitch.getDatacenter(address).equals(snitch.getLocalDatacenter()))
return false;
break;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/RangeRelocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeRelocator.java b/src/java/org/apache/cassandra/service/RangeRelocator.java
index f2af3db..839a34c 100644
--- a/src/java/org/apache/cassandra/service/RangeRelocator.java
+++ b/src/java/org/apache/cassandra/service/RangeRelocator.java
@@ -185,8 +185,7 @@ public class RangeRelocator
//In the single node token move there is nothing to do and Range subtraction is broken
//so it's easier to just identify this case up front.
- if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()
-)).size() > 1)
+ if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter()).size() > 1)
{
// getting collection of the currently used ranges by this keyspace
RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/StartupChecks.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java
index 224fd5e..8814281 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -437,7 +437,7 @@ public class StartupChecks
String storedDc = SystemKeyspace.getDatacenter();
if (storedDc != null)
{
- String currentDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+ String currentDc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
if (!storedDc.equals(currentDc))
{
String formatMessage = "Cannot start node if snitch's data center (%s) differs from previous data center (%s). " +
@@ -459,7 +459,7 @@ public class StartupChecks
String storedRack = SystemKeyspace.getRack();
if (storedRack != null)
{
- String currentRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+ String currentRack = DatabaseDescriptor.getEndpointSnitch().getLocalRack();
if (!storedRack.equals(currentRack))
{
String formatMessage = "Cannot start node if snitch's rack (%s) differs from previous rack (%s). " +
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c6315ff..b3adc47 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -80,6 +80,8 @@ import org.apache.cassandra.triggers.TriggerExecutor;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.AbstractIterator;
+import static org.apache.cassandra.service.BatchlogResponseHandler.BatchlogCleanup;
+
public class StorageProxy implements StorageProxyMBean
{
public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
@@ -446,7 +448,7 @@ public class StorageProxy implements StorageProxyMBean
MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer);
for (Replica replica: replicaPlan.contacts())
{
- if (replica.isLocal())
+ if (replica.isSelf())
{
StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PREPARE)).execute(new Runnable()
{
@@ -484,7 +486,7 @@ public class StorageProxy implements StorageProxyMBean
MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer);
for (Replica replica : replicaPlan.contacts())
{
- if (replica.isLocal())
+ if (replica.isSelf())
{
StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PROPOSE)).execute(new Runnable()
{
@@ -549,7 +551,7 @@ public class StorageProxy implements StorageProxyMBean
{
if (shouldBlock)
{
- if (replica.isLocal())
+ if (replica.isSelf())
commitPaxosLocal(replica, message, responseHandler);
else
MessagingService.instance().sendWriteRR(message, replica, responseHandler, allowHints && shouldHint(replica));
@@ -623,7 +625,7 @@ public class StorageProxy implements StorageProxyMBean
throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException
{
Tracing.trace("Determining replicas for mutation");
- final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+ final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
long startTime = System.nanoTime();
@@ -755,7 +757,7 @@ public class StorageProxy implements StorageProxyMBean
throws UnavailableException, OverloadedException, WriteTimeoutException
{
Tracing.trace("Determining replicas for mutation");
- final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+ final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
long startTime = System.nanoTime();
@@ -780,8 +782,9 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
//Since the base -> view replication is 1:1 we only need to store the BL locally
- final Collection<InetAddressAndPort> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
- BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+ ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forLocalBatchlogWrite();
+ BatchlogCleanup cleanup = new BatchlogCleanup(mutations.size(),
+ () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID));
// add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
for (Mutation mutation : mutations)
@@ -806,7 +809,7 @@ public class StorageProxy implements StorageProxyMBean
// When local node is the endpoint we can just apply the mutation locally,
// unless there are pending endpoints, in which case we want to do an ordinary
// write so the view mutation is sent to the pending endpoint
- if (pairedEndpoint.get().isLocal() && StorageService.instance.isJoined()
+ if (pairedEndpoint.get().isSelf() && StorageService.instance.isJoined()
&& pendingReplicas.isEmpty())
{
try
@@ -899,7 +902,6 @@ public class StorageProxy implements StorageProxyMBean
long startTime = System.nanoTime();
List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size());
- String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
if (mutations.stream().anyMatch(mutation -> Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy().hasTransientReplicas()))
throw new AssertionError("Logged batches are unsupported with transient replication");
@@ -920,10 +922,11 @@ public class StorageProxy implements StorageProxyMBean
batchConsistencyLevel = consistency_level;
}
- final Collection<InetAddressAndPort> batchlogEndpoints = getBatchlogReplicas(localDataCenter, batchConsistencyLevel);
+ ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(batchConsistencyLevel == ConsistencyLevel.ANY);
+
final UUID batchUUID = UUIDGen.getTimeUUID();
- BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
- () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+ BatchlogCleanup cleanup = new BatchlogCleanup(mutations.size(),
+ () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID));
// add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
for (Mutation mutation : mutations)
@@ -939,10 +942,10 @@ public class StorageProxy implements StorageProxyMBean
}
// write to the batchlog
- syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID, queryStartNanoTime);
+ syncWriteToBatchlog(mutations, replicaPlan, batchUUID, queryStartNanoTime);
// now actually perform the writes and wait for them to complete
- syncWriteBatchedMutations(wrappers, localDataCenter, Stage.MUTATION);
+ syncWriteBatchedMutations(wrappers, Stage.MUTATION);
}
catch (UnavailableException e)
{
@@ -998,11 +1001,9 @@ public class StorageProxy implements StorageProxyMBean
}
}
- private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddressAndPort> endpoints, UUID uuid, long queryStartNanoTime)
+ private static void syncWriteToBatchlog(Collection<Mutation> mutations, ReplicaPlan.ForTokenWrite replicaPlan, UUID uuid, long queryStartNanoTime)
throws WriteTimeoutException, WriteFailureException
{
- Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
- ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(systemKeypsace, endpoints);
WriteResponseHandler<?> handler = new WriteResponseHandler(replicaPlan,
WriteType.BATCH_LOG,
queryStartNanoTime);
@@ -1013,7 +1014,7 @@ public class StorageProxy implements StorageProxyMBean
{
logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size());
- if (replica.isLocal())
+ if (replica.isSelf())
performLocally(Stage.MUTATION, replica, Optional.empty(), () -> BatchlogManager.store(batch), handler);
else
MessagingService.instance().sendRR(message, replica.endpoint(), handler);
@@ -1021,18 +1022,18 @@ public class StorageProxy implements StorageProxyMBean
handler.get();
}
- private static void asyncRemoveFromBatchlog(Collection<InetAddressAndPort> endpoints, UUID uuid)
+ private static void asyncRemoveFromBatchlog(ReplicaPlan.ForTokenWrite replicaPlan, UUID uuid)
{
MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer);
- for (InetAddressAndPort target : endpoints)
+ for (Replica target : replicaPlan.contacts())
{
if (logger.isTraceEnabled())
logger.trace("Sending batchlog remove request {} to {}", uuid, target);
- if (target.equals(FBUtilities.getBroadcastAddressAndPort()))
- performLocally(Stage.MUTATION, SystemReplicas.getSystemReplica(target), () -> BatchlogManager.remove(uuid));
+ if (target.isSelf())
+ performLocally(Stage.MUTATION, target, () -> BatchlogManager.remove(uuid));
else
- MessagingService.instance().sendOneWay(message, target);
+ MessagingService.instance().sendOneWay(message, target.endpoint());
}
}
@@ -1054,9 +1055,11 @@ public class StorageProxy implements StorageProxyMBean
}
}
- private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage)
+ private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, Stage stage)
throws WriteTimeoutException, OverloadedException
{
+ String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
+
for (WriteResponseHandlerWrapper wrapper : wrappers)
{
EndpointsForToken sendTo = wrapper.handler.replicaPlan.liveAndDown();
@@ -1162,32 +1165,6 @@ public class StorageProxy implements StorageProxyMBean
}
}
- /*
- * Replicas are picked manually:
- * - replicas should be alive according to the failure detector
- * - replicas should be in the local datacenter
- * - choose min(2, number of qualifying candiates above)
- * - allow the local node to be the only replica only if it's a single-node DC
- */
- private static Collection<InetAddressAndPort> getBatchlogReplicas(String localDataCenter, ConsistencyLevel consistencyLevel)
- throws UnavailableException
- {
- TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
- Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
- String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
-
- Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
- if (chosenEndpoints.isEmpty())
- {
- if (consistencyLevel == ConsistencyLevel.ANY)
- return Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
-
- throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0);
- }
-
- return chosenEndpoints;
- }
-
/**
* Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node
* is not available.
@@ -1231,7 +1208,7 @@ public class StorageProxy implements StorageProxyMBean
if (plan.isAlive(destination))
{
- if (destination.isLocal())
+ if (destination.isSelf())
{
insertLocal = true;
localReplica = destination;
@@ -1424,7 +1401,7 @@ public class StorageProxy implements StorageProxyMBean
{
Replica replica = findSuitableReplica(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency());
- if (replica.isLocal())
+ if (replica.isSelf())
{
return applyCounterMutationOnCoordinator(cm, localDataCenter, queryStartNanoTime);
}
@@ -2096,7 +2073,7 @@ public class StorageProxy implements StorageProxyMBean
command.trackRepairedStatus();
}
- if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isLocal())
+ if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isSelf())
{
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
}
@@ -2366,7 +2343,7 @@ public class StorageProxy implements StorageProxyMBean
{
if (!DatabaseDescriptor.hintedHandoffEnabled())
return false;
- if (replica.isTransient() || replica.isLocal())
+ if (replica.isTransient() || replica.isSelf())
return false;
Set<String> disabledDCs = DatabaseDescriptor.hintedHandoffDisabledDCs();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 391598c..caa732a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1060,8 +1060,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void gossipSnitchInfo()
{
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- String dc = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
- String rack = snitch.getRack(FBUtilities.getBroadcastAddressAndPort());
+ String dc = snitch.getLocalDatacenter();
+ String rack = snitch.getLocalRack();
Gossiper.instance.addLocalApplicationState(ApplicationState.DC, StorageService.instance.valueFactory.datacenter(dc));
Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.instance.valueFactory.rack(rack));
}
@@ -1835,7 +1835,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private boolean isLocalDC(InetAddressAndPort targetHost)
{
String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost);
- String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+ String localDC = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
return remoteDC.equals(localDC);
}
@@ -4073,7 +4073,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
PendingRangeCalculatorService.instance.blockUntilFinished();
- String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+ String dc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
if (operationMode != Mode.LEAVING) // If we're already decommissioning there is no point checking RF/pending ranges
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 6881a2f..8d0f14c 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -138,7 +138,7 @@ public abstract class AbstractReadExecutor
for (Replica replica: replicas)
{
InetAddressAndPort endpoint = replica.endpoint();
- if (replica.isLocal())
+ if (replica.isSelf())
{
hasLocalEndpoint = true;
continue;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index b16d105..2e4440f 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@ -181,7 +181,7 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime);
ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime);
- if (source.isLocal())
+ if (source.isSelf())
StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
else
MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source.endpoint(), handler);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index ea54f9d..3fcabd0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -104,8 +104,8 @@ public class StreamPlan
public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, String... columnFamilies)
{
//It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
- assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
- assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
+ assert all(fullRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
+ assert all(transientRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
StreamSession session = coordinator.getOrCreateNextSession(from);
session.addStreamRequest(keyspace, fullRanges, transientRanges, Arrays.asList(columnFamilies));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 80fcebb..08a1b07 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -311,8 +311,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
public void addStreamRequest(String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, Collection<String> columnFamilies)
{
//It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
- assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
- assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
+ assert all(fullRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
+ assert all(transientRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
requests.add(new StreamRequest(keyspace, fullRanges, transientRanges, columnFamilies));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
index 41564d9..c2b9fc9 100644
--- a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
@@ -28,9 +28,12 @@ import org.junit.Test;
import org.junit.matchers.JUnitMatchers;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
public class BatchlogEndpointFilterTest
{
@@ -47,10 +50,10 @@ public class BatchlogEndpointFilterTest
.put("2", InetAddressAndPort.getByName("2"))
.put("2", InetAddressAndPort.getByName("22"))
.build();
- Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
assertThat(result.size(), is(2));
- assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("11")));
- assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("22")));
+ assertTrue(result.contains(InetAddressAndPort.getByName("11")));
+ assertTrue(result.contains(InetAddressAndPort.getByName("22")));
}
@Test
@@ -61,10 +64,10 @@ public class BatchlogEndpointFilterTest
.put(LOCAL, InetAddressAndPort.getByName("00"))
.put("1", InetAddressAndPort.getByName("1"))
.build();
- Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
assertThat(result.size(), is(2));
- assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("1")));
- assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("0")));
+ assertTrue(result.contains(InetAddressAndPort.getByName("1")));
+ assertTrue(result.contains(InetAddressAndPort.getByName("0")));
}
@Test
@@ -73,9 +76,9 @@ public class BatchlogEndpointFilterTest
Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
.put(LOCAL, InetAddressAndPort.getByName("0"))
.build();
- Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
assertThat(result.size(), is(1));
- assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("0")));
+ assertTrue(result.contains(InetAddressAndPort.getByName("0")));
}
@Test
@@ -88,12 +91,12 @@ public class BatchlogEndpointFilterTest
.put("1", InetAddressAndPort.getByName("11"))
.put("1", InetAddressAndPort.getByName("111"))
.build();
- Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
// result should be the last two non-local replicas
// (Collections.shuffle has been replaced with Collections.reverse for testing)
assertThat(result.size(), is(2));
- assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("11")));
- assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("111")));
+ assertTrue(result.contains(InetAddressAndPort.getByName("11")));
+ assertTrue(result.contains(InetAddressAndPort.getByName("111")));
}
@Test
@@ -105,40 +108,22 @@ public class BatchlogEndpointFilterTest
.put(LOCAL, InetAddressAndPort.getByName("111"))
.put(LOCAL, InetAddressAndPort.getByName("1111"))
.build();
- Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
// result should be the last two non-local replicas
// (Collections.shuffle has been replaced with Collections.reverse for testing)
assertThat(result.size(), is(2));
- assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("111")));
- assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("1111")));
+ assertTrue(result.contains(InetAddressAndPort.getByName("111")));
+ assertTrue(result.contains(InetAddressAndPort.getByName("1111")));
}
- private static class TestEndpointFilter extends BatchlogManager.EndpointFilter
+ private Collection<InetAddressAndPort> filterBatchlogEndpoints(Multimap<String, InetAddressAndPort> endpoints)
{
- TestEndpointFilter(String localRack, Multimap<String, InetAddressAndPort> endpoints)
- {
- super(localRack, endpoints);
- }
-
- @Override
- protected boolean isValid(InetAddressAndPort input)
- {
- // We will use always alive non-localhost endpoints
- return true;
- }
-
- @Override
- protected int getRandomInt(int bound)
- {
- // We don't need random behavior here
- return bound - 1;
- }
-
- @Override
- protected void shuffle(List<?> list)
- {
- // We don't need random behavior here
- Collections.reverse(list);
- }
+ return ReplicaPlans.filterBatchlogEndpoints(LOCAL, endpoints,
+ // Reverse instead of shuffle
+ Collections::reverse,
+ // Always alive
+ (addr) -> true,
+ // Always pick the last
+ (size) -> size - 1);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org