Posted to commits@cassandra.apache.org by if...@apache.org on 2018/09/27 16:04:46 UTC

cassandra git commit: Consolidate batch write code

Repository: cassandra
Updated Branches:
  refs/heads/trunk 914c66685 -> 29f83b888


Consolidate batch write code

Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-14742

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29f83b88
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29f83b88
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29f83b88

Branch: refs/heads/trunk
Commit: 29f83b88821c4792087df19d829ac87b5c06e9e6
Parents: 914c666
Author: Alex Petrov <ol...@gmail.com>
Authored: Mon Sep 17 15:13:05 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Thu Sep 27 18:03:48 2018 +0200

----------------------------------------------------------------------
 .../cassandra/batchlog/BatchlogManager.java     |  89 -------------
 .../cassandra/config/DatabaseDescriptor.java    |   2 +-
 .../db/CounterMutationVerbHandler.java          |   2 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   4 +-
 .../org/apache/cassandra/db/view/ViewUtils.java |   6 +-
 .../org/apache/cassandra/dht/Datacenters.java   |   2 +-
 .../cassandra/dht/RangeFetchMapCalculator.java  |   2 +-
 .../org/apache/cassandra/dht/RangeStreamer.java |   8 +-
 .../cassandra/locator/EndpointSnitchInfo.java   |   4 +-
 .../cassandra/locator/IEndpointSnitch.java      |  22 ++-
 .../apache/cassandra/locator/InOurDcTester.java |   2 +-
 .../org/apache/cassandra/locator/Replica.java   |   2 +-
 .../apache/cassandra/locator/ReplicaPlans.java  | 133 ++++++++++++++++++-
 .../cassandra/locator/SystemReplicas.java       |   5 +-
 .../apache/cassandra/net/MessagingService.java  |   6 +-
 .../cassandra/service/RangeRelocator.java       |   3 +-
 .../apache/cassandra/service/StartupChecks.java |   4 +-
 .../apache/cassandra/service/StorageProxy.java  |  85 +++++-------
 .../cassandra/service/StorageService.java       |   8 +-
 .../service/reads/AbstractReadExecutor.java     |   2 +-
 .../reads/ShortReadPartitionsProtection.java    |   2 +-
 .../apache/cassandra/streaming/StreamPlan.java  |   4 +-
 .../cassandra/streaming/StreamSession.java      |   4 +-
 .../batchlog/BatchlogEndpointFilterTest.java    |  65 ++++-----
 24 files changed, 240 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
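
The batchlog write path in StorageProxy now obtains a ReplicaPlan.ForTokenWrite from ReplicaPlans instead of computing a raw endpoint collection itself. A minimal sketch of the consolidated flow, assuming the surrounding StorageProxy context from the diff below (mutations, wrappers, batchConsistencyLevel, queryStartNanoTime):

    // Endpoint filtering and batchlog consistency handling now live in ReplicaPlans.
    ReplicaPlan.ForTokenWrite replicaPlan =
        ReplicaPlans.forBatchlogWrite(batchConsistencyLevel == ConsistencyLevel.ANY);

    UUID batchUUID = UUIDGen.getTimeUUID();
    BatchlogCleanup cleanup =
        new BatchlogCleanup(mutations.size(), () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID));

    // Store the batch on the chosen batchlog replicas, then apply the mutations.
    syncWriteToBatchlog(mutations, replicaPlan, batchUUID, queryStartNanoTime);
    syncWriteBatchedMutations(wrappers, Stage.MUTATION);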


http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 77f725c..91129ed 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -527,93 +527,4 @@ public class BatchlogManager implements BatchlogManagerMBean
             }
         }
     }
-
-    public static class EndpointFilter
-    {
-        private final String localRack;
-        private final Multimap<String, InetAddressAndPort> endpoints;
-
-        public EndpointFilter(String localRack, Multimap<String, InetAddressAndPort> endpoints)
-        {
-            this.localRack = localRack;
-            this.endpoints = endpoints;
-        }
-
-        /**
-         * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
-         */
-        public Collection<InetAddressAndPort> filter()
-        {
-            // special case for single-node data centers
-            if (endpoints.values().size() == 1)
-                return endpoints.values();
-
-            // strip out dead endpoints and localhost
-            ListMultimap<String, InetAddressAndPort> validated = ArrayListMultimap.create();
-            for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries())
-                if (isValid(entry.getValue()))
-                    validated.put(entry.getKey(), entry.getValue());
-
-            if (validated.size() <= 2)
-                return validated.values();
-
-            if (validated.size() - validated.get(localRack).size() >= 2)
-            {
-                // we have enough endpoints in other racks
-                validated.removeAll(localRack);
-            }
-
-            if (validated.keySet().size() == 1)
-            {
-                /*
-                 * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack)
-                 * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack
-                 * because of the preceding if block.
-                 */
-                List<InetAddressAndPort> otherRack = Lists.newArrayList(validated.values());
-                shuffle(otherRack);
-                return otherRack.subList(0, 2);
-            }
-
-            // randomize which racks we pick from if more than 2 remaining
-            Collection<String> racks;
-            if (validated.keySet().size() == 2)
-            {
-                racks = validated.keySet();
-            }
-            else
-            {
-                racks = Lists.newArrayList(validated.keySet());
-                shuffle((List<String>) racks);
-            }
-
-            // grab a random member of up to two racks
-            List<InetAddressAndPort> result = new ArrayList<>(2);
-            for (String rack : Iterables.limit(racks, 2))
-            {
-                List<InetAddressAndPort> rackMembers = validated.get(rack);
-                result.add(rackMembers.get(getRandomInt(rackMembers.size())));
-            }
-
-            return result;
-        }
-
-        @VisibleForTesting
-        protected boolean isValid(InetAddressAndPort input)
-        {
-            return !input.equals(FBUtilities.getBroadcastAddressAndPort()) && FailureDetector.instance.isAlive(input);
-        }
-
-        @VisibleForTesting
-        protected int getRandomInt(int bound)
-        {
-            return ThreadLocalRandom.current().nextInt(bound);
-        }
-
-        @VisibleForTesting
-        protected void shuffle(List<?> list)
-        {
-            Collections.shuffle(list);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 2ad9b18..dc76431 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -991,7 +991,7 @@ public class DatabaseDescriptor
         snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch);
         EndpointSnitchInfo.create();
 
-        localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        localDC = snitch.getLocalDatacenter();
         localComparator = (replica1, replica2) -> {
             boolean local1 = localDC.equals(snitch.getDatacenter(replica1));
             boolean local2 = localDC.equals(snitch.getDatacenter(replica2));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index 95d7916..c946ea5 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -37,7 +37,7 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
         final CounterMutation cm = message.payload;
         logger.trace("Applying forwarded {}", cm);
 
-        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
         // We should not wait for the result of the write in this thread,
         // otherwise we could have a distributed deadlock between replicas
         // running this VerbHandler (see #4578).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 0f904ce..1b3b2a6 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -460,8 +460,8 @@ public final class SystemKeyspace
                             FBUtilities.getReleaseVersionString(),
                             QueryProcessor.CQL_VERSION.toString(),
                             String.valueOf(ProtocolVersion.CURRENT.asInt()),
-                            snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()),
-                            snitch.getRack(FBUtilities.getBroadcastAddressAndPort()),
+                            snitch.getLocalDatacenter(),
+                            snitch.getLocalRack(),
                             DatabaseDescriptor.getPartitioner().getClass().getName(),
                             DatabaseDescriptor.getRpcAddress(),
                             DatabaseDescriptor.getNativeTransportPort(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/db/view/ViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java
index ad10d9d..e824732 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUtils.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -63,11 +63,11 @@ public final class ViewUtils
     {
         AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
 
-        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
         EndpointsForToken naturalBaseReplicas = replicationStrategy.getNaturalReplicasForToken(baseToken);
         EndpointsForToken naturalViewReplicas = replicationStrategy.getNaturalReplicasForToken(viewToken);
 
-        Optional<Replica> localReplica = Iterables.tryFind(naturalViewReplicas, Replica::isLocal).toJavaUtil();
+        Optional<Replica> localReplica = Iterables.tryFind(naturalViewReplicas, Replica::isSelf).toJavaUtil();
         if (localReplica.isPresent())
             return localReplica;
 
@@ -93,7 +93,7 @@ public final class ViewUtils
         int baseIdx = -1;
         for (int i=0; i<baseReplicas.size(); i++)
         {
-            if (baseReplicas.get(i).isLocal())
+            if (baseReplicas.get(i).isSelf())
             {
                 baseIdx = i;
                 break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/dht/Datacenters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Datacenters.java b/src/java/org/apache/cassandra/dht/Datacenters.java
index 26ae2e6..9695a09 100644
--- a/src/java/org/apache/cassandra/dht/Datacenters.java
+++ b/src/java/org/apache/cassandra/dht/Datacenters.java
@@ -32,7 +32,7 @@ public class Datacenters
 
     private static class DCHandle
     {
-        private static final String thisDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        private static final String thisDc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
     }
 
     public static String thisDatacenter()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
index 63265b7..2a2de01 100644
--- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
+++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
@@ -357,7 +357,7 @@ public class RangeFetchMapCalculator
             {
                 sourceFound = true;
                 // if we pass filters, it means that we don't filter away localhost and we can count it as a source:
-                if (replica.isLocal())
+                if (replica.isSelf())
                     continue; // but don't add localhost to the graph to avoid streaming locally
                 final Vertex endpointVertex = new EndpointVertex(replica.endpoint());
                 capacityGraph.insertVertex(rangeVertex);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index f46d665..b50a4e2 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -104,7 +104,7 @@ public class RangeStreamer
         {
             Preconditions.checkNotNull(local);
             Preconditions.checkNotNull(remote);
-            assert local.isLocal() && !remote.isLocal();
+            assert local.isSelf() && !remote.isSelf();
             this.local = local;
             this.remote = remote;
         }
@@ -203,7 +203,7 @@ public class RangeStreamer
         @Override
         public boolean apply(Replica replica)
         {
-            return !replica.isLocal();
+            return !replica.isSelf();
         }
 
         @Override
@@ -553,8 +553,8 @@ public class RangeStreamer
         {
             for (Replica source : e.getValue())
             {
-                assert e.getKey().isLocal();
-                assert !source.isLocal();
+                assert (e.getKey()).isSelf();
+                assert !source.isSelf();
                 workMap.put(source.endpoint(), new FetchReplica(e.getKey(), source));
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
index c06d765..da90a79 100644
--- a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
+++ b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
@@ -53,12 +53,12 @@ public class EndpointSnitchInfo implements EndpointSnitchInfoMBean
 
     public String getDatacenter()
     {
-        return DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        return DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
     }
 
     public String getRack()
     {
-        return DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+        return DatabaseDescriptor.getEndpointSnitch().getLocalRack();
     }
 
     public String getSnitchName()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
index b7797b0..381a642 100644
--- a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.locator;
 
 import java.util.Set;
 
+import org.apache.cassandra.utils.FBUtilities;
+
 /**
  * This interface helps determine location of node in the datacenter relative to another node.
  * Give a node A and another node B it can tell if A and B are on the same rack or in the same
@@ -28,15 +30,31 @@ import java.util.Set;
 public interface IEndpointSnitch
 {
     /**
-     * returns a String representing the rack this endpoint belongs to
+     * returns a String representing the rack the given endpoint belongs to
      */
     public String getRack(InetAddressAndPort endpoint);
 
     /**
-     * returns a String representing the datacenter this endpoint belongs to
+     * returns a String representing the rack current endpoint belongs to
+     */
+    default public String getLocalRack()
+    {
+        return getRack(FBUtilities.getBroadcastAddressAndPort());
+    }
+
+    /**
+     * returns a String representing the datacenter the given endpoint belongs to
      */
     public String getDatacenter(InetAddressAndPort endpoint);
 
+    /**
+     * returns a String representing the datacenter current endpoint belongs to
+     */
+    default public String getLocalDatacenter()
+    {
+        return getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+    }
+
     default public String getDatacenter(Replica replica)
     {
         return getDatacenter(replica.endpoint());
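
The two default methods added here let call sites drop the explicit FBUtilities.getBroadcastAddressAndPort() lookup, which is what most of the remaining hunks in this commit do. A minimal before/after sketch, assuming a snitch obtained from DatabaseDescriptor as in those hunks:

    IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();

    // Before: every caller resolved the local endpoint itself.
    String dc   = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
    String rack = snitch.getRack(FBUtilities.getBroadcastAddressAndPort());

    // After: the snitch resolves its own broadcast address via the default methods.
    String localDc   = snitch.getLocalDatacenter();
    String localRack = snitch.getLocalRack();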

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/InOurDcTester.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/InOurDcTester.java b/src/java/org/apache/cassandra/locator/InOurDcTester.java
index 23a8c13..514c7ef 100644
--- a/src/java/org/apache/cassandra/locator/InOurDcTester.java
+++ b/src/java/org/apache/cassandra/locator/InOurDcTester.java
@@ -43,7 +43,7 @@ public class InOurDcTester
                 // this final clause checks if somehow the snitch/localDc have got out of whack;
                 // presently, this is possible but very unlikely, but this check will also help
                 // resolve races on these global fields as well
-                || !dc.equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()));
+                || !dc.equals(snitch.getLocalDatacenter());
     }
 
     private static final class ReplicaTester extends InOurDcTester implements Predicate<Replica>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/Replica.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Replica.java b/src/java/org/apache/cassandra/locator/Replica.java
index 37b6050..c884f13 100644
--- a/src/java/org/apache/cassandra/locator/Replica.java
+++ b/src/java/org/apache/cassandra/locator/Replica.java
@@ -100,7 +100,7 @@ public final class Replica implements Comparable<Replica>
         return endpoint;
     }
 
-    public boolean isLocal()
+    public boolean isSelf()
     {
         return endpoint.equals(FBUtilities.getBroadcastAddressAndPort());
     }
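
The rename from isLocal() to isSelf() separates "this exact node" from "a replica in the local datacenter". A small sketch of the distinction, assuming a Replica named replica is in scope and using the snitch accessors introduced above:

    IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();

    // True only when the replica's endpoint is this node's broadcast address.
    boolean isThisNode = replica.isSelf();

    // True for any replica in the same datacenter, including other nodes.
    boolean inLocalDc = snitch.getLocalDatacenter().equals(snitch.getDatacenter(replica));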

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/ReplicaPlans.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index 3d56a73..87f3c09 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -21,7 +21,13 @@ package org.apache.cassandra.locator;
 import com.carrotsearch.hppc.ObjectIntOpenHashMap;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
@@ -31,12 +37,23 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
 import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+import org.apache.cassandra.utils.FBUtilities;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Predicate;
 
 import static com.google.common.collect.Iterables.any;
@@ -173,26 +190,132 @@ public class ReplicaPlans
         return forSingleReplicaWrite(keyspace, token, replica);
     }
 
+    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
+    {
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                EndpointsForToken.of(token, localSystemReplica),
+                EndpointsForToken.empty(token)
+        );
+
+        return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
+    }
+
     /**
      * Requires that the provided endpoints are alive.  Converts them to their relevant system replicas.
      * Note that the liveAndDown collection and live are equal to the provided endpoints.
      *
-     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
-     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
+     * @param isAny if batch consistency level is ANY, in which case a local node will be picked
      */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(boolean isAny) throws UnavailableException
     {
         // A single case we write not for range or token, but multiple mutations to many tokens
         Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
 
+        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks()
+                                                                                          .get(snitch.getLocalDatacenter()));
+        // Replicas are picked manually:
+        //  - replicas should be alive according to the failure detector
+        //  - replicas should be in the local datacenter
+        //  - choose min(2, number of qualifying candiates above)
+        //  - allow the local node to be the only replica only if it's a single-node DC
+        Collection<InetAddressAndPort> chosenEndpoints = filterBatchlogEndpoints(snitch.getLocalRack(), localEndpoints);
+
+        if (chosenEndpoints.isEmpty() && isAny)
+            chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
+
         ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
-                SystemReplicas.getSystemReplicas(endpoints).forToken(token),
+                SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token),
                 EndpointsForToken.empty(token)
         );
+
+        // Batchlog is hosted by either one node or two nodes from different racks.
         ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
 
+        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+
         // assume that we have already been given live endpoints, and skip applying the failure detector
-        return forWrite(keyspace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
+        return forWrite(systemKeypsace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
+    }
+
+    private static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack,
+                                                                          Multimap<String, InetAddressAndPort> endpoints)
+    {
+        return filterBatchlogEndpoints(localRack,
+                                       endpoints,
+                                       Collections::shuffle,
+                                       FailureDetector.isEndpointAlive,
+                                       ThreadLocalRandom.current()::nextInt);
+    }
+
+    // Collect a list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
+    @VisibleForTesting
+    public static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack,
+                                                                         Multimap<String, InetAddressAndPort> endpoints,
+                                                                         Consumer<List<?>> shuffle,
+                                                                         Predicate<InetAddressAndPort> isAlive,
+                                                                         Function<Integer, Integer> indexPicker)
+    {
+        // special case for single-node data centers
+        if (endpoints.values().size() == 1)
+            return endpoints.values();
+
+        // strip out dead endpoints and localhost
+        ListMultimap<String, InetAddressAndPort> validated = ArrayListMultimap.create();
+        for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries())
+        {
+            InetAddressAndPort addr = entry.getValue();
+            if (!addr.equals(FBUtilities.getBroadcastAddressAndPort()) && isAlive.test(addr))
+                validated.put(entry.getKey(), entry.getValue());
+        }
+
+        if (validated.size() <= 2)
+            return validated.values();
+
+        if (validated.size() - validated.get(localRack).size() >= 2)
+        {
+            // we have enough endpoints in other racks
+            validated.removeAll(localRack);
+        }
+
+        if (validated.keySet().size() == 1)
+        {
+            /*
+             * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack)
+             * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack
+             * because of the preceding if block.
+             */
+            List<InetAddressAndPort> otherRack = Lists.newArrayList(validated.values());
+            shuffle.accept(otherRack);
+            return otherRack.subList(0, 2);
+        }
+
+        // randomize which racks we pick from if more than 2 remaining
+        Collection<String> racks;
+        if (validated.keySet().size() == 2)
+        {
+            racks = validated.keySet();
+        }
+        else
+        {
+            racks = Lists.newArrayList(validated.keySet());
+            shuffle.accept((List<?>) racks);
+        }
+
+        // grab a random member of up to two racks
+        List<InetAddressAndPort> result = new ArrayList<>(2);
+        for (String rack : Iterables.limit(racks, 2))
+        {
+            List<InetAddressAndPort> rackMembers = validated.get(rack);
+            result.add(rackMembers.get(indexPicker.apply(rackMembers.size())));
+        }
+
+        return result;
     }
 
     public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException
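
The @VisibleForTesting overload of filterBatchlogEndpoints takes the shuffle, the liveness check and the random index selection as parameters, so the rack/endpoint choice can be made deterministic in tests (see BatchlogEndpointFilterTest below). A minimal sketch, assuming a rack name localRack and a Multimap endpointsByRack of rack name to candidate endpoints:

    // Production path: real shuffle, real failure detector, real randomness.
    Collection<InetAddressAndPort> chosen =
        ReplicaPlans.filterBatchlogEndpoints(localRack,
                                             endpointsByRack,
                                             Collections::shuffle,
                                             FailureDetector.isEndpointAlive,
                                             ThreadLocalRandom.current()::nextInt);

    // Deterministic variant as used by the unit test: reverse instead of shuffle,
    // treat every endpoint as alive, always pick the last rack member.
    Collection<InetAddressAndPort> deterministic =
        ReplicaPlans.filterBatchlogEndpoints(localRack,
                                             endpointsByRack,
                                             Collections::reverse,
                                             addr -> true,
                                             size -> size - 1);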

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/SystemReplicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SystemReplicas.java b/src/java/org/apache/cassandra/locator/SystemReplicas.java
index 0d1fc8d..456bae5 100644
--- a/src/java/org/apache/cassandra/locator/SystemReplicas.java
+++ b/src/java/org/apache/cassandra/locator/SystemReplicas.java
@@ -41,8 +41,6 @@ public class SystemReplicas
     /**
      * There are a few places where a system function borrows write path functionality, but doesn't otherwise
      * fit into normal replication strategies (ie: hints and batchlog). So here we provide a replica instance
-     * @param endpoint
-     * @return
      */
     public static Replica getSystemReplica(InetAddressAndPort endpoint)
     {
@@ -51,6 +49,9 @@ public class SystemReplicas
 
     public static EndpointsForRange getSystemReplicas(Collection<InetAddressAndPort> endpoints)
     {
+        if (endpoints.isEmpty())
+            return EndpointsForRange.empty(FULL_RANGE);
+
         return EndpointsForRange.copyOf(Collections2.transform(endpoints, SystemReplicas::getSystemReplica));
     }
 }
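
The empty-collection guard above lets forBatchlogWrite pass a possibly empty candidate set straight through; the empty case is then handled downstream (forBatchlogWrite substitutes the local node when the batch consistency level is ANY, and otherwise forWrite reports unavailability). A tiny sketch of the new behaviour:

    // With no endpoints the method now returns an explicitly empty replica set
    // for FULL_RANGE instead of delegating to EndpointsForRange.copyOf(...).
    EndpointsForRange replicas = SystemReplicas.getSystemReplicas(Collections.emptyList());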

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index bd290a1..c6e8496 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -1696,13 +1696,13 @@ public final class MessagingService implements MessagingServiceMBean
             case all:
                 break;
             case dc:
-                if (snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort())))
+                if (snitch.getDatacenter(address).equals(snitch.getLocalDatacenter()))
                     return false;
                 break;
             case rack:
                 // for rack then check if the DC's are the same.
-                if (snitch.getRack(address).equals(snitch.getRack(FBUtilities.getBroadcastAddressAndPort()))
-                    && snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort())))
+                if (snitch.getRack(address).equals(snitch.getLocalRack())
+                    && snitch.getDatacenter(address).equals(snitch.getLocalDatacenter()))
                     return false;
                 break;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/RangeRelocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeRelocator.java b/src/java/org/apache/cassandra/service/RangeRelocator.java
index f2af3db..839a34c 100644
--- a/src/java/org/apache/cassandra/service/RangeRelocator.java
+++ b/src/java/org/apache/cassandra/service/RangeRelocator.java
@@ -185,8 +185,7 @@ public class RangeRelocator
 
                 //In the single node token move there is nothing to do and Range subtraction is broken
                 //so it's easier to just identify this case up front.
-                if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()
-)).size() > 1)
+                if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter()).size() > 1)
                 {
                     // getting collection of the currently used ranges by this keyspace
                     RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/StartupChecks.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java
index 224fd5e..8814281 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -437,7 +437,7 @@ public class StartupChecks
                 String storedDc = SystemKeyspace.getDatacenter();
                 if (storedDc != null)
                 {
-                    String currentDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+                    String currentDc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
                     if (!storedDc.equals(currentDc))
                     {
                         String formatMessage = "Cannot start node if snitch's data center (%s) differs from previous data center (%s). " +
@@ -459,7 +459,7 @@ public class StartupChecks
                 String storedRack = SystemKeyspace.getRack();
                 if (storedRack != null)
                 {
-                    String currentRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+                    String currentRack = DatabaseDescriptor.getEndpointSnitch().getLocalRack();
                     if (!storedRack.equals(currentRack))
                     {
                         String formatMessage = "Cannot start node if snitch's rack (%s) differs from previous rack (%s). " +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c6315ff..b3adc47 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -80,6 +80,8 @@ import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.AbstractIterator;
 
+import static org.apache.cassandra.service.BatchlogResponseHandler.BatchlogCleanup;
+
 public class StorageProxy implements StorageProxyMBean
 {
     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
@@ -446,7 +448,7 @@ public class StorageProxy implements StorageProxyMBean
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer);
         for (Replica replica: replicaPlan.contacts())
         {
-            if (replica.isLocal())
+            if (replica.isSelf())
             {
                 StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PREPARE)).execute(new Runnable()
                 {
@@ -484,7 +486,7 @@ public class StorageProxy implements StorageProxyMBean
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer);
         for (Replica replica : replicaPlan.contacts())
         {
-            if (replica.isLocal())
+            if (replica.isSelf())
             {
                 StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PROPOSE)).execute(new Runnable()
                 {
@@ -549,7 +551,7 @@ public class StorageProxy implements StorageProxyMBean
             {
                 if (shouldBlock)
                 {
-                    if (replica.isLocal())
+                    if (replica.isSelf())
                         commitPaxosLocal(replica, message, responseHandler);
                     else
                         MessagingService.instance().sendWriteRR(message, replica, responseHandler, allowHints && shouldHint(replica));
@@ -623,7 +625,7 @@ public class StorageProxy implements StorageProxyMBean
     throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException
     {
         Tracing.trace("Determining replicas for mutation");
-        final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
 
         long startTime = System.nanoTime();
 
@@ -755,7 +757,7 @@ public class StorageProxy implements StorageProxyMBean
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         Tracing.trace("Determining replicas for mutation");
-        final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
 
         long startTime = System.nanoTime();
 
@@ -780,8 +782,9 @@ public class StorageProxy implements StorageProxyMBean
                 ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
 
                 //Since the base -> view replication is 1:1 we only need to store the BL locally
-                final Collection<InetAddressAndPort> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
-                BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+                ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forLocalBatchlogWrite();
+                BatchlogCleanup cleanup = new BatchlogCleanup(mutations.size(),
+                                                              () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID));
 
                 // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
                 for (Mutation mutation : mutations)
@@ -806,7 +809,7 @@ public class StorageProxy implements StorageProxyMBean
                     // When local node is the endpoint we can just apply the mutation locally,
                     // unless there are pending endpoints, in which case we want to do an ordinary
                     // write so the view mutation is sent to the pending endpoint
-                    if (pairedEndpoint.get().isLocal() && StorageService.instance.isJoined()
+                    if (pairedEndpoint.get().isSelf() && StorageService.instance.isJoined()
                         && pendingReplicas.isEmpty())
                     {
                         try
@@ -899,7 +902,6 @@ public class StorageProxy implements StorageProxyMBean
         long startTime = System.nanoTime();
 
         List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size());
-        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
 
         if (mutations.stream().anyMatch(mutation -> Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy().hasTransientReplicas()))
             throw new AssertionError("Logged batches are unsupported with transient replication");
@@ -920,10 +922,11 @@ public class StorageProxy implements StorageProxyMBean
                     batchConsistencyLevel = consistency_level;
             }
 
-            final Collection<InetAddressAndPort> batchlogEndpoints = getBatchlogReplicas(localDataCenter, batchConsistencyLevel);
+            ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(batchConsistencyLevel == ConsistencyLevel.ANY);
+
             final UUID batchUUID = UUIDGen.getTimeUUID();
-            BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
-                                                                                                          () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+            BatchlogCleanup cleanup = new BatchlogCleanup(mutations.size(),
+                                                          () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID));
 
             // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
             for (Mutation mutation : mutations)
@@ -939,10 +942,10 @@ public class StorageProxy implements StorageProxyMBean
             }
 
             // write to the batchlog
-            syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID, queryStartNanoTime);
+            syncWriteToBatchlog(mutations, replicaPlan, batchUUID, queryStartNanoTime);
 
             // now actually perform the writes and wait for them to complete
-            syncWriteBatchedMutations(wrappers, localDataCenter, Stage.MUTATION);
+            syncWriteBatchedMutations(wrappers, Stage.MUTATION);
         }
         catch (UnavailableException e)
         {
@@ -998,11 +1001,9 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddressAndPort> endpoints, UUID uuid, long queryStartNanoTime)
+    private static void syncWriteToBatchlog(Collection<Mutation> mutations, ReplicaPlan.ForTokenWrite replicaPlan, UUID uuid, long queryStartNanoTime)
     throws WriteTimeoutException, WriteFailureException
     {
-        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
-        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(systemKeypsace, endpoints);
         WriteResponseHandler<?> handler = new WriteResponseHandler(replicaPlan,
                                                                    WriteType.BATCH_LOG,
                                                                    queryStartNanoTime);
@@ -1013,7 +1014,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size());
 
-            if (replica.isLocal())
+            if (replica.isSelf())
                 performLocally(Stage.MUTATION, replica, Optional.empty(), () -> BatchlogManager.store(batch), handler);
             else
                 MessagingService.instance().sendRR(message, replica.endpoint(), handler);
@@ -1021,18 +1022,18 @@ public class StorageProxy implements StorageProxyMBean
         handler.get();
     }
 
-    private static void asyncRemoveFromBatchlog(Collection<InetAddressAndPort> endpoints, UUID uuid)
+    private static void asyncRemoveFromBatchlog(ReplicaPlan.ForTokenWrite replicaPlan, UUID uuid)
     {
         MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer);
-        for (InetAddressAndPort target : endpoints)
+        for (Replica target : replicaPlan.contacts())
         {
             if (logger.isTraceEnabled())
                 logger.trace("Sending batchlog remove request {} to {}", uuid, target);
 
-            if (target.equals(FBUtilities.getBroadcastAddressAndPort()))
-                performLocally(Stage.MUTATION, SystemReplicas.getSystemReplica(target), () -> BatchlogManager.remove(uuid));
+            if (target.isSelf())
+                performLocally(Stage.MUTATION, target, () -> BatchlogManager.remove(uuid));
             else
-                MessagingService.instance().sendOneWay(message, target);
+                MessagingService.instance().sendOneWay(message, target.endpoint());
         }
     }
 
@@ -1054,9 +1055,11 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage)
+    private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, Stage stage)
     throws WriteTimeoutException, OverloadedException
     {
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
+
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
             EndpointsForToken sendTo = wrapper.handler.replicaPlan.liveAndDown();
@@ -1162,32 +1165,6 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    /*
-     * Replicas are picked manually:
-     * - replicas should be alive according to the failure detector
-     * - replicas should be in the local datacenter
-     * - choose min(2, number of qualifying candiates above)
-     * - allow the local node to be the only replica only if it's a single-node DC
-     */
-    private static Collection<InetAddressAndPort> getBatchlogReplicas(String localDataCenter, ConsistencyLevel consistencyLevel)
-    throws UnavailableException
-    {
-        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
-        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
-        String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
-
-        Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
-        if (chosenEndpoints.isEmpty())
-        {
-            if (consistencyLevel == ConsistencyLevel.ANY)
-                return Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
-
-            throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0);
-        }
-
-        return chosenEndpoints;
-    }
-
     /**
      * Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node
      * is not available.
@@ -1231,7 +1208,7 @@ public class StorageProxy implements StorageProxyMBean
 
             if (plan.isAlive(destination))
             {
-                if (destination.isLocal())
+                if (destination.isSelf())
                 {
                     insertLocal = true;
                     localReplica = destination;
@@ -1424,7 +1401,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         Replica replica = findSuitableReplica(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency());
 
-        if (replica.isLocal())
+        if (replica.isSelf())
         {
             return applyCounterMutationOnCoordinator(cm, localDataCenter, queryStartNanoTime);
         }
@@ -2096,7 +2073,7 @@ public class StorageProxy implements StorageProxyMBean
                 command.trackRepairedStatus();
             }
 
-            if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isLocal())
+            if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isSelf())
             {
                 StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
             }
@@ -2366,7 +2343,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         if (!DatabaseDescriptor.hintedHandoffEnabled())
             return false;
-        if (replica.isTransient() || replica.isLocal())
+        if (replica.isTransient() || replica.isSelf())
             return false;
 
         Set<String> disabledDCs = DatabaseDescriptor.hintedHandoffDisabledDCs();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 391598c..caa732a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1060,8 +1060,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     public void gossipSnitchInfo()
     {
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        String dc = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
-        String rack = snitch.getRack(FBUtilities.getBroadcastAddressAndPort());
+        String dc = snitch.getLocalDatacenter();
+        String rack = snitch.getLocalRack();
         Gossiper.instance.addLocalApplicationState(ApplicationState.DC, StorageService.instance.valueFactory.datacenter(dc));
         Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.instance.valueFactory.rack(rack));
     }
@@ -1835,7 +1835,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private boolean isLocalDC(InetAddressAndPort targetHost)
     {
         String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost);
-        String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        String localDC = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
         return remoteDC.equals(localDC);
     }
 
@@ -4073,7 +4073,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             PendingRangeCalculatorService.instance.blockUntilFinished();
 
-            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+            String dc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
 
             if (operationMode != Mode.LEAVING) // If we're already decommissioning there is no point checking RF/pending ranges
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 6881a2f..8d0f14c 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -138,7 +138,7 @@ public abstract class AbstractReadExecutor
         for (Replica replica: replicas)
         {
             InetAddressAndPort endpoint = replica.endpoint();
-            if (replica.isLocal())
+            if (replica.isSelf())
             {
                 hasLocalEndpoint = true;
                 continue;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index b16d105..2e4440f 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@ -181,7 +181,7 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
         DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime);
         ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime);
 
-        if (source.isLocal())
+        if (source.isSelf())
             StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
         else
             MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source.endpoint(), handler);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index ea54f9d..3fcabd0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -104,8 +104,8 @@ public class StreamPlan
     public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, String... columnFamilies)
     {
         //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
-        assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
-        assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
+        assert all(fullRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
+        assert all(transientRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
 
         StreamSession session = coordinator.getOrCreateNextSession(from);
         session.addStreamRequest(keyspace, fullRanges, transientRanges, Arrays.asList(columnFamilies));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 80fcebb..08a1b07 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -311,8 +311,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     public void addStreamRequest(String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, Collection<String> columnFamilies)
     {
         //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
-        assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
-        assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
+        assert all(fullRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
+        assert all(transientRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
 
         requests.add(new StreamRequest(keyspace, fullRanges, transientRanges, columnFamilies));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
index 41564d9..c2b9fc9 100644
--- a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
@@ -28,9 +28,12 @@ import org.junit.Test;
 import org.junit.matchers.JUnitMatchers;
 
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 public class BatchlogEndpointFilterTest
 {
@@ -47,10 +50,10 @@ public class BatchlogEndpointFilterTest
                 .put("2", InetAddressAndPort.getByName("2"))
                 .put("2", InetAddressAndPort.getByName("22"))
                 .build();
-        Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
         assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("11")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("22")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("11")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("22")));
     }
 
     @Test
@@ -61,10 +64,10 @@ public class BatchlogEndpointFilterTest
                 .put(LOCAL, InetAddressAndPort.getByName("00"))
                 .put("1", InetAddressAndPort.getByName("1"))
                 .build();
-        Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
         assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("1")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("0")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("1")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("0")));
     }
 
     @Test
@@ -73,9 +76,9 @@ public class BatchlogEndpointFilterTest
         Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
                 .put(LOCAL, InetAddressAndPort.getByName("0"))
                 .build();
-        Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
         assertThat(result.size(), is(1));
-        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("0")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("0")));
     }
 
     @Test
@@ -88,12 +91,12 @@ public class BatchlogEndpointFilterTest
                 .put("1", InetAddressAndPort.getByName("11"))
                 .put("1", InetAddressAndPort.getByName("111"))
                 .build();
-        Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
         // result should be the last two non-local replicas
         // (Collections.shuffle has been replaced with Collections.reverse for testing)
         assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("11")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("111")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("11")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("111")));
     }
 
     @Test
@@ -105,40 +108,22 @@ public class BatchlogEndpointFilterTest
                 .put(LOCAL, InetAddressAndPort.getByName("111"))
                 .put(LOCAL, InetAddressAndPort.getByName("1111"))
                 .build();
-        Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
         // result should be the last two non-local replicas
         // (Collections.shuffle has been replaced with Collections.reverse for testing)
         assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("111")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("1111")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("111")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("1111")));
     }
 
-    private static class TestEndpointFilter extends BatchlogManager.EndpointFilter
+    private Collection<InetAddressAndPort> filterBatchlogEndpoints(Multimap<String, InetAddressAndPort> endpoints)
     {
-        TestEndpointFilter(String localRack, Multimap<String, InetAddressAndPort> endpoints)
-        {
-            super(localRack, endpoints);
-        }
-
-        @Override
-        protected boolean isValid(InetAddressAndPort input)
-        {
-            // We will use always alive non-localhost endpoints
-            return true;
-        }
-
-        @Override
-        protected int getRandomInt(int bound)
-        {
-            // We don't need random behavior here
-            return bound - 1;
-        }
-
-        @Override
-        protected void shuffle(List<?> list)
-        {
-            // We don't need random behavior here
-            Collections.reverse(list);
-        }
+        return ReplicaPlans.filterBatchlogEndpoints(LOCAL, endpoints,
+                                                    // Reverse instead of shuffle
+                                                    Collections::reverse,
+                                                    // Always alive
+                                                    (addr) -> true,
+                                                    // Always pick the last
+                                                    (size) -> size - 1);
     }
 }

