You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2017/07/04 01:23:44 UTC

[24/53] [abbrv] lucene-solr:feature/autoscaling: SOLR-1095: Refactor code to standardize replica assignment

SOLR-1095: Refactor code to standardize replica assignment


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/196d84b9
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/196d84b9
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/196d84b9

Branch: refs/heads/feature/autoscaling
Commit: 196d84b9e08730e9af225450217032cf70d52b5a
Parents: 0159d49
Author: Noble Paul <no...@apache.org>
Authored: Fri Jun 30 15:49:40 2017 +0930
Committer: Noble Paul <no...@apache.org>
Committed: Fri Jun 30 15:49:40 2017 +0930

----------------------------------------------------------------------
 .../src/java/org/apache/solr/cloud/Assign.java  | 131 ++++++++++++++++---
 .../apache/solr/cloud/CreateCollectionCmd.java  |  31 ++---
 .../java/org/apache/solr/cloud/RestoreCmd.java  |  30 +++--
 .../org/apache/solr/cloud/SplitShardCmd.java    |  13 +-
 .../apache/solr/cloud/rule/ReplicaAssigner.java |  68 ++++------
 .../solr/common/cloud/ReplicaPosition.java      |  55 ++++++++
 .../apache/solr/cloud/rule/RuleEngineTest.java  |   8 +-
 7 files changed, 234 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/196d84b9/solr/core/src/java/org/apache/solr/cloud/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java
index 9f21245..7c9752d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java
@@ -25,10 +25,14 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
+import java.util.function.Supplier;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -40,7 +44,9 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.Utils;
@@ -49,6 +55,11 @@ import org.apache.zookeeper.KeeperException;
 
 import static java.util.Collections.singletonMap;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE_DEFAULT;
+import static org.apache.solr.common.cloud.DocCollection.SNITCH;
 import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
@@ -119,7 +130,7 @@ public class Assign {
     returnShardId = shardIdNames.get(0);
     return returnShardId;
   }
-  
+
   public static String buildCoreName(String collectionName, String shard, Replica.Type type, int replicaNum) {
     // TODO: Adding the suffix is great for debugging, but may be an issue if at some point we want to support a way to change replica type
     return String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, shard, type.name().substring(0,1).toLowerCase(Locale.ROOT), replicaNum);
@@ -141,6 +152,91 @@ public class Assign {
       else return replicaName;
     }
   }
+  public static List<String> getLiveOrLiveAndCreateNodeSetList(final Set<String> liveNodes, final ZkNodeProps message, final Random random) {
+    // Builds the list of candidate nodes for replica placement from liveNodes and the createNodeSet options in message.
+    // TODO: add smarter options that look at the current number of cores per node?
+    // for now we just go random (except when createNodeSet and createNodeSet.shuffle=false are passed in)
+
+    List<String> nodeList;
+    // createNodeList stays null when createNodeSet is absent; the CREATE_NODE_SET_EMPTY marker is split as "".
+    final String createNodeSetStr = message.getStr(CREATE_NODE_SET);
+    final List<String> createNodeList = (createNodeSetStr == null) ? null : StrUtils.splitSmart((CREATE_NODE_SET_EMPTY.equals(createNodeSetStr) ? "" : createNodeSetStr), ",", true);
+
+    if (createNodeList != null) {
+      nodeList = new ArrayList<>(createNodeList);
+      nodeList.retainAll(liveNodes); // keep only the requested nodes that are currently live
+      if (message.getBool(CREATE_NODE_SET_SHUFFLE, CREATE_NODE_SET_SHUFFLE_DEFAULT)) {
+        Collections.shuffle(nodeList, random);
+      }
+    } else {
+      nodeList = new ArrayList<>(liveNodes); // no createNodeSet given: every live node is a candidate
+      Collections.shuffle(nodeList, random);
+    }
+    // NOTE(review): the result can be empty (e.g. none of the requested nodes is live); callers must handle that.
+    return nodeList;
+  }
+
+  public static List<ReplicaPosition> identifyNodes(Supplier<CoreContainer> coreContainer, // returns one ReplicaPosition (shard, index, type, node) per replica to create
+                                                    ZkStateReader zkStateReader,
+                                                    ClusterState clusterState,
+                                                    List<String> nodeList,
+                                                    String collectionName,
+                                                    ZkNodeProps message,
+                                                    List<String> shardNames,
+                                                    int numNrtReplicas,
+                                                    int numTlogReplicas,
+                                                    int numPullReplicas) throws KeeperException, InterruptedException {
+    List<Map> rulesMap = (List) message.get("rule");
+    String policyName = message.getStr(POLICY);
+    Map autoScalingJson = Utils.getJson(zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true); // read up front, even though the simple path below never uses it
+    // Simple path (no rules, no per-collection policy): cycle through nodeList, one replica at a time.
+    if (rulesMap == null && policyName == null) { // NOTE(review): a cluster-wide policy in autoScalingJson is not consulted on this path - confirm intended
+      int i = 0; // running replica counter across all shards; selects the next node in nodeList
+      List<ReplicaPosition> result = new ArrayList<>();
+      for (String aShard : shardNames) {
+        // ImmutableMap iterates in insertion order, so each shard gets NRT, then TLOG, then PULL; index j restarts per type.
+        for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
+            Replica.Type.TLOG, numTlogReplicas,
+            Replica.Type.PULL, numPullReplicas
+        ).entrySet()) {
+          for (int j = 0; j < e.getValue(); j++){ // NOTE(review): assumes nodeList is non-empty; otherwise i % 0 throws ArithmeticException
+            result.add(new ReplicaPosition(aShard, j, e.getKey(), nodeList.get(i % nodeList.size())));
+            i++;
+          }
+        }
+
+      }
+      return result;
+    } else {
+      if (numTlogReplicas + numPullReplicas != 0) { // the rule/policy paths below only ever place numNrtReplicas NRT replicas
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules or cluster policies");
+      }
+    }
+    // A named policy or a configured cluster policy takes precedence over placement rules.
+    if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
+      return getPositionsUsingPolicy(collectionName,
+          shardNames, numNrtReplicas, policyName, zkStateReader, nodeList);
+    } else {
+      List<Rule> rules = new ArrayList<>(); // rulesMap is non-null here: policyName is null, so the early return was skipped only because rules exist
+      for (Object map : rulesMap) rules.add(new Rule((Map) map));
+      Map<String, Integer> sharVsReplicaCount = new HashMap<>();
+      // every shard requests the same number of NRT replicas
+      for (String shard : shardNames) sharVsReplicaCount.put(shard, numNrtReplicas);
+      ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
+          sharVsReplicaCount,
+          (List<Map>) message.get(SNITCH),
+          new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
+          nodeList,
+          coreContainer.get(), // the supplier is only invoked on this rules path
+          clusterState);
+      // Flatten the position -> node map into a list, copying each assigned node into a fresh ReplicaPosition.
+      Map<ReplicaPosition, String> nodeMappings = replicaAssigner.getNodeMappings();
+      return nodeMappings.entrySet().stream()
+          .map(e -> new ReplicaPosition(e.getKey().shard, e.getKey().index, e.getKey().type, e.getValue()))
+          .collect(Collectors.toList());
+    }
+  }
 
   static class ReplicaCount {
     public final String nodeName;
@@ -191,21 +287,21 @@ public class Assign {
     }
 
     List l = (List) coll.get(DocCollection.RULE);
-    Map<ReplicaAssigner.Position, String> positions = null;
+    List<ReplicaPosition> replicaPositions = null;
     if (l != null) {
-      positions = getNodesViaRules(clusterState, shard, numberOfNodes, cc, coll, createNodeList, l);
+      replicaPositions = getNodesViaRules(clusterState, shard, numberOfNodes, cc, coll, createNodeList, l);
     }
     String policyName = coll.getStr(POLICY);
     Map autoScalingJson = Utils.getJson(cc.getZkController().getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
     if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
-      positions= Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), numberOfNodes,
+      replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), numberOfNodes,
           policyName, cc.getZkController().getZkStateReader(), createNodeList);
     }
 
-    if(positions != null){
+    if(replicaPositions != null){
       List<ReplicaCount> repCounts = new ArrayList<>();
-      for (String s : positions.values()) {
-        repCounts.add(new ReplicaCount(s));
+      for (ReplicaPosition p : replicaPositions) {
+        repCounts.add(new ReplicaCount(p.node));
       }
       return repCounts;
     }
@@ -215,9 +311,10 @@ public class Assign {
     return sortedNodeList;
 
   }
-  public static Map<ReplicaAssigner.Position, String> getPositionsUsingPolicy(String collName, List<String> shardNames, int numReplicas,
-                                                                              String policyName, ZkStateReader zkStateReader,
-                                                                              List<String> nodesList) throws KeeperException, InterruptedException {
+
+  public static List<ReplicaPosition> getPositionsUsingPolicy(String collName, List<String> shardNames, int numReplicas,
+                                                              String policyName, ZkStateReader zkStateReader,
+                                                              List<String> nodesList) throws KeeperException, InterruptedException {
     try (CloudSolrClient csc = new CloudSolrClient.Builder()
         .withClusterStateProvider(new ZkClientClusterStateProvider(zkStateReader))
         .build()) {
@@ -226,11 +323,11 @@ public class Assign {
       Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(collName,
           autoScalingJson,
           clientDataProvider, singletonMap(collName, policyName), shardNames, numReplicas, nodesList);
-      Map<ReplicaAssigner.Position, String> result = new HashMap<>();
+      List<ReplicaPosition> result = new ArrayList<>();
       for (Map.Entry<String, List<String>> e : locations.entrySet()) {
         List<String> value = e.getValue();
         for (int i = 0; i < value.size(); i++) {
-          result.put(new ReplicaAssigner.Position(e.getKey(), i, Replica.Type.NRT), value.get(i));
+          result.add(new ReplicaPosition(e.getKey(), i, Replica.Type.NRT, value.get(i)));
         }
       }
       return result;
@@ -239,8 +336,8 @@ public class Assign {
     }
   }
 
-  private static Map<ReplicaAssigner.Position, String> getNodesViaRules(ClusterState clusterState, String shard, int numberOfNodes,
-                                                                        CoreContainer cc, DocCollection coll, List<String> createNodeList, List l) {
+  private static List<ReplicaPosition> getNodesViaRules(ClusterState clusterState, String shard, int numberOfNodes,
+                                                        CoreContainer cc, DocCollection coll, List<String> createNodeList, List l) {
     ArrayList<Rule> rules = new ArrayList<>();
     for (Object o : l) rules.add(new Rule((Map) o));
     Map<String, Map<String, Integer>> shardVsNodes = new LinkedHashMap<>();
@@ -253,18 +350,18 @@ public class Assign {
         n.put(replica.getNodeName(), ++count);
       }
     }
-    List snitches = (List) coll.get(DocCollection.SNITCH);
+    List snitches = (List) coll.get(SNITCH);
     List<String> nodesList = createNodeList == null ?
         new ArrayList<>(clusterState.getLiveNodes()) :
         createNodeList;
-    Map<ReplicaAssigner.Position, String> positions = new ReplicaAssigner(
+    Map<ReplicaPosition, String> positions = new ReplicaAssigner(
         rules,
         Collections.singletonMap(shard, numberOfNodes),
         snitches,
         shardVsNodes,
         nodesList, cc, clusterState).getNodeMappings();
 
-    return positions;// getReplicaCounts(positions);
+    return positions.entrySet().stream().map(e -> e.getKey().setNode(e.getValue())).collect(Collectors.toList());// getReplicaCounts(positions);
   }
 
   private static HashMap<String, ReplicaCount> getNodeNameVsShardCount(String collectionName,

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/196d84b9/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
index 0c87658..7d3df70 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
 import org.apache.solr.cloud.overseer.ClusterStateMutator;
-import org.apache.solr.cloud.rule.ReplicaAssigner;
+import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
@@ -131,12 +131,12 @@ public class CreateCollectionCmd implements Cmd {
       // add our new cores to existing nodes serving the least number of cores
       // but (for now) require that each core goes on a distinct node.
 
-      final List<String> nodeList = OverseerCollectionMessageHandler.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM);
-      Map<ReplicaAssigner.Position, String> positionVsNodes;
+      final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM);
+      List<ReplicaPosition> replicaPositions;
       if (nodeList.isEmpty()) {
         log.warn("It is unusual to create a collection ("+collectionName+") without cores.");
 
-        positionVsNodes = new HashMap<>();
+        replicaPositions = new ArrayList<>();
       } else {
         int totalNumReplicas = numNrtReplicas + numTlogReplicas + numPullReplicas;
         if (totalNumReplicas > nodeList.size()) {
@@ -164,7 +164,9 @@ public class CreateCollectionCmd implements Cmd {
               + " shards to be created (higher than the allowed number)");
         }
 
-        positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
+        replicaPositions = Assign.identifyNodes(() -> ocmh.overseer.getZkController().getCoreContainer(),
+            ocmh.zkStateReader
+            , clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
       }
 
       ZkStateReader zkStateReader = ocmh.zkStateReader;
@@ -207,12 +209,11 @@ public class CreateCollectionCmd implements Cmd {
       log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , nrtReplicas : {2}, tlogReplicas: {3}, pullReplicas: {4}",
           collectionName, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas));
       Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
-      for (Map.Entry<ReplicaAssigner.Position, String> e : positionVsNodes.entrySet()) {
-        ReplicaAssigner.Position position = e.getKey();
-        String nodeName = e.getValue();
-        String coreName = Assign.buildCoreName(collectionName, position.shard, position.type, position.index + 1);
+      for (ReplicaPosition replicaPosition : replicaPositions) {
+        String nodeName = replicaPosition.node;
+        String coreName = Assign.buildCoreName(collectionName, replicaPosition.shard, replicaPosition.type, replicaPosition.index + 1);
         log.debug(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
-            , coreName, position.shard, collectionName, nodeName));
+            , coreName, replicaPosition.shard, collectionName, nodeName));
 
 
         String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
@@ -222,11 +223,11 @@ public class CreateCollectionCmd implements Cmd {
           ZkNodeProps props = new ZkNodeProps(
               Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
               ZkStateReader.COLLECTION_PROP, collectionName,
-              ZkStateReader.SHARD_ID_PROP, position.shard,
+              ZkStateReader.SHARD_ID_PROP, replicaPosition.shard,
               ZkStateReader.CORE_NAME_PROP, coreName,
               ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
-              ZkStateReader.BASE_URL_PROP, baseUrl, 
-              ZkStateReader.REPLICA_TYPE, position.type.name());
+              ZkStateReader.BASE_URL_PROP, baseUrl,
+              ZkStateReader.REPLICA_TYPE, replicaPosition.type.name());
           Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
         }
 
@@ -237,10 +238,10 @@ public class CreateCollectionCmd implements Cmd {
         params.set(CoreAdminParams.NAME, coreName);
         params.set(COLL_CONF, configName);
         params.set(CoreAdminParams.COLLECTION, collectionName);
-        params.set(CoreAdminParams.SHARD, position.shard);
+        params.set(CoreAdminParams.SHARD, replicaPosition.shard);
         params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
         params.set(CoreAdminParams.NEW_COLLECTION, "true");
-        params.set(CoreAdminParams.REPLICA_TYPE, position.type.name());
+        params.set(CoreAdminParams.REPLICA_TYPE, replicaPosition.type.name());
 
         if (async != null) {
           String coreAdminAsyncId = async + Math.abs(System.nanoTime());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/196d84b9/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
index 6a18bff..fa493c7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
@@ -28,12 +28,13 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
 import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.cloud.rule.ReplicaAssigner;
+import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
@@ -108,7 +109,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
     DocCollection backupCollectionState = backupMgr.readCollectionState(location, backupName, backupCollection);
 
     // Get the Solr nodes to restore a collection.
-    final List<String> nodeList = OverseerCollectionMessageHandler.getLiveOrLiveAndCreateNodeSetList(
+    final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(
         zkStateReader.getClusterState().getLiveNodes(), message, RANDOM);
 
     int numShards = backupCollectionState.getActiveSlices().size();
@@ -213,8 +214,11 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
     List<String> sliceNames = new ArrayList<>();
     restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
 
-    Map<ReplicaAssigner.Position, String> positionVsNodes = ocmh.identifyNodes(clusterState, nodeList,
-        restoreCollectionName, message, sliceNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
+    List<ReplicaPosition> replicaPositions = Assign.identifyNodes(() -> ocmh.overseer.getZkController().getCoreContainer(),
+        ocmh.zkStateReader, clusterState,
+        nodeList, restoreCollectionName,
+        message, sliceNames,
+        numNrtReplicas, numTlogReplicas, numPullReplicas);
 
     //Create one replica per shard and copy backed up data to it
     for (Slice slice : restoreCollection.getSlices()) {
@@ -235,12 +239,11 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
 
       // Get the first node matching the shard to restore in
       String node;
-      for (Map.Entry<ReplicaAssigner.Position, String> pvn : positionVsNodes.entrySet()) {
-        ReplicaAssigner.Position position = pvn.getKey();
-        if (position.shard == slice.getName()) {
-          node = pvn.getValue();
+      for (ReplicaPosition replicaPosition : replicaPositions) {
+        if (Objects.equals(replicaPosition.shard, slice.getName())) {
+          node = replicaPosition.node;
           propMap.put(CoreAdminParams.NODE, node);
-          positionVsNodes.remove(position);
+          replicaPositions.remove(replicaPosition);
           break;
         }
       }
@@ -319,12 +322,11 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
 
           // Get the first node matching the shard to restore in
           String node;
-          for (Map.Entry<ReplicaAssigner.Position, String> pvn : positionVsNodes.entrySet()) {
-            ReplicaAssigner.Position position = pvn.getKey();
-            if (position.shard == slice.getName()) {
-              node = pvn.getValue();
+          for (ReplicaPosition replicaPosition : replicaPositions) {
+            if (Objects.equals(replicaPosition.shard, slice.getName())) {
+              node = replicaPosition.node;
               propMap.put(CoreAdminParams.NODE, node);
-              positionVsNodes.remove(position);
+              replicaPositions.remove(replicaPosition);
               break;
             }
           }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/196d84b9/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
index 2e2e335..099190c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
@@ -30,7 +30,7 @@ import java.util.Set;
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
 import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.cloud.rule.ReplicaAssigner;
+import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.CompositeIdRouter;
@@ -381,7 +381,8 @@ public class SplitShardCmd implements Cmd {
 
       // TODO: change this to handle sharding a slice into > 2 sub-shards.
 
-      Map<ReplicaAssigner.Position, String> nodeMap = ocmh.identifyNodes(clusterState,
+      List<ReplicaPosition> replicaPositions = Assign.identifyNodes(() -> ocmh.overseer.getZkController().getCoreContainer(),
+          ocmh.zkStateReader, clusterState,
           new ArrayList<>(clusterState.getLiveNodes()),
           collectionName,
           new ZkNodeProps(collection.getProperties()),
@@ -389,10 +390,10 @@ public class SplitShardCmd implements Cmd {
 
       List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
 
-      for (Map.Entry<ReplicaAssigner.Position, String> entry : nodeMap.entrySet()) {
-        String sliceName = entry.getKey().shard;
-        String subShardNodeName = entry.getValue();
-        String shardName = collectionName + "_" + sliceName + "_replica" + (entry.getKey().index);
+      for (ReplicaPosition replicaPosition : replicaPositions) {
+        String sliceName = replicaPosition.shard;
+        String subShardNodeName = replicaPosition.node;
+        String shardName = collectionName + "_" + sliceName + "_replica" + (replicaPosition.index);
 
         log.info("Creating replica shard " + shardName + " as part of slice " + sliceName + " of collection "
             + collectionName + " on " + subShardNodeName);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/196d84b9/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
index 669e82b..8887e53 100644
--- a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
+++ b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.rule.ImplicitSnitch;
@@ -59,31 +60,6 @@ public class ReplicaAssigner {
   private Map<String, AtomicInteger> nodeVsCores = new HashMap<>();
 
 
-  public static class Position implements Comparable<Position> {
-    public final String shard;
-    public final int index;
-    public final Replica.Type type;
-
-    public Position(String shard, int replicaIdx, Replica.Type type) {
-      this.shard = shard;
-      this.index = replicaIdx;
-      this.type = type;
-    }
-
-    @Override
-    public int compareTo(Position that) {
-      //this is to ensure that we try one replica from each shard first instead of
-      // all replicas from same shard
-      return that.index > index ? -1 : that.index == index ? 0 : 1;
-    }
-
-    @Override
-    public String toString() {
-      return shard + ":" + index;
-    }
-  }
-
-
   /**
    * @param shardVsReplicaCount shard names vs no:of replicas required for each of those shards
    * @param snitches            snitches details
@@ -128,8 +104,8 @@ public class ReplicaAssigner {
    * For each shard return a new set of nodes where the replicas need to be created satisfying
    * the specified rule
    */
-  public Map<Position, String> getNodeMappings() {
-    Map<Position, String> result = getNodeMappings0();
+  public Map<ReplicaPosition, String> getNodeMappings() {
+    Map<ReplicaPosition, String> result = getNodeMappings0();
     if (result == null) {
       String msg = "Could not identify nodes matching the rules " + rules;
       if (!failedNodes.isEmpty()) {
@@ -149,7 +125,7 @@ public class ReplicaAssigner {
 
   }
 
-  Map<Position, String> getNodeMappings0() {
+  Map<ReplicaPosition, String> getNodeMappings0() {
     List<String> shardNames = new ArrayList<>(shardVsReplicaCount.keySet());
     int[] shardOrder = new int[shardNames.size()];
     for (int i = 0; i < shardNames.size(); i++) shardOrder[i] = i;
@@ -168,17 +144,17 @@ public class ReplicaAssigner {
       }
     }
 
-    Map<Position, String> result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, false);
+    Map<ReplicaPosition, String> result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, false);
     if (result == null && hasFuzzyRules) {
       result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, true);
     }
     return result;
   }
 
-  private Map<Position, String> tryAllPermutations(List<String> shardNames,
-                                                   int[] shardOrder,
-                                                   int nonWildCardShardRules,
-                                                   boolean fuzzyPhase) {
+  private Map<ReplicaPosition, String> tryAllPermutations(List<String> shardNames,
+                                                          int[] shardOrder,
+                                                          int nonWildCardShardRules,
+                                                          boolean fuzzyPhase) {
 
 
     Iterator<int[]> shardPermutations = nonWildCardShardRules > 0 ?
@@ -187,16 +163,16 @@ public class ReplicaAssigner {
 
     for (; shardPermutations.hasNext(); ) {
       int[] p = shardPermutations.next();
-      List<Position> positions = new ArrayList<>();
+      List<ReplicaPosition> replicaPositions = new ArrayList<>();
       for (int pos : p) {
         for (int j = 0; j < shardVsReplicaCount.get(shardNames.get(pos)); j++) {
-          positions.add(new Position(shardNames.get(pos), j, Replica.Type.NRT));
+          replicaPositions.add(new ReplicaPosition(shardNames.get(pos), j, Replica.Type.NRT));
         }
       }
-      Collections.sort(positions);
+      Collections.sort(replicaPositions);
       for (Iterator<int[]> it = permutations(rules.size()); it.hasNext(); ) {
         int[] permutation = it.next();
-        Map<Position, String> result = tryAPermutationOfRules(permutation, positions, fuzzyPhase);
+        Map<ReplicaPosition, String> result = tryAPermutationOfRules(permutation, replicaPositions, fuzzyPhase);
         if (result != null) return result;
       }
     }
@@ -205,9 +181,9 @@ public class ReplicaAssigner {
   }
 
 
-  private Map<Position, String> tryAPermutationOfRules(int[] rulePermutation, List<Position> positions, boolean fuzzyPhase) {
+  private Map<ReplicaPosition, String> tryAPermutationOfRules(int[] rulePermutation, List<ReplicaPosition> replicaPositions, boolean fuzzyPhase) {
     Map<String, Map<String, Object>> nodeVsTagsCopy = getDeepCopy(nodeVsTags, 2);
-    Map<Position, String> result = new LinkedHashMap<>();
+    Map<ReplicaPosition, String> result = new LinkedHashMap<>();
     int startPosition = 0;
     Map<String, Map<String, Integer>> copyOfCurrentState = getDeepCopy(shardVsNodes, 2);
     List<String> sortedLiveNodes = new ArrayList<>(this.participatingLiveNodes);
@@ -232,7 +208,7 @@ public class ReplicaAssigner {
       return result1;
     });
     forEachPosition:
-    for (Position position : positions) {
+    for (ReplicaPosition replicaPosition : replicaPositions) {
       //trying to assign a node by verifying each rule in this rulePermutation
       forEachNode:
       for (int j = 0; j < sortedLiveNodes.size(); j++) {
@@ -242,16 +218,16 @@ public class ReplicaAssigner {
           Rule rule = rules.get(rulePermutation[i]);
           //trying to assign a replica into this node in this shard
           Rule.MatchStatus status = rule.tryAssignNodeToShard(liveNode,
-              copyOfCurrentState, nodeVsTagsCopy, position.shard, fuzzyPhase ? FUZZY_ASSIGN : ASSIGN);
+              copyOfCurrentState, nodeVsTagsCopy, replicaPosition.shard, fuzzyPhase ? FUZZY_ASSIGN : ASSIGN);
           if (status == Rule.MatchStatus.CANNOT_ASSIGN_FAIL) {
             continue forEachNode;//try another node for this position
           }
         }
         //We have reached this far means this node can be applied to this position
         //and all rules are fine. So let us change the currentState
-        result.put(position, liveNode);
-        Map<String, Integer> nodeNames = copyOfCurrentState.get(position.shard);
-        if (nodeNames == null) copyOfCurrentState.put(position.shard, nodeNames = new HashMap<>());
+        result.put(replicaPosition, liveNode);
+        Map<String, Integer> nodeNames = copyOfCurrentState.get(replicaPosition.shard);
+        if (nodeNames == null) copyOfCurrentState.put(replicaPosition.shard, nodeNames = new HashMap<>());
         Integer n = nodeNames.get(liveNode);
         n = n == null ? 1 : n + 1;
         nodeNames.put(liveNode, n);
@@ -267,11 +243,11 @@ public class ReplicaAssigner {
       return null;
     }
 
-    if (positions.size() > result.size()) {
+    if (replicaPositions.size() > result.size()) {
       return null;
     }
 
-    for (Map.Entry<Position, String> e : result.entrySet()) {
+    for (Map.Entry<ReplicaPosition, String> e : result.entrySet()) {
       for (int i = 0; i < rulePermutation.length; i++) {
         Rule rule = rules.get(rulePermutation[i]);
         Rule.MatchStatus matchStatus = rule.tryAssignNodeToShard(e.getValue(),

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/196d84b9/solr/core/src/java/org/apache/solr/common/cloud/ReplicaPosition.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/common/cloud/ReplicaPosition.java b/solr/core/src/java/org/apache/solr/common/cloud/ReplicaPosition.java
new file mode 100644
index 0000000..d64d1d1
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/common/cloud/ReplicaPosition.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+
+public class ReplicaPosition implements Comparable<ReplicaPosition> { // a (shard, replica index, type) slot with an optional node
+  public final String shard; // name of the shard this position belongs to
+  public final int index; // replica index as passed to the constructor (replicaIdx)
+  public final Replica.Type type; // replica type, e.g. Replica.Type.NRT
+  public String node; // mutable; null unless given to the 4-arg constructor, set via setNode, or assigned directly
+  // Creates a position without a node; the node field keeps its default null.
+  public ReplicaPosition(String shard, int replicaIdx, Replica.Type type) {
+    this.shard = shard;
+    this.index = replicaIdx;
+    this.type = type;
+  }
+  public ReplicaPosition(String shard, int replicaIdx, Replica.Type type, String node) { // as above, but also records the node
+    this.shard = shard;
+    this.index = replicaIdx;
+    this.type = type;
+    this.node = node;
+  }
+  // Compares by index only: positions that differ in shard, type or node compare as 0 (equals is not overridden here).
+  @Override
+  public int compareTo(ReplicaPosition that) {
+    // Ordering by index alone ensures that, once sorted, we try one replica from each shard
+    // first instead of all replicas from the same shard.
+    return that.index > index ? -1 : that.index == index ? 0 : 1;
+  }
+  // Renders as shard:index; type and node are not part of the string.
+  @Override
+  public String toString() {
+    return shard + ":" + index;
+  }
+  // Sets the node and returns this same instance, so the call can be chained.
+  public ReplicaPosition setNode(String node) {
+    this.node = node;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/196d84b9/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
index 8b0a788..6d460ed 100644
--- a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
@@ -28,7 +28,7 @@ import java.util.Set;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.solr.SolrTestCaseJ4;
-import org.apache.solr.cloud.rule.ReplicaAssigner.Position;
+import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.rule.Snitch;
 import org.apache.solr.common.cloud.rule.SnitchContext;
@@ -73,7 +73,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{
             "'replica':'1',shard:'*','node':'*'}," +
             " {'freedisk':'>1'}]");
 
-    Map<Position, String> mapping = new ReplicaAssigner(
+    Map<ReplicaPosition, String> mapping = new ReplicaAssigner(
         rules,
         shardVsReplicaCount, singletonList(MockSnitch.class.getName()),
         new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null ).getNodeMappings();
@@ -147,7 +147,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{
             "{node:'!127.0.0.1:49947_'}," +
             "{freedisk:'>1'}]");
     Map shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2);
-    Map<Position, String> mapping = new ReplicaAssigner(
+    Map<ReplicaPosition, String> mapping = new ReplicaAssigner(
         rules,
         shardVsReplicaCount, singletonList(MockSnitch.class.getName()),
         new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings();
@@ -236,7 +236,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{
         "node5:80", makeMap("rack", "182")
     );
     MockSnitch.nodeVsTags = nodeVsTags;
-    Map<Position, String> mapping = new ReplicaAssigner(
+    Map<ReplicaPosition, String> mapping = new ReplicaAssigner(
         rules,
         shardVsReplicaCount, singletonList(MockSnitch.class.getName()),
         new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings0();