You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2017/06/30 06:40:03 UTC
lucene-solr:feature/autoscaling: SOLR-10954: Refactor code to standardize replica assignment
Repository: lucene-solr
Updated Branches:
refs/heads/feature/autoscaling 492eebc89 -> 0093015c5
SOLR-10954: Refactor code to standardize replica assignment
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0093015c
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0093015c
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0093015c
Branch: refs/heads/feature/autoscaling
Commit: 0093015c5451193f1e8bc32ffb1d5cd2f1a497ef
Parents: 492eebc
Author: Noble Paul <no...@apache.org>
Authored: Fri Jun 30 16:09:52 2017 +0930
Committer: Noble Paul <no...@apache.org>
Committed: Fri Jun 30 16:09:52 2017 +0930
----------------------------------------------------------------------
.../src/java/org/apache/solr/cloud/Assign.java | 131 ++++++++++++++++---
.../apache/solr/cloud/CreateCollectionCmd.java | 31 ++---
.../cloud/OverseerCollectionMessageHandler.java | 90 -------------
.../java/org/apache/solr/cloud/RestoreCmd.java | 30 +++--
.../org/apache/solr/cloud/SplitShardCmd.java | 13 +-
.../apache/solr/cloud/rule/ReplicaAssigner.java | 68 ++++------
.../solr/common/cloud/ReplicaPosition.java | 55 ++++++++
.../apache/solr/cloud/rule/RuleEngineTest.java | 8 +-
8 files changed, 234 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0093015c/solr/core/src/java/org/apache/solr/cloud/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java
index 9f21245..7c9752d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java
@@ -25,10 +25,14 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
+import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableMap;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -40,7 +44,9 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
@@ -49,6 +55,11 @@ import org.apache.zookeeper.KeeperException;
import static java.util.Collections.singletonMap;
import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE_DEFAULT;
+import static org.apache.solr.common.cloud.DocCollection.SNITCH;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
@@ -119,7 +130,7 @@ public class Assign {
returnShardId = shardIdNames.get(0);
return returnShardId;
}
-
+
public static String buildCoreName(String collectionName, String shard, Replica.Type type, int replicaNum) {
// TODO: Adding the suffix is great for debugging, but may be an issue if at some point we want to support a way to change replica type
return String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, shard, type.name().substring(0,1).toLowerCase(Locale.ROOT), replicaNum);
@@ -141,6 +152,91 @@ public class Assign {
else return replicaName;
}
}
+ public static List<String> getLiveOrLiveAndCreateNodeSetList(final Set<String> liveNodes, final ZkNodeProps message, final Random random) {
+ // TODO: add smarter options that look at the current number of cores per
+ // node?
+ // for now we just go random (except when createNodeSet and createNodeSet.shuffle=false are passed in)
+
+ List<String> nodeList;
+
+ final String createNodeSetStr = message.getStr(CREATE_NODE_SET);
+ final List<String> createNodeList = (createNodeSetStr == null) ? null : StrUtils.splitSmart((CREATE_NODE_SET_EMPTY.equals(createNodeSetStr) ? "" : createNodeSetStr), ",", true);
+
+ if (createNodeList != null) {
+ nodeList = new ArrayList<>(createNodeList);
+ nodeList.retainAll(liveNodes);
+ if (message.getBool(CREATE_NODE_SET_SHUFFLE, CREATE_NODE_SET_SHUFFLE_DEFAULT)) {
+ Collections.shuffle(nodeList, random);
+ }
+ } else {
+ nodeList = new ArrayList<>(liveNodes);
+ Collections.shuffle(nodeList, random);
+ }
+
+ return nodeList;
+ }
+
+ public static List<ReplicaPosition> identifyNodes(Supplier<CoreContainer> coreContainer,
+ ZkStateReader zkStateReader,
+ ClusterState clusterState,
+ List<String> nodeList,
+ String collectionName,
+ ZkNodeProps message,
+ List<String> shardNames,
+ int numNrtReplicas,
+ int numTlogReplicas,
+ int numPullReplicas) throws KeeperException, InterruptedException {
+ List<Map> rulesMap = (List) message.get("rule");
+ String policyName = message.getStr(POLICY);
+ Map autoScalingJson = Utils.getJson(zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
+
+ if (rulesMap == null && policyName == null) {
+ int i = 0;
+ List<ReplicaPosition> result = new ArrayList<>();
+ for (String aShard : shardNames) {
+
+ for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
+ Replica.Type.TLOG, numTlogReplicas,
+ Replica.Type.PULL, numPullReplicas
+ ).entrySet()) {
+ for (int j = 0; j < e.getValue(); j++){
+ result.add(new ReplicaPosition(aShard, j, e.getKey(), nodeList.get(i % nodeList.size())));
+ i++;
+ }
+ }
+
+ }
+ return result;
+ } else {
+ if (numTlogReplicas + numPullReplicas != 0) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules or cluster policies");
+ }
+ }
+
+ if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
+ return getPositionsUsingPolicy(collectionName,
+ shardNames, numNrtReplicas, policyName, zkStateReader, nodeList);
+ } else {
+ List<Rule> rules = new ArrayList<>();
+ for (Object map : rulesMap) rules.add(new Rule((Map) map));
+ Map<String, Integer> sharVsReplicaCount = new HashMap<>();
+
+ for (String shard : shardNames) sharVsReplicaCount.put(shard, numNrtReplicas);
+ ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
+ sharVsReplicaCount,
+ (List<Map>) message.get(SNITCH),
+ new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
+ nodeList,
+ coreContainer.get(),
+ clusterState);
+
+ Map<ReplicaPosition, String> nodeMappings = replicaAssigner.getNodeMappings();
+ return nodeMappings.entrySet().stream()
+ .map(e -> new ReplicaPosition(e.getKey().shard, e.getKey().index, e.getKey().type, e.getValue()))
+ .collect(Collectors.toList());
+ }
+ }
static class ReplicaCount {
public final String nodeName;
@@ -191,21 +287,21 @@ public class Assign {
}
List l = (List) coll.get(DocCollection.RULE);
- Map<ReplicaAssigner.Position, String> positions = null;
+ List<ReplicaPosition> replicaPositions = null;
if (l != null) {
- positions = getNodesViaRules(clusterState, shard, numberOfNodes, cc, coll, createNodeList, l);
+ replicaPositions = getNodesViaRules(clusterState, shard, numberOfNodes, cc, coll, createNodeList, l);
}
String policyName = coll.getStr(POLICY);
Map autoScalingJson = Utils.getJson(cc.getZkController().getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
- positions= Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), numberOfNodes,
+ replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), numberOfNodes,
policyName, cc.getZkController().getZkStateReader(), createNodeList);
}
- if(positions != null){
+ if(replicaPositions != null){
List<ReplicaCount> repCounts = new ArrayList<>();
- for (String s : positions.values()) {
- repCounts.add(new ReplicaCount(s));
+ for (ReplicaPosition p : replicaPositions) {
+ repCounts.add(new ReplicaCount(p.node));
}
return repCounts;
}
@@ -215,9 +311,10 @@ public class Assign {
return sortedNodeList;
}
- public static Map<ReplicaAssigner.Position, String> getPositionsUsingPolicy(String collName, List<String> shardNames, int numReplicas,
- String policyName, ZkStateReader zkStateReader,
- List<String> nodesList) throws KeeperException, InterruptedException {
+
+ public static List<ReplicaPosition> getPositionsUsingPolicy(String collName, List<String> shardNames, int numReplicas,
+ String policyName, ZkStateReader zkStateReader,
+ List<String> nodesList) throws KeeperException, InterruptedException {
try (CloudSolrClient csc = new CloudSolrClient.Builder()
.withClusterStateProvider(new ZkClientClusterStateProvider(zkStateReader))
.build()) {
@@ -226,11 +323,11 @@ public class Assign {
Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(collName,
autoScalingJson,
clientDataProvider, singletonMap(collName, policyName), shardNames, numReplicas, nodesList);
- Map<ReplicaAssigner.Position, String> result = new HashMap<>();
+ List<ReplicaPosition> result = new ArrayList<>();
for (Map.Entry<String, List<String>> e : locations.entrySet()) {
List<String> value = e.getValue();
for (int i = 0; i < value.size(); i++) {
- result.put(new ReplicaAssigner.Position(e.getKey(), i, Replica.Type.NRT), value.get(i));
+ result.add(new ReplicaPosition(e.getKey(), i, Replica.Type.NRT, value.get(i)));
}
}
return result;
@@ -239,8 +336,8 @@ public class Assign {
}
}
- private static Map<ReplicaAssigner.Position, String> getNodesViaRules(ClusterState clusterState, String shard, int numberOfNodes,
- CoreContainer cc, DocCollection coll, List<String> createNodeList, List l) {
+ private static List<ReplicaPosition> getNodesViaRules(ClusterState clusterState, String shard, int numberOfNodes,
+ CoreContainer cc, DocCollection coll, List<String> createNodeList, List l) {
ArrayList<Rule> rules = new ArrayList<>();
for (Object o : l) rules.add(new Rule((Map) o));
Map<String, Map<String, Integer>> shardVsNodes = new LinkedHashMap<>();
@@ -253,18 +350,18 @@ public class Assign {
n.put(replica.getNodeName(), ++count);
}
}
- List snitches = (List) coll.get(DocCollection.SNITCH);
+ List snitches = (List) coll.get(SNITCH);
List<String> nodesList = createNodeList == null ?
new ArrayList<>(clusterState.getLiveNodes()) :
createNodeList;
- Map<ReplicaAssigner.Position, String> positions = new ReplicaAssigner(
+ Map<ReplicaPosition, String> positions = new ReplicaAssigner(
rules,
Collections.singletonMap(shard, numberOfNodes),
snitches,
shardVsNodes,
nodesList, cc, clusterState).getNodeMappings();
- return positions;// getReplicaCounts(positions);
+ return positions.entrySet().stream().map(e -> e.getKey().setNode(e.getValue())).collect(Collectors.toList());// getReplicaCounts(positions);
}
private static HashMap<String, ReplicaCount> getNodeNameVsShardCount(String collectionName,
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0093015c/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
index ee5cc47..df23796 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -33,7 +33,7 @@ import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.cloud.autoscaling.AutoScaling;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
-import org.apache.solr.cloud.rule.ReplicaAssigner;
+import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
@@ -138,12 +138,12 @@ public class CreateCollectionCmd implements Cmd {
// add our new cores to existing nodes serving the least number of cores
// but (for now) require that each core goes on a distinct node.
- final List<String> nodeList = OverseerCollectionMessageHandler.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM);
- Map<ReplicaAssigner.Position, String> positionVsNodes;
+ final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM);
+ List<ReplicaPosition> replicaPositions;
if (nodeList.isEmpty()) {
log.warn("It is unusual to create a collection ("+collectionName+") without cores.");
- positionVsNodes = new HashMap<>();
+ replicaPositions = new ArrayList<>();
} else {
int totalNumReplicas = numNrtReplicas + numTlogReplicas + numPullReplicas;
if (totalNumReplicas > nodeList.size()) {
@@ -171,7 +171,9 @@ public class CreateCollectionCmd implements Cmd {
+ " shards to be created (higher than the allowed number)");
}
- positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
+ replicaPositions = Assign.identifyNodes(() -> ocmh.overseer.getZkController().getCoreContainer(),
+ ocmh.zkStateReader
+ , clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
}
ZkStateReader zkStateReader = ocmh.zkStateReader;
@@ -214,12 +216,11 @@ public class CreateCollectionCmd implements Cmd {
log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , nrtReplicas : {2}, tlogReplicas: {3}, pullReplicas: {4}",
collectionName, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas));
Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
- for (Map.Entry<ReplicaAssigner.Position, String> e : positionVsNodes.entrySet()) {
- ReplicaAssigner.Position position = e.getKey();
- String nodeName = e.getValue();
- String coreName = Assign.buildCoreName(collectionName, position.shard, position.type, position.index + 1);
+ for (ReplicaPosition replicaPosition : replicaPositions) {
+ String nodeName = replicaPosition.node;
+ String coreName = Assign.buildCoreName(collectionName, replicaPosition.shard, replicaPosition.type, replicaPosition.index + 1);
log.debug(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
- , coreName, position.shard, collectionName, nodeName));
+ , coreName, replicaPosition.shard, collectionName, nodeName));
String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
@@ -229,11 +230,11 @@ public class CreateCollectionCmd implements Cmd {
ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
ZkStateReader.COLLECTION_PROP, collectionName,
- ZkStateReader.SHARD_ID_PROP, position.shard,
+ ZkStateReader.SHARD_ID_PROP, replicaPosition.shard,
ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
- ZkStateReader.BASE_URL_PROP, baseUrl,
- ZkStateReader.REPLICA_TYPE, position.type.name());
+ ZkStateReader.BASE_URL_PROP, baseUrl,
+ ZkStateReader.REPLICA_TYPE, replicaPosition.type.name());
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
}
@@ -244,10 +245,10 @@ public class CreateCollectionCmd implements Cmd {
params.set(CoreAdminParams.NAME, coreName);
params.set(COLL_CONF, configName);
params.set(CoreAdminParams.COLLECTION, collectionName);
- params.set(CoreAdminParams.SHARD, position.shard);
+ params.set(CoreAdminParams.SHARD, replicaPosition.shard);
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
params.set(CoreAdminParams.NEW_COLLECTION, "true");
- params.set(CoreAdminParams.REPLICA_TYPE, position.type.name());
+ params.set(CoreAdminParams.REPLICA_TYPE, replicaPosition.type.name());
if (async != null) {
String coreAdminAsyncId = async + Math.abs(System.nanoTime());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0093015c/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index 3ee1fde..c392fab 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -19,7 +19,6 @@ package org.apache.solr.cloud;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -37,7 +36,6 @@ import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -47,9 +45,6 @@ import org.apache.solr.cloud.autoscaling.AutoScaling;
import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.cloud.rule.ReplicaAssigner;
-import org.apache.solr.cloud.rule.ReplicaAssigner.Position;
-import org.apache.solr.cloud.rule.Rule;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
@@ -102,7 +97,6 @@ import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
@@ -641,30 +635,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
}
}
- static List<String> getLiveOrLiveAndCreateNodeSetList(final Set<String> liveNodes, final ZkNodeProps message, final Random random) {
- // TODO: add smarter options that look at the current number of cores per
- // node?
- // for now we just go random (except when createNodeSet and createNodeSet.shuffle=false are passed in)
-
- List<String> nodeList;
-
- final String createNodeSetStr = message.getStr(CREATE_NODE_SET);
- final List<String> createNodeList = (createNodeSetStr == null)?null:StrUtils.splitSmart((CREATE_NODE_SET_EMPTY.equals(createNodeSetStr)?"":createNodeSetStr), ",", true);
-
- if (createNodeList != null) {
- nodeList = new ArrayList<>(createNodeList);
- nodeList.retainAll(liveNodes);
- if (message.getBool(CREATE_NODE_SET_SHUFFLE, CREATE_NODE_SET_SHUFFLE_DEFAULT)) {
- Collections.shuffle(nodeList, random);
- }
- } else {
- nodeList = new ArrayList<>(liveNodes);
- Collections.shuffle(nodeList, random);
- }
-
- return nodeList;
- }
-
private void modifyCollection(ClusterState clusterState, ZkNodeProps message, NamedList results)
throws KeeperException, InterruptedException {
@@ -718,66 +688,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
}
- Map<Position, String> identifyNodes(ClusterState clusterState,
- List<String> nodeList,
- String collectionName,
- ZkNodeProps message,
- List<String> shardNames,
- int numNrtReplicas,
- int numTlogReplicas,
- int numPullReplicas) throws KeeperException, InterruptedException {
- List<Map> rulesMap = (List) message.get("rule");
- String policyName = message.getStr(POLICY);
- Map autoScalingJson = Utils.getJson(zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
-
- if (rulesMap == null && policyName == null) {
- int i = 0;
- Map<Position, String> result = new HashMap<>();
- for (String aShard : shardNames) {
- for (int j = 0; j < numNrtReplicas; j++){
- result.put(new Position(aShard, j, Replica.Type.NRT), nodeList.get(i % nodeList.size()));
- i++;
- }
- for (int j = 0; j < numTlogReplicas; j++){
- result.put(new Position(aShard, j, Replica.Type.TLOG), nodeList.get(i % nodeList.size()));
- i++;
- }
- for (int j = 0; j < numPullReplicas; j++){
- result.put(new Position(aShard, j, Replica.Type.PULL), nodeList.get(i % nodeList.size()));
- i++;
- }
- }
- return result;
- } else {
- if (numTlogReplicas + numPullReplicas != 0) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules or cluster policies");
- }
- }
-
- if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
- return Assign.getPositionsUsingPolicy(collectionName,
- shardNames, numNrtReplicas, policyName, zkStateReader, nodeList);
-
- } else {
- List<Rule> rules = new ArrayList<>();
- for (Object map : rulesMap) rules.add(new Rule((Map) map));
-
- Map<String, Integer> sharVsReplicaCount = new HashMap<>();
-
- for (String shard : shardNames) sharVsReplicaCount.put(shard, numNrtReplicas);
- ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
- sharVsReplicaCount,
- (List<Map>) message.get(SNITCH),
- new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
- nodeList,
- overseer.getZkController().getCoreContainer(),
- clusterState);
-
- return replicaAssigner.getNodeMappings();
- }
- }
-
Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
Map<String, Replica> result = new HashMap<>();
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0093015c/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
index 6a18bff..fa493c7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
@@ -28,12 +28,13 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.cloud.rule.ReplicaAssigner;
+import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
@@ -108,7 +109,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
DocCollection backupCollectionState = backupMgr.readCollectionState(location, backupName, backupCollection);
// Get the Solr nodes to restore a collection.
- final List<String> nodeList = OverseerCollectionMessageHandler.getLiveOrLiveAndCreateNodeSetList(
+ final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(
zkStateReader.getClusterState().getLiveNodes(), message, RANDOM);
int numShards = backupCollectionState.getActiveSlices().size();
@@ -213,8 +214,11 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
List<String> sliceNames = new ArrayList<>();
restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
- Map<ReplicaAssigner.Position, String> positionVsNodes = ocmh.identifyNodes(clusterState, nodeList,
- restoreCollectionName, message, sliceNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
+ List<ReplicaPosition> replicaPositions = Assign.identifyNodes(() -> ocmh.overseer.getZkController().getCoreContainer(),
+ ocmh.zkStateReader, clusterState,
+ nodeList, restoreCollectionName,
+ message, sliceNames,
+ numNrtReplicas, numTlogReplicas, numPullReplicas);
//Create one replica per shard and copy backed up data to it
for (Slice slice : restoreCollection.getSlices()) {
@@ -235,12 +239,11 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
// Get the first node matching the shard to restore in
String node;
- for (Map.Entry<ReplicaAssigner.Position, String> pvn : positionVsNodes.entrySet()) {
- ReplicaAssigner.Position position = pvn.getKey();
- if (position.shard == slice.getName()) {
- node = pvn.getValue();
+ for (ReplicaPosition replicaPosition : replicaPositions) {
+ if (Objects.equals(replicaPosition.shard, slice.getName())) {
+ node = replicaPosition.node;
propMap.put(CoreAdminParams.NODE, node);
- positionVsNodes.remove(position);
+ replicaPositions.remove(replicaPosition);
break;
}
}
@@ -319,12 +322,11 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
// Get the first node matching the shard to restore in
String node;
- for (Map.Entry<ReplicaAssigner.Position, String> pvn : positionVsNodes.entrySet()) {
- ReplicaAssigner.Position position = pvn.getKey();
- if (position.shard == slice.getName()) {
- node = pvn.getValue();
+ for (ReplicaPosition replicaPosition : replicaPositions) {
+ if (Objects.equals(replicaPosition.shard, slice.getName())) {
+ node = replicaPosition.node;
propMap.put(CoreAdminParams.NODE, node);
- positionVsNodes.remove(position);
+ replicaPositions.remove(replicaPosition);
break;
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0093015c/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
index 2e2e335..099190c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
@@ -30,7 +30,7 @@ import java.util.Set;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.cloud.rule.ReplicaAssigner;
+import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CompositeIdRouter;
@@ -381,7 +381,8 @@ public class SplitShardCmd implements Cmd {
// TODO: change this to handle sharding a slice into > 2 sub-shards.
- Map<ReplicaAssigner.Position, String> nodeMap = ocmh.identifyNodes(clusterState,
+ List<ReplicaPosition> replicaPositions = Assign.identifyNodes(() -> ocmh.overseer.getZkController().getCoreContainer(),
+ ocmh.zkStateReader, clusterState,
new ArrayList<>(clusterState.getLiveNodes()),
collectionName,
new ZkNodeProps(collection.getProperties()),
@@ -389,10 +390,10 @@ public class SplitShardCmd implements Cmd {
List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
- for (Map.Entry<ReplicaAssigner.Position, String> entry : nodeMap.entrySet()) {
- String sliceName = entry.getKey().shard;
- String subShardNodeName = entry.getValue();
- String shardName = collectionName + "_" + sliceName + "_replica" + (entry.getKey().index);
+ for (ReplicaPosition replicaPosition : replicaPositions) {
+ String sliceName = replicaPosition.shard;
+ String subShardNodeName = replicaPosition.node;
+ String shardName = collectionName + "_" + sliceName + "_replica" + (replicaPosition.index);
log.info("Creating replica shard " + shardName + " as part of slice " + sliceName + " of collection "
+ collectionName + " on " + subShardNodeName);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0093015c/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
index 669e82b..8887e53 100644
--- a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
+++ b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
@@ -59,31 +60,6 @@ public class ReplicaAssigner {
private Map<String, AtomicInteger> nodeVsCores = new HashMap<>();
- public static class Position implements Comparable<Position> {
- public final String shard;
- public final int index;
- public final Replica.Type type;
-
- public Position(String shard, int replicaIdx, Replica.Type type) {
- this.shard = shard;
- this.index = replicaIdx;
- this.type = type;
- }
-
- @Override
- public int compareTo(Position that) {
- //this is to ensure that we try one replica from each shard first instead of
- // all replicas from same shard
- return that.index > index ? -1 : that.index == index ? 0 : 1;
- }
-
- @Override
- public String toString() {
- return shard + ":" + index;
- }
- }
-
-
/**
* @param shardVsReplicaCount shard names vs no:of replicas required for each of those shards
* @param snitches snitches details
@@ -128,8 +104,8 @@ public class ReplicaAssigner {
* For each shard return a new set of nodes where the replicas need to be created satisfying
* the specified rule
*/
- public Map<Position, String> getNodeMappings() {
- Map<Position, String> result = getNodeMappings0();
+ public Map<ReplicaPosition, String> getNodeMappings() {
+ Map<ReplicaPosition, String> result = getNodeMappings0();
if (result == null) {
String msg = "Could not identify nodes matching the rules " + rules;
if (!failedNodes.isEmpty()) {
@@ -149,7 +125,7 @@ public class ReplicaAssigner {
}
- Map<Position, String> getNodeMappings0() {
+ Map<ReplicaPosition, String> getNodeMappings0() {
List<String> shardNames = new ArrayList<>(shardVsReplicaCount.keySet());
int[] shardOrder = new int[shardNames.size()];
for (int i = 0; i < shardNames.size(); i++) shardOrder[i] = i;
@@ -168,17 +144,17 @@ public class ReplicaAssigner {
}
}
- Map<Position, String> result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, false);
+ Map<ReplicaPosition, String> result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, false);
if (result == null && hasFuzzyRules) {
result = tryAllPermutations(shardNames, shardOrder, nonWildCardShardRules, true);
}
return result;
}
- private Map<Position, String> tryAllPermutations(List<String> shardNames,
- int[] shardOrder,
- int nonWildCardShardRules,
- boolean fuzzyPhase) {
+ private Map<ReplicaPosition, String> tryAllPermutations(List<String> shardNames,
+ int[] shardOrder,
+ int nonWildCardShardRules,
+ boolean fuzzyPhase) {
Iterator<int[]> shardPermutations = nonWildCardShardRules > 0 ?
@@ -187,16 +163,16 @@ public class ReplicaAssigner {
for (; shardPermutations.hasNext(); ) {
int[] p = shardPermutations.next();
- List<Position> positions = new ArrayList<>();
+ List<ReplicaPosition> replicaPositions = new ArrayList<>();
for (int pos : p) {
for (int j = 0; j < shardVsReplicaCount.get(shardNames.get(pos)); j++) {
- positions.add(new Position(shardNames.get(pos), j, Replica.Type.NRT));
+ replicaPositions.add(new ReplicaPosition(shardNames.get(pos), j, Replica.Type.NRT));
}
}
- Collections.sort(positions);
+ Collections.sort(replicaPositions);
for (Iterator<int[]> it = permutations(rules.size()); it.hasNext(); ) {
int[] permutation = it.next();
- Map<Position, String> result = tryAPermutationOfRules(permutation, positions, fuzzyPhase);
+ Map<ReplicaPosition, String> result = tryAPermutationOfRules(permutation, replicaPositions, fuzzyPhase);
if (result != null) return result;
}
}
@@ -205,9 +181,9 @@ public class ReplicaAssigner {
}
- private Map<Position, String> tryAPermutationOfRules(int[] rulePermutation, List<Position> positions, boolean fuzzyPhase) {
+ private Map<ReplicaPosition, String> tryAPermutationOfRules(int[] rulePermutation, List<ReplicaPosition> replicaPositions, boolean fuzzyPhase) {
Map<String, Map<String, Object>> nodeVsTagsCopy = getDeepCopy(nodeVsTags, 2);
- Map<Position, String> result = new LinkedHashMap<>();
+ Map<ReplicaPosition, String> result = new LinkedHashMap<>();
int startPosition = 0;
Map<String, Map<String, Integer>> copyOfCurrentState = getDeepCopy(shardVsNodes, 2);
List<String> sortedLiveNodes = new ArrayList<>(this.participatingLiveNodes);
@@ -232,7 +208,7 @@ public class ReplicaAssigner {
return result1;
});
forEachPosition:
- for (Position position : positions) {
+ for (ReplicaPosition replicaPosition : replicaPositions) {
//trying to assign a node by verifying each rule in this rulePermutation
forEachNode:
for (int j = 0; j < sortedLiveNodes.size(); j++) {
@@ -242,16 +218,16 @@ public class ReplicaAssigner {
Rule rule = rules.get(rulePermutation[i]);
//trying to assign a replica into this node in this shard
Rule.MatchStatus status = rule.tryAssignNodeToShard(liveNode,
- copyOfCurrentState, nodeVsTagsCopy, position.shard, fuzzyPhase ? FUZZY_ASSIGN : ASSIGN);
+ copyOfCurrentState, nodeVsTagsCopy, replicaPosition.shard, fuzzyPhase ? FUZZY_ASSIGN : ASSIGN);
if (status == Rule.MatchStatus.CANNOT_ASSIGN_FAIL) {
continue forEachNode;//try another node for this position
}
}
//We have reached this far means this node can be applied to this position
//and all rules are fine. So let us change the currentState
- result.put(position, liveNode);
- Map<String, Integer> nodeNames = copyOfCurrentState.get(position.shard);
- if (nodeNames == null) copyOfCurrentState.put(position.shard, nodeNames = new HashMap<>());
+ result.put(replicaPosition, liveNode);
+ Map<String, Integer> nodeNames = copyOfCurrentState.get(replicaPosition.shard);
+ if (nodeNames == null) copyOfCurrentState.put(replicaPosition.shard, nodeNames = new HashMap<>());
Integer n = nodeNames.get(liveNode);
n = n == null ? 1 : n + 1;
nodeNames.put(liveNode, n);
@@ -267,11 +243,11 @@ public class ReplicaAssigner {
return null;
}
- if (positions.size() > result.size()) {
+ if (replicaPositions.size() > result.size()) {
return null;
}
- for (Map.Entry<Position, String> e : result.entrySet()) {
+ for (Map.Entry<ReplicaPosition, String> e : result.entrySet()) {
for (int i = 0; i < rulePermutation.length; i++) {
Rule rule = rules.get(rulePermutation[i]);
Rule.MatchStatus matchStatus = rule.tryAssignNodeToShard(e.getValue(),
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0093015c/solr/core/src/java/org/apache/solr/common/cloud/ReplicaPosition.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/common/cloud/ReplicaPosition.java b/solr/core/src/java/org/apache/solr/common/cloud/ReplicaPosition.java
new file mode 100644
index 0000000..d64d1d1
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/common/cloud/ReplicaPosition.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+/**
+ * Describes where one replica of a shard should be created: the shard name,
+ * the replica's index within that shard, the replica type and, once known,
+ * the target node.
+ *
+ * <p>The natural ordering compares {@link #index} only and is therefore
+ * <em>not</em> consistent with {@link Object#equals(Object)}, which this class
+ * does not override (instances are compared by identity). Sorting positions
+ * with a stable sort such as {@code Collections.sort} yields replica 0 of every
+ * shard first, then replica 1, and so on, preserving the original shard order
+ * among positions that share an index.
+ */
+public class ReplicaPosition implements Comparable<ReplicaPosition> {
+  /** Name of the shard this replica belongs to. */
+  public final String shard;
+  /** Index of this replica within its shard. */
+  public final int index;
+  /** Type of replica to create. */
+  public final Replica.Type type;
+  /**
+   * Target node for this replica; {@code null} unless passed to the constructor
+   * or set via {@link #setNode(String)}.
+   */
+  public String node;
+
+  /** Creates a position whose target node is not chosen yet. */
+  public ReplicaPosition(String shard, int replicaIdx, Replica.Type type) {
+    this(shard, replicaIdx, type, null);
+  }
+
+  /** Creates a position already bound to {@code node}. */
+  public ReplicaPosition(String shard, int replicaIdx, Replica.Type type, String node) {
+    this.shard = shard;
+    this.index = replicaIdx;
+    this.type = type;
+    this.node = node;
+  }
+
+  /**
+   * Orders positions by replica index only, so that placement tries one replica
+   * from each shard first instead of all replicas of the same shard.
+   */
+  @Override
+  public int compareTo(ReplicaPosition that) {
+    return Integer.compare(index, that.index);
+  }
+
+  /** Returns a debug representation of the form {@code shard:index[type] @node}. */
+  @Override
+  public String toString() {
+    return shard + ":" + index + "[" + type + "] @" + node;
+  }
+
+  /**
+   * Sets the target node for this replica.
+   *
+   * @param node the node to place this replica on
+   * @return this instance, to allow call chaining
+   */
+  public ReplicaPosition setNode(String node) {
+    this.node = node;
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0093015c/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
index 8b0a788..6d460ed 100644
--- a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
@@ -28,7 +28,7 @@ import java.util.Set;
import com.google.common.collect.ImmutableList;
import org.apache.solr.SolrTestCaseJ4;
-import org.apache.solr.cloud.rule.ReplicaAssigner.Position;
+import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.rule.Snitch;
import org.apache.solr.common.cloud.rule.SnitchContext;
@@ -73,7 +73,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{
"'replica':'1',shard:'*','node':'*'}," +
" {'freedisk':'>1'}]");
- Map<Position, String> mapping = new ReplicaAssigner(
+ Map<ReplicaPosition, String> mapping = new ReplicaAssigner(
rules,
shardVsReplicaCount, singletonList(MockSnitch.class.getName()),
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null ).getNodeMappings();
@@ -147,7 +147,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{
"{node:'!127.0.0.1:49947_'}," +
"{freedisk:'>1'}]");
Map shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2);
- Map<Position, String> mapping = new ReplicaAssigner(
+ Map<ReplicaPosition, String> mapping = new ReplicaAssigner(
rules,
shardVsReplicaCount, singletonList(MockSnitch.class.getName()),
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings();
@@ -236,7 +236,7 @@ public class RuleEngineTest extends SolrTestCaseJ4{
"node5:80", makeMap("rack", "182")
);
MockSnitch.nodeVsTags = nodeVsTags;
- Map<Position, String> mapping = new ReplicaAssigner(
+ Map<ReplicaPosition, String> mapping = new ReplicaAssigner(
rules,
shardVsReplicaCount, singletonList(MockSnitch.class.getName()),
new HashMap(), new ArrayList<>(MockSnitch.nodeVsTags.keySet()), null, null).getNodeMappings0();