You are viewing a plain-text version of this content. The canonical link to it is on the original mail-archive page (the hyperlink was lost in this plain-text rendering).
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/14 21:31:19 UTC
[09/11] lucene-solr:branch_7x: SOLR-11285: Simulation framework for autoscaling.
SOLR-11285: Simulation framework for autoscaling.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b1ce5e20
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b1ce5e20
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b1ce5e20
Branch: refs/heads/branch_7x
Commit: b1ce5e2085ad6dda1a5adf5d30c4822d0fe4ce59
Parents: 99f089b
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Dec 14 12:13:05 2017 +0100
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Dec 14 22:30:49 2017 +0100
----------------------------------------------------------------------
solr/CHANGES.txt | 4 +
.../org/apache/solr/cloud/ActionThrottle.java | 14 +-
.../org/apache/solr/cloud/AddReplicaCmd.java | 149 +-
.../src/java/org/apache/solr/cloud/Assign.java | 24 +-
.../solr/cloud/CloudConfigSetService.java | 2 +-
.../java/org/apache/solr/cloud/CloudUtil.java | 7 +
.../apache/solr/cloud/CreateCollectionCmd.java | 244 ++--
.../org/apache/solr/cloud/CreateShardCmd.java | 102 +-
.../apache/solr/cloud/DeleteCollectionCmd.java | 9 +-
.../org/apache/solr/cloud/DeleteShardCmd.java | 9 +-
.../java/org/apache/solr/cloud/MigrateCmd.java | 9 +-
.../org/apache/solr/cloud/MoveReplicaCmd.java | 9 +-
.../java/org/apache/solr/cloud/Overseer.java | 16 +-
.../cloud/OverseerCollectionMessageHandler.java | 46 +-
.../java/org/apache/solr/cloud/RestoreCmd.java | 2 +-
.../org/apache/solr/cloud/SplitShardCmd.java | 248 ++--
.../solr/cloud/autoscaling/AutoScaling.java | 14 +-
.../cloud/autoscaling/AutoScalingHandler.java | 70 +-
.../cloud/autoscaling/ComputePlanAction.java | 24 +-
.../cloud/autoscaling/ExecutePlanAction.java | 38 +-
.../cloud/autoscaling/HttpTriggerListener.java | 6 +-
.../cloud/autoscaling/NodeAddedTrigger.java | 15 +-
.../solr/cloud/autoscaling/NodeLostTrigger.java | 14 +-
.../autoscaling/OverseerTriggerThread.java | 20 +-
.../cloud/autoscaling/ScheduledTriggers.java | 53 +-
.../cloud/autoscaling/SearchRateTrigger.java | 10 +-
.../cloud/autoscaling/SystemLogListener.java | 6 +-
.../cloud/autoscaling/TriggerEventQueue.java | 4 +-
.../cloud/autoscaling/TriggerListenerBase.java | 6 +-
.../solr/cloud/overseer/CollectionMutator.java | 8 +-
.../solr/cloud/overseer/ReplicaMutator.java | 2 +-
.../solr/cloud/overseer/SliceMutator.java | 2 +-
.../org/apache/solr/core/CoreContainer.java | 6 +-
.../solr/handler/CdcrReplicatorManager.java | 11 +-
.../solr/handler/admin/CollectionsHandler.java | 2 +-
.../org/apache/solr/schema/SchemaManager.java | 3 +-
.../org/apache/solr/servlet/HttpSolrCall.java | 5 +-
.../processor/DistributedUpdateProcessor.java | 3 +-
.../src/java/org/apache/solr/util/IdUtils.java | 1 +
.../src/java/org/apache/solr/util/TimeOut.java | 18 +-
.../java/org/apache/solr/util/TimeSource.java | 57 -
.../solr/util/xslt/TransformerProvider.java | 3 +-
.../apache/solr/cloud/ActionThrottleTest.java | 12 +-
.../org/apache/solr/cloud/AddReplicaTest.java | 2 +-
.../test/org/apache/solr/cloud/AssignTest.java | 10 +-
...MonkeyNothingIsSafeWithPullReplicasTest.java | 3 +-
...aosMonkeySafeLeaderWithPullReplicasTest.java | 3 +-
.../cloud/CollectionsAPIDistributedZkTest.java | 5 +-
...ConcurrentDeleteAndCreateCollectionTest.java | 3 +-
...DistribDocExpirationUpdateProcessorTest.java | 3 +-
.../cloud/LeaderFailureAfterFreshStartTest.java | 3 +-
.../apache/solr/cloud/MigrateRouteKeyTest.java | 3 +-
...verseerCollectionConfigSetProcessorTest.java | 9 +-
.../apache/solr/cloud/OverseerRolesTest.java | 3 +-
.../solr/cloud/PeerSyncReplicationTest.java | 3 +-
.../cloud/SharedFSAutoReplicaFailoverTest.java | 3 +-
.../TestLeaderInitiatedRecoveryThread.java | 7 +-
.../org/apache/solr/cloud/TestPullReplica.java | 7 +-
.../cloud/TestPullReplicaErrorHandling.java | 9 +-
.../apache/solr/cloud/TestRebalanceLeaders.java | 5 +-
.../org/apache/solr/cloud/TestTlogReplica.java | 11 +-
.../solr/cloud/UnloadDistributedZkTest.java | 3 +-
.../AutoAddReplicasIntegrationTest.java | 3 +-
.../AutoAddReplicasPlanActionTest.java | 4 +-
.../autoscaling/AutoScalingHandlerTest.java | 3 +-
.../solr/cloud/autoscaling/CapturedEvent.java | 63 +
.../autoscaling/ComputePlanActionTest.java | 16 +
.../autoscaling/ExecutePlanActionTest.java | 2 +-
.../cloud/autoscaling/NodeAddedTriggerTest.java | 2 +-
.../cloud/autoscaling/NodeLostTriggerTest.java | 2 +-
.../autoscaling/TriggerIntegrationTest.java | 155 +--
.../solr/cloud/autoscaling/sim/ActionError.java | 24 +
.../sim/GenericDistributedQueue.java | 599 ++++++++
.../sim/GenericDistributedQueueFactory.java | 45 +
.../cloud/autoscaling/sim/LiveNodesSet.java | 99 ++
.../cloud/autoscaling/sim/SimCloudManager.java | 607 +++++++++
.../sim/SimClusterStateProvider.java | 1275 ++++++++++++++++++
.../autoscaling/sim/SimDistribStateManager.java | 580 ++++++++
.../sim/SimDistributedQueueFactory.java | 284 ++++
.../autoscaling/sim/SimNodeStateProvider.java | 267 ++++
.../autoscaling/sim/SimSolrCloudTestCase.java | 251 ++++
.../sim/TestClusterStateProvider.java | 221 +++
.../autoscaling/sim/TestComputePlanAction.java | 357 +++++
.../sim/TestDistribStateManager.java | 284 ++++
.../autoscaling/sim/TestExecutePlanAction.java | 216 +++
.../sim/TestGenericDistributedQueue.java | 32 +
.../cloud/autoscaling/sim/TestLargeCluster.java | 266 ++++
.../autoscaling/sim/TestNodeAddedTrigger.java | 306 +++++
.../autoscaling/sim/TestNodeLostTrigger.java | 331 +++++
.../cloud/autoscaling/sim/TestPolicyCloud.java | 357 +++++
.../sim/TestSimDistributedQueue.java | 220 +++
.../autoscaling/sim/TestTriggerIntegration.java | 1217 +++++++++++++++++
.../cloud/autoscaling/sim/package-info.java | 21 +
.../cloud/cdcr/BaseCdcrDistributedZkTest.java | 3 +-
.../apache/solr/cloud/hdfs/StressHdfsTest.java | 3 +-
.../solr/cloud/overseer/ZkStateReaderTest.java | 3 +-
.../apache/solr/cloud/rule/RuleEngineTest.java | 3 +
.../solr/core/OpenCloseCoreStressTest.java | 7 +-
.../cloud/autoscaling/AutoScalingConfig.java | 9 +
.../autoscaling/DelegatingCloudManager.java | 17 +
.../DelegatingDistribStateManager.java | 17 +-
.../DelegatingNodeStateProvider.java | 11 +
.../cloud/autoscaling/DistribStateManager.java | 23 +-
.../cloud/autoscaling/NodeStateProvider.java | 4 +-
.../client/solrj/cloud/autoscaling/Policy.java | 21 +-
.../solrj/cloud/autoscaling/PolicyHelper.java | 26 +-
.../solrj/cloud/autoscaling/ReplicaInfo.java | 33 +-
.../cloud/autoscaling/SolrCloudManager.java | 10 +-
.../solrj/cloud/autoscaling/Suggestion.java | 2 +-
.../solr/client/solrj/impl/CloudSolrClient.java | 5 +-
.../client/solrj/impl/ClusterStateProvider.java | 4 +-
.../solrj/impl/HttpClusterStateProvider.java | 21 +-
.../solrj/impl/SolrClientCloudManager.java | 6 +
.../solrj/impl/SolrClientNodeStateProvider.java | 6 +
.../solrj/impl/ZkDistribStateManager.java | 32 +-
.../apache/solr/common/cloud/ClusterState.java | 10 +
.../org/apache/solr/common/cloud/Replica.java | 11 +
.../apache/solr/common/cloud/ZkStateReader.java | 19 +-
.../org/apache/solr/common/util/TimeSource.java | 161 +++
.../java/org/apache/solr/common/util/Utils.java | 23 +-
.../solr/client/solrj/SolrExampleTestsBase.java | 3 +-
.../solr/client/solrj/TestLBHttpSolrClient.java | 3 +-
.../solrj/cloud/autoscaling/TestPolicy.java | 12 +
.../cloud/AbstractFullDistribZkTestBase.java | 7 +-
.../java/org/apache/solr/cloud/ChaosMonkey.java | 3 +-
.../org/apache/solr/cloud/ZkTestServer.java | 3 +-
126 files changed, 9186 insertions(+), 846 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 2949945..d59de38 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -29,6 +29,10 @@ Apache UIMA 2.3.1
Apache ZooKeeper 3.4.10
Jetty 9.3.20.v20170531
+New Features
+----------------------
+* SOLR-11285: Simulation framework for autoscaling. (ab)
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java b/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java
index f60332c..520a269 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java
@@ -20,7 +20,7 @@ import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeUnit;
-import org.apache.solr.util.TimeSource;
+import org.apache.solr.common.util.TimeSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,9 +35,7 @@ public class ActionThrottle {
private final TimeSource timeSource;
public ActionThrottle(String name, long minMsBetweenActions) {
- this.name = name;
- this.minMsBetweenActions = minMsBetweenActions;
- this.timeSource = TimeSource.NANO_TIME;
+ this(name, minMsBetweenActions, TimeSource.NANO_TIME);
}
public ActionThrottle(String name, long minMsBetweenActions, TimeSource timeSource) {
@@ -47,16 +45,20 @@ public class ActionThrottle {
}
public ActionThrottle(String name, long minMsBetweenActions, long lastActionStartedAt) {
+ this(name, minMsBetweenActions, lastActionStartedAt, TimeSource.NANO_TIME);
+ }
+
+ public ActionThrottle(String name, long minMsBetweenActions, long lastActionStartedAt, TimeSource timeSource) {
this.name = name;
this.minMsBetweenActions = minMsBetweenActions;
this.lastActionStartedAt = lastActionStartedAt;
- this.timeSource = TimeSource.NANO_TIME;
+ this.timeSource = timeSource;
}
public void reset() {
lastActionStartedAt = null;
}
-
+
public void markAttemptingAction() {
lastActionStartedAt = timeSource.getTime();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
index c785f9f..71a54c14 100644
--- a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
import org.apache.solr.common.SolrCloseableLatch;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@@ -48,7 +49,6 @@ import org.apache.solr.handler.component.ShardHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.cloud.Assign.getNodesForNewReplicas;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
@@ -56,6 +56,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
@@ -69,81 +70,30 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
@Override
public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
- addReplica(ocmh.zkStateReader.getClusterState(), message, results, null);
+ addReplica(state, message, results, null);
}
ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
throws IOException, InterruptedException {
log.debug("addReplica() : {}", Utils.toJSONString(message));
+ boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
+ boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
+ final String asyncId = message.getStr(ASYNC);
+
+ AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
+ message = assignReplicaDetails(ocmh.cloudManager, clusterState, message, sessionWrapper);
+
String collection = message.getStr(COLLECTION_PROP);
+ DocCollection coll = clusterState.getCollection(collection);
+
String node = message.getStr(CoreAdminParams.NODE);
String shard = message.getStr(SHARD_ID_PROP);
String coreName = message.getStr(CoreAdminParams.NAME);
String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME);
- int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
+ int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes
Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
boolean parallel = message.getBool("parallel", false);
- boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
- if (StringUtils.isBlank(coreName)) {
- coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
- }
-
- final String asyncId = message.getStr(ASYNC);
-
- DocCollection coll = clusterState.getCollection(collection);
- if (coll == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
- }
- if (coll.getSlice(shard) == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Collection: " + collection + " shard: " + shard + " does not exist");
- }
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
- boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
-
- AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
- // Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
- if (!skipCreateReplicaInClusterState) {
- if (CreateShardCmd.usePolicyFramework(coll, ocmh)) {
- if (node == null) {
- if(coll.getPolicyName() != null) message.getProperties().put(Policy.POLICY, coll.getPolicyName());
- node = Assign.identifyNodes(ocmh,
- clusterState,
- Collections.emptyList(),
- collection,
- message,
- Collections.singletonList(shard),
- replicaType == Replica.Type.NRT ? 0 : 1,
- replicaType == Replica.Type.TLOG ? 0 : 1,
- replicaType == Replica.Type.PULL ? 0 : 1
- ).get(0).node;
- sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
- }
- } else {
- node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
- ocmh.overseer.getSolrCloudManager()).get(0).nodeName;// TODO: use replica type in this logic too
- }
- }
- log.info("Node Identified {} for creating new replica", node);
-
- if (!clusterState.liveNodesContain(node)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
- }
- if (coreName == null) {
- coreName = Assign.buildCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), coll, shard, replicaType);
- } else if (!skipCreateReplicaInClusterState) {
- //Validate that the core name is unique in that collection
- for (Slice slice : coll.getSlices()) {
- for (Replica replica : slice.getReplicas()) {
- String replicaCoreName = replica.getStr(CORE_NAME_PROP);
- if (coreName.equals(replicaCoreName)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Another replica with the same core name already exists" +
- " for this collection");
- }
- }
- }
- }
ModifiableSolrParams params = new ModifiableSolrParams();
ZkStateReader zkStateReader = ocmh.zkStateReader;
@@ -210,6 +160,8 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
// For tracking async calls.
Map<String,String> requestMap = new HashMap<>();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+
ocmh.sendShardRequest(node, params, shardHandler, asyncId, requestMap);
final String fnode = node;
@@ -253,4 +205,75 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
);
}
+ public static ZkNodeProps assignReplicaDetails(SolrCloudManager cloudManager, ClusterState clusterState,
+ ZkNodeProps message, AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
+ boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
+
+ String collection = message.getStr(COLLECTION_PROP);
+ String node = message.getStr(CoreAdminParams.NODE);
+ String shard = message.getStr(SHARD_ID_PROP);
+ String coreName = message.getStr(CoreAdminParams.NAME);
+ String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME);
+ Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
+ if (StringUtils.isBlank(coreName)) {
+ coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
+ }
+
+ DocCollection coll = clusterState.getCollection(collection);
+ if (coll == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
+ }
+ if (coll.getSlice(shard) == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Collection: " + collection + " shard: " + shard + " does not exist");
+ }
+
+ // Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
+ if (!skipCreateReplicaInClusterState) {
+ if (CloudUtil.usePolicyFramework(coll, cloudManager)) {
+ if (node == null) {
+ if(coll.getPolicyName() != null) message.getProperties().put(Policy.POLICY, coll.getPolicyName());
+ node = Assign.identifyNodes(cloudManager,
+ clusterState,
+ Collections.emptyList(),
+ collection,
+ message,
+ Collections.singletonList(shard),
+ replicaType == Replica.Type.NRT ? 0 : 1,
+ replicaType == Replica.Type.TLOG ? 0 : 1,
+ replicaType == Replica.Type.PULL ? 0 : 1
+ ).get(0).node;
+ sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
+ }
+ } else {
+ node = Assign.getNodesForNewReplicas(clusterState, collection, shard, 1, node,
+ cloudManager).get(0).nodeName;// TODO: use replica type in this logic too
+ }
+ }
+ log.info("Node Identified {} for creating new replica", node);
+
+ if (!clusterState.liveNodesContain(node)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
+ }
+ if (coreName == null) {
+ coreName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), coll, shard, replicaType);
+ } else if (!skipCreateReplicaInClusterState) {
+ //Validate that the core name is unique in that collection
+ for (Slice slice : coll.getSlices()) {
+ for (Replica replica : slice.getReplicas()) {
+ String replicaCoreName = replica.getStr(CORE_NAME_PROP);
+ if (coreName.equals(replicaCoreName)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Another replica with the same core name already exists" +
+ " for this collection");
+ }
+ }
+ }
+ }
+ if (coreNodeName != null) {
+ message = message.plus(CoreAdminParams.CORE_NODE_NAME, coreNodeName);
+ }
+ message = message.plus(CoreAdminParams.NAME, coreName);
+ message = message.plus(CoreAdminParams.NODE, node);
+ return message;
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java
index fd0738f..c746c94 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java
@@ -114,7 +114,7 @@ public class Assign {
}
}
- public static String assignNode(DistribStateManager stateManager, DocCollection collection) {
+ public static String assignCoreNodeName(DistribStateManager stateManager, DocCollection collection) {
// for backward compatibility;
int defaultValue = defaultCounterValue(collection, false);
String coreNodeName = "core_node" + incAndGetId(stateManager, collection.getName(), defaultValue);
@@ -170,7 +170,7 @@ public class Assign {
return returnShardId;
}
- private static String buildCoreName(String collectionName, String shard, Replica.Type type, int replicaNum) {
+ private static String buildSolrCoreName(String collectionName, String shard, Replica.Type type, int replicaNum) {
// TODO: Adding the suffix is great for debugging, but may be an issue if at some point we want to support a way to change replica type
return String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, shard, type.name().substring(0,1).toLowerCase(Locale.ROOT), replicaNum);
}
@@ -187,20 +187,20 @@ public class Assign {
return defaultValue * 20;
}
- public static String buildCoreName(DistribStateManager stateManager, DocCollection collection, String shard, Replica.Type type, boolean newCollection) {
+ public static String buildSolrCoreName(DistribStateManager stateManager, DocCollection collection, String shard, Replica.Type type, boolean newCollection) {
Slice slice = collection.getSlice(shard);
int defaultValue = defaultCounterValue(collection, newCollection);
int replicaNum = incAndGetId(stateManager, collection.getName(), defaultValue);
- String coreName = buildCoreName(collection.getName(), shard, type, replicaNum);
+ String coreName = buildSolrCoreName(collection.getName(), shard, type, replicaNum);
while (existCoreName(coreName, slice)) {
replicaNum = incAndGetId(stateManager, collection.getName(), defaultValue);
- coreName = buildCoreName(collection.getName(), shard, type, replicaNum);
+ coreName = buildSolrCoreName(collection.getName(), shard, type, replicaNum);
}
return coreName;
}
- public static String buildCoreName(DistribStateManager stateManager, DocCollection collection, String shard, Replica.Type type) {
- return buildCoreName(stateManager, collection, shard, type, false);
+ public static String buildSolrCoreName(DistribStateManager stateManager, DocCollection collection, String shard, Replica.Type type) {
+ return buildSolrCoreName(stateManager, collection, shard, type, false);
}
private static boolean existCoreName(String coreName, Slice slice) {
@@ -237,7 +237,7 @@ public class Assign {
return nodeList;
}
- public static List<ReplicaPosition> identifyNodes(OverseerCollectionMessageHandler ocmh,
+ public static List<ReplicaPosition> identifyNodes(SolrCloudManager cloudManager,
ClusterState clusterState,
List<String> nodeList,
String collectionName,
@@ -248,7 +248,7 @@ public class Assign {
int numPullReplicas) throws IOException, InterruptedException {
List<Map> rulesMap = (List) message.get("rule");
String policyName = message.getStr(POLICY);
- AutoScalingConfig autoScalingConfig = ocmh.overseer.getSolrCloudManager().getDistribStateManager().getAutoScalingConfig();
+ AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
if (rulesMap == null && policyName == null && autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
log.debug("Identify nodes using default");
@@ -283,7 +283,7 @@ public class Assign {
(List<Map>) message.get(SNITCH),
new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
nodeList,
- ocmh.overseer.getSolrCloudManager(),
+ cloudManager,
clusterState);
Map<ReplicaPosition, String> nodeMappings = replicaAssigner.getNodeMappings();
@@ -294,7 +294,7 @@ public class Assign {
if (message.getStr(CREATE_NODE_SET) == null)
nodeList = Collections.emptyList();// unless explicitly specified do not pass node list to Policy
return getPositionsUsingPolicy(collectionName,
- shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, ocmh.overseer.getSolrCloudManager(), nodeList);
+ shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, cloudManager, nodeList);
}
}
@@ -397,7 +397,7 @@ public class Assign {
nodesList);
return replicaPositions;
} catch (Exception e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error closing CloudSolrClient",e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error getting replica locations", e);
} finally {
if (log.isTraceEnabled()) {
if (replicaPositions != null)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/CloudConfigSetService.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudConfigSetService.java b/solr/core/src/java/org/apache/solr/cloud/CloudConfigSetService.java
index 6e0583f..3cdc903 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudConfigSetService.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudConfigSetService.java
@@ -42,7 +42,7 @@ public class CloudConfigSetService extends ConfigSetService {
try {
// for back compat with cores that can create collections without the collections API
if (!zkController.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + cd.getCollectionName(), true)) {
- CreateCollectionCmd.createCollectionZkNode(zkController.getZkClient(), cd.getCollectionName(), cd.getCloudDescriptor().getParams());
+ CreateCollectionCmd.createCollectionZkNode(zkController.getSolrCloudManager().getDistribStateManager(), cd.getCollectionName(), cd.getCloudDescriptor().getParams());
}
} catch (KeeperException e) {
SolrException.log(log, null, e);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
index 62cde7c..30de3d4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.DocCollection;
@@ -130,4 +132,9 @@ public class CloudUtil {
}
+ static boolean usePolicyFramework(DocCollection collection, SolrCloudManager cloudManager)
+ throws IOException, InterruptedException {
+ AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+ return !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || collection.getPolicyName() != null;
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
index 2c4f01e..2171c60 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -18,6 +18,7 @@
package org.apache.solr.cloud;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
@@ -26,11 +27,18 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.common.SolrException;
@@ -40,7 +48,6 @@ import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
-import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -51,6 +58,7 @@ import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.admin.ConfigSetsHandlerApi;
import org.apache.solr.handler.component.ShardHandler;
@@ -70,7 +78,6 @@ import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
-import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
@@ -81,11 +88,13 @@ import static org.apache.solr.common.util.StrUtils.formatString;
public class CreateCollectionCmd implements Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OverseerCollectionMessageHandler ocmh;
- private SolrZkClient zkClient;
+ private final TimeSource timeSource;
+ private final DistribStateManager stateManager;
public CreateCollectionCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
- this.zkClient = ocmh.zkStateReader.getZkClient();
+ this.stateManager = ocmh.cloudManager.getDistribStateManager();
+ this.timeSource = ocmh.cloudManager.getTimeSource();
}
@Override
@@ -103,95 +112,20 @@ public class CreateCollectionCmd implements Cmd {
}
ocmh.validateConfigOrThrowSolrException(configName);
- PolicyHelper.SessionWrapper sessionWrapper = null;
+ AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
try {
- // look at the replication factor and see if it matches reality
- // if it does not, find best nodes to create more cores
- int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0);
- int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, numTlogReplicas>0?0:1));
- int numPullReplicas = message.getInt(PULL_REPLICAS, 0);
- Map autoScalingJson = Utils.getJson(ocmh.zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
- String policy = message.getStr(Policy.POLICY);
- boolean usePolicyFramework = autoScalingJson.get(Policy.CLUSTER_POLICY) != null || policy != null;
-
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
final String async = message.getStr(ASYNC);
- Integer numSlices = message.getInt(NUM_SLICES, null);
- String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
+ List<String> nodeList = new ArrayList<>();
List<String> shardNames = new ArrayList<>();
- if(ImplicitDocRouter.NAME.equals(router)){
- ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
- numSlices = shardNames.size();
- } else {
- if (numSlices == null ) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NUM_SLICES + " is a required param (when using CompositeId router).");
- }
- ClusterStateMutator.getShardNames(numSlices, shardNames);
- }
-
- int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
- if (usePolicyFramework && message.getStr(MAX_SHARDS_PER_NODE) != null && maxShardsPerNode > 0) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "'maxShardsPerNode>0' is not supported when autoScaling policies are used");
- }
- if (maxShardsPerNode == -1 || usePolicyFramework) maxShardsPerNode = Integer.MAX_VALUE;
- if (numNrtReplicas + numTlogReplicas <= 0) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
- }
-
- if (numSlices <= 0) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0");
- }
-
- // we need to look at every node and see how many cores it serves
- // add our new cores to existing nodes serving the least number of cores
- // but (for now) require that each core goes on a distinct node.
-
- final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM);
- List<ReplicaPosition> replicaPositions;
- if (nodeList.isEmpty()) {
- log.warn("It is unusual to create a collection ("+collectionName+") without cores.");
-
- replicaPositions = new ArrayList<>();
- } else {
- int totalNumReplicas = numNrtReplicas + numTlogReplicas + numPullReplicas;
- if (totalNumReplicas > nodeList.size()) {
- log.warn("Specified number of replicas of "
- + totalNumReplicas
- + " on collection "
- + collectionName
- + " is higher than the number of Solr instances currently live or live and part of your " + CREATE_NODE_SET + "("
- + nodeList.size()
- + "). It's unusual to run two replica of the same slice on the same Solr-instance.");
- }
-
- int maxShardsAllowedToCreate = maxShardsPerNode == Integer.MAX_VALUE ?
- Integer.MAX_VALUE :
- maxShardsPerNode * nodeList.size();
- int requestedShardsToCreate = numSlices * totalNumReplicas;
- if (maxShardsAllowedToCreate < requestedShardsToCreate) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
- + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
- + ", and the number of nodes currently live or live and part of your "+CREATE_NODE_SET+" is " + nodeList.size()
- + ". This allows a maximum of " + maxShardsAllowedToCreate
- + " to be created. Value of " + NUM_SLICES + " is " + numSlices
- + ", value of " + NRT_REPLICAS + " is " + numNrtReplicas
- + ", value of " + TLOG_REPLICAS + " is " + numTlogReplicas
- + " and value of " + PULL_REPLICAS + " is " + numPullReplicas
- + ". This requires " + requestedShardsToCreate
- + " shards to be created (higher than the allowed number)");
- }
- replicaPositions = Assign.identifyNodes(ocmh
- , clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
- sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
- }
-
+ List<ReplicaPosition> replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, message,
+ nodeList, shardNames, sessionWrapper);
ZkStateReader zkStateReader = ocmh.zkStateReader;
boolean isLegacyCloud = Overseer.isLegacy(zkStateReader);
- ocmh.createConfNode(configName, collectionName, isLegacyCloud);
+ ocmh.createConfNode(stateManager, configName, collectionName, isLegacyCloud);
Map<String,String> collectionParams = new HashMap<>();
Map<String,Object> collectionProps = message.getProperties();
@@ -201,16 +135,16 @@ public class CreateCollectionCmd implements Cmd {
}
}
- createCollectionZkNode(zkClient, collectionName, collectionParams);
+ createCollectionZkNode(stateManager, collectionName, collectionParams);
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
// wait for a while until we don't see the collection
- TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS);
+ TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
boolean created = false;
while (! waitUntil.hasTimedOut()) {
- Thread.sleep(100);
- created = zkStateReader.getClusterState().hasCollection(collectionName);
+ waitUntil.sleep(100);
+ created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
if(created) break;
}
if (!created)
@@ -225,12 +159,14 @@ public class CreateCollectionCmd implements Cmd {
Map<String, String> requestMap = new HashMap<>();
- log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , nrtReplicas : {2}, tlogReplicas: {3}, pullReplicas: {4}",
- collectionName, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas));
+ log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , message : {2}",
+ collectionName, shardNames, message));
Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
for (ReplicaPosition replicaPosition : replicaPositions) {
String nodeName = replicaPosition.node;
- String coreName = Assign.buildCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), zkStateReader.getClusterState().getCollection(collectionName),
+ String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(),
+ ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName),
replicaPosition.shard, replicaPosition.type, true);
log.debug(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
, coreName, replicaPosition.shard, collectionName, nodeName));
@@ -260,7 +196,7 @@ public class CreateCollectionCmd implements Cmd {
params.set(COLL_CONF, configName);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, replicaPosition.shard);
- params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+ params.set(ZkStateReader.NUM_SHARDS_PROP, shardNames.size());
params.set(CoreAdminParams.NEW_COLLECTION, "true");
params.set(CoreAdminParams.REPLICA_TYPE, replicaPosition.type.name());
@@ -320,10 +256,93 @@ public class CreateCollectionCmd implements Cmd {
} catch (Exception ex) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, ex);
} finally {
- if(sessionWrapper != null) sessionWrapper.release();
+ if (sessionWrapper.get() != null) sessionWrapper.get().release();
}
}
+ public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
+ ZkNodeProps message,
+ List<String> nodeList, List<String> shardNames,
+ AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
+ final String collectionName = message.getStr(NAME);
+ // look at the replication factor and see if it matches reality
+ // if it does not, find best nodes to create more cores
+ int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0);
+ int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, numTlogReplicas>0?0:1));
+ int numPullReplicas = message.getInt(PULL_REPLICAS, 0);
+ AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+ String policy = message.getStr(Policy.POLICY);
+ boolean usePolicyFramework = !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || policy != null;
+
+ Integer numSlices = message.getInt(NUM_SLICES, null);
+ String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
+ if(ImplicitDocRouter.NAME.equals(router)){
+ ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
+ numSlices = shardNames.size();
+ } else {
+ if (numSlices == null ) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NUM_SLICES + " is a required param (when using CompositeId router).");
+ }
+ if (numSlices <= 0) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0");
+ }
+ ClusterStateMutator.getShardNames(numSlices, shardNames);
+ }
+
+ int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
+ if (usePolicyFramework && message.getStr(MAX_SHARDS_PER_NODE) != null && maxShardsPerNode > 0) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "'maxShardsPerNode>0' is not supported when autoScaling policies are used");
+ }
+ if (maxShardsPerNode == -1 || usePolicyFramework) maxShardsPerNode = Integer.MAX_VALUE;
+ if (numNrtReplicas + numTlogReplicas <= 0) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
+ }
+
+ // we need to look at every node and see how many cores it serves
+ // add our new cores to existing nodes serving the least number of cores
+ // but (for now) require that each core goes on a distinct node.
+
+ List<ReplicaPosition> replicaPositions;
+ nodeList.addAll(Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM));
+ if (nodeList.isEmpty()) {
+ log.warn("It is unusual to create a collection ("+collectionName+") without cores.");
+
+ replicaPositions = new ArrayList<>();
+ } else {
+ int totalNumReplicas = numNrtReplicas + numTlogReplicas + numPullReplicas;
+ if (totalNumReplicas > nodeList.size()) {
+ log.warn("Specified number of replicas of "
+ + totalNumReplicas
+ + " on collection "
+ + collectionName
+ + " is higher than the number of Solr instances currently live or live and part of your " + CREATE_NODE_SET + "("
+ + nodeList.size()
+ + "). It's unusual to run two replica of the same slice on the same Solr-instance.");
+ }
+
+ int maxShardsAllowedToCreate = maxShardsPerNode == Integer.MAX_VALUE ?
+ Integer.MAX_VALUE :
+ maxShardsPerNode * nodeList.size();
+ int requestedShardsToCreate = numSlices * totalNumReplicas;
+ if (maxShardsAllowedToCreate < requestedShardsToCreate) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
+ + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
+ + ", and the number of nodes currently live or live and part of your "+CREATE_NODE_SET+" is " + nodeList.size()
+ + ". This allows a maximum of " + maxShardsAllowedToCreate
+ + " to be created. Value of " + NUM_SLICES + " is " + numSlices
+ + ", value of " + NRT_REPLICAS + " is " + numNrtReplicas
+ + ", value of " + TLOG_REPLICAS + " is " + numTlogReplicas
+ + " and value of " + PULL_REPLICAS + " is " + numPullReplicas
+ + ". This requires " + requestedShardsToCreate
+ + " shards to be created (higher than the allowed number)");
+ }
+ replicaPositions = Assign.identifyNodes(cloudManager
+ , clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
+ sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
+ }
+ return replicaPositions;
+ }
+
String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException {
String configName = message.getStr(COLL_CONF);
@@ -375,12 +394,12 @@ public class CreateCollectionCmd implements Cmd {
}
}
- public static void createCollectionZkNode(SolrZkClient zkClient, String collection, Map<String,String> params) {
+ public static void createCollectionZkNode(DistribStateManager stateManager, String collection, Map<String,String> params) {
log.debug("Check for collection zkNode:" + collection);
String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
try {
- if (!zkClient.exists(collectionPath, true)) {
+ if (!stateManager.hasData(collectionPath)) {
log.debug("Creating collection in ZooKeeper:" + collection);
try {
@@ -394,7 +413,7 @@ public class CreateCollectionCmd implements Cmd {
// if the config name wasn't passed in, use the default
if (!collectionProps.containsKey(ZkController.CONFIGNAME_PROP)) {
// users can create the collection node and conf link ahead of time, or this may return another option
- getConfName(zkClient, collection, collectionPath, collectionProps);
+ getConfName(stateManager, collection, collectionPath, collectionProps);
}
} else if (System.getProperty("bootstrap_confdir") != null) {
@@ -417,19 +436,21 @@ public class CreateCollectionCmd implements Cmd {
// the conf name should should be the collection name of this core
collectionProps.put(ZkController.CONFIGNAME_PROP, collection);
} else {
- getConfName(zkClient, collection, collectionPath, collectionProps);
+ getConfName(stateManager, collection, collectionPath, collectionProps);
}
collectionProps.remove(ZkStateReader.NUM_SHARDS_PROP); // we don't put numShards in the collections properties
ZkNodeProps zkProps = new ZkNodeProps(collectionProps);
- zkClient.makePath(collectionPath, Utils.toJSON(zkProps), CreateMode.PERSISTENT, null, true);
+ stateManager.makePath(collectionPath, Utils.toJSON(zkProps), CreateMode.PERSISTENT, false);
} catch (KeeperException e) {
// it's okay if the node already exists
if (e.code() != KeeperException.Code.NODEEXISTS) {
throw e;
}
+ } catch (AlreadyExistsException e) {
+ // it's okay if the node already exists
}
} else {
log.debug("Collection zkNode exists");
@@ -441,6 +462,8 @@ public class CreateCollectionCmd implements Cmd {
return;
}
throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
+ } catch (IOException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
} catch (InterruptedException e) {
Thread.interrupted();
throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
@@ -448,8 +471,8 @@ public class CreateCollectionCmd implements Cmd {
}
- private static void getConfName(SolrZkClient zkClient, String collection, String collectionPath, Map<String,Object> collectionProps) throws KeeperException,
- InterruptedException {
+ private static void getConfName(DistribStateManager stateManager, String collection, String collectionPath, Map<String,Object> collectionProps) throws IOException,
+ KeeperException, InterruptedException {
// check for configName
log.debug("Looking for collection configName");
if (collectionProps.containsKey("configName")) {
@@ -461,17 +484,17 @@ public class CreateCollectionCmd implements Cmd {
int retry = 1;
int retryLimt = 6;
for (; retry < retryLimt; retry++) {
- if (zkClient.exists(collectionPath, true)) {
- ZkNodeProps cProps = ZkNodeProps.load(zkClient.getData(collectionPath, null, null, true));
+ if (stateManager.hasData(collectionPath)) {
+ VersionedData data = stateManager.getData(collectionPath);
+ ZkNodeProps cProps = ZkNodeProps.load(data.getData());
if (cProps.containsKey(ZkController.CONFIGNAME_PROP)) {
break;
}
}
try {
- configNames = zkClient.getChildren(ZkConfigManager.CONFIGS_ZKNODE, null,
- true);
- } catch (NoNodeException e) {
+ configNames = stateManager.listData(ZkConfigManager.CONFIGS_ZKNODE);
+ } catch (NoSuchElementException | NoNodeException e) {
// just keep trying
}
@@ -507,15 +530,4 @@ public class CreateCollectionCmd implements Cmd {
"Could not find configName for collection " + collection + " found:" + configNames);
}
}
-
- public static boolean usePolicyFramework(ZkStateReader zkStateReader, ZkNodeProps message) {
- Map autoScalingJson = Collections.emptyMap();
- try {
- autoScalingJson = Utils.getJson(zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
- } catch (Exception e) {
- return false;
- }
- return autoScalingJson.get(Policy.CLUSTER_POLICY) != null || message.getStr(Policy.POLICY) != null;
- }
-
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
index 18b0b63..c6afdcc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
@@ -25,11 +25,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.ImmutableMap;
-import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.common.SolrCloseableLatch;
import org.apache.solr.common.SolrException;
@@ -76,59 +77,21 @@ public class CreateShardCmd implements Cmd {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
DocCollection collection = clusterState.getCollection(collectionName);
- int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
- int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0));
- int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0));
- int totalReplicas = numNrtReplicas + numPullReplicas + numTlogReplicas;
-
- if (numNrtReplicas + numTlogReplicas <= 0) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
- }
-
- Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
ZkStateReader zkStateReader = ocmh.zkStateReader;
- PolicyHelper.SessionWrapper sessionWrapper = null;
- boolean usePolicyFramework = usePolicyFramework(collection,ocmh);
- List<ReplicaPosition> positions = null;
+ AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
SolrCloseableLatch countDownLatch;
try {
- if (usePolicyFramework) {
- if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName());
- positions = Assign.identifyNodes(ocmh,
- clusterState,
- Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM),
- collectionName,
- message,
- Collections.singletonList(sliceName),
- numNrtReplicas,
- numTlogReplicas,
- numPullReplicas);
- sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
- } else {
- List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, totalReplicas,
- createNodeSetStr, ocmh.overseer.getSolrCloudManager());
- int i = 0;
- positions = new ArrayList<>();
- for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
- Replica.Type.TLOG, numTlogReplicas,
- Replica.Type.PULL, numPullReplicas
- ).entrySet()) {
- for (int j = 0; j < e.getValue(); j++) {
- positions.add(new ReplicaPosition(sliceName, j + 1, e.getKey(), sortedNodeList.get(i % sortedNodeList.size()).nodeName));
- i++;
- }
- }
- }
+ List<ReplicaPosition> positions = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, sessionWrapper);
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
// wait for a while until we see the shard
ocmh.waitForNewShard(collectionName, sliceName);
String async = message.getStr(ASYNC);
- countDownLatch = new SolrCloseableLatch(totalReplicas, ocmh);
+ countDownLatch = new SolrCloseableLatch(positions.size(), ocmh);
for (ReplicaPosition position : positions) {
String nodeName = position.node;
- String coreName = Assign.buildCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), collection, sliceName, position.type);
+ String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(), collection, sliceName, position.type);
log.info("Creating replica " + coreName + " as part of slice " + sliceName + " of collection " + collectionName
+ " on " + nodeName);
@@ -166,7 +129,7 @@ public class CreateShardCmd implements Cmd {
});
}
} finally {
- if(sessionWrapper != null) sessionWrapper.release();
+ if (sessionWrapper.get() != null) sessionWrapper.get().release();
}
log.debug("Waiting for create shard action to complete");
@@ -177,9 +140,52 @@ public class CreateShardCmd implements Cmd {
}
- static boolean usePolicyFramework(DocCollection collection, OverseerCollectionMessageHandler ocmh)
- throws IOException, InterruptedException {
- AutoScalingConfig autoScalingConfig = ocmh.overseer.getSolrCloudManager().getDistribStateManager().getAutoScalingConfig();
- return !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || collection.getPolicyName() != null;
+ public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
+ String collectionName, ZkNodeProps message, AtomicReference< PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
+ String sliceName = message.getStr(SHARD_ID_PROP);
+ DocCollection collection = clusterState.getCollection(collectionName);
+
+ int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
+ int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0));
+ int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0));
+ int totalReplicas = numNrtReplicas + numPullReplicas + numTlogReplicas;
+
+ if (numNrtReplicas + numTlogReplicas <= 0) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
+ }
+
+ Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
+
+ boolean usePolicyFramework = CloudUtil.usePolicyFramework(collection, cloudManager);
+ List<ReplicaPosition> positions;
+ if (usePolicyFramework) {
+ if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName());
+ positions = Assign.identifyNodes(cloudManager,
+ clusterState,
+ Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM),
+ collection.getName(),
+ message,
+ Collections.singletonList(sliceName),
+ numNrtReplicas,
+ numTlogReplicas,
+ numPullReplicas);
+ sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
+ } else {
+ List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collection.getName(), sliceName, totalReplicas,
+ createNodeSetStr, cloudManager);
+ int i = 0;
+ positions = new ArrayList<>();
+ for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
+ Replica.Type.TLOG, numTlogReplicas,
+ Replica.Type.PULL, numPullReplicas
+ ).entrySet()) {
+ for (int j = 0; j < e.getValue(); j++) {
+ positions.add(new ReplicaPosition(sliceName, j + 1, e.getKey(), sortedNodeList.get(i % sortedNodeList.size()).nodeName));
+ i++;
+ }
+ }
+ }
+ return positions;
}
+
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
index d2e40f7..dc91905 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
@@ -35,6 +35,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.snapshots.SolrSnapshotManager;
import org.apache.solr.util.TimeOut;
@@ -49,9 +50,11 @@ import static org.apache.solr.common.params.CommonParams.NAME;
public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OverseerCollectionMessageHandler ocmh;
+ private final TimeSource timeSource;
public DeleteCollectionCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
+ this.timeSource = ocmh.cloudManager.getTimeSource();
}
@Override
@@ -94,13 +97,13 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
// wait for a while until we don't see the collection
- TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
boolean removed = false;
while (! timeout.hasTimedOut()) {
- Thread.sleep(100);
+ timeout.sleep(100);
removed = !zkStateReader.getClusterState().hasCollection(collection);
if (removed) {
- Thread.sleep(500); // just a bit of time so it's more likely other
+ timeout.sleep(500); // just a bit of time so it's more likely other
// readers see on return
break;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java
index f13fed5..58c4e63 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java
@@ -39,6 +39,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
@@ -55,9 +56,11 @@ import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
public class DeleteShardCmd implements Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OverseerCollectionMessageHandler ocmh;
+ private final TimeSource timeSource;
public DeleteShardCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
+ this.timeSource = ocmh.cloudManager.getTimeSource();
}
@Override
@@ -134,14 +137,14 @@ public class DeleteShardCmd implements Cmd {
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
// wait for a while until we don't see the shard
- TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
boolean removed = false;
while (!timeout.hasTimedOut()) {
- Thread.sleep(100);
+ timeout.sleep(100);
DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
removed = collection.getSlice(sliceId) == null;
if (removed) {
- Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
+ timeout.sleep(100); // just a bit of time so it's more likely other readers see on return
break;
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
index cacccb0..02fdb5c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
@@ -38,6 +38,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
@@ -63,9 +64,11 @@ import static org.apache.solr.common.util.Utils.makeMap;
public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OverseerCollectionMessageHandler ocmh;
+ private final TimeSource timeSource;
public MigrateCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
+ this.timeSource = ocmh.cloudManager.getTimeSource();
}
@@ -179,10 +182,10 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
// wait for a while until we see the new rule
log.info("Waiting to see routing rule updated in clusterstate");
- TimeOut waitUntil = new TimeOut(60, TimeUnit.SECONDS);
+ TimeOut waitUntil = new TimeOut(60, TimeUnit.SECONDS, timeSource);
boolean added = false;
while (!waitUntil.hasTimedOut()) {
- Thread.sleep(100);
+ waitUntil.sleep(100);
sourceCollection = zkStateReader.getClusterState().getCollection(sourceCollection.getName());
sourceSlice = sourceCollection.getSlice(sourceSlice.getName());
Map<String, RoutingRule> rules = sourceSlice.getRoutingRules();
@@ -257,7 +260,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
tempSourceCollectionName, targetLeader.getNodeName());
- String tempCollectionReplica2 = Assign.buildCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(),
+ String tempCollectionReplica2 = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(),
zkStateReader.getClusterState().getCollection(tempSourceCollectionName), tempSourceSlice.getName(), Replica.Type.NRT);
props = new HashMap<>();
props.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
index 71d5c82..44493ec 100644
--- a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
@@ -35,6 +35,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.TimeOut;
@@ -54,9 +55,11 @@ public class MoveReplicaCmd implements Cmd{
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OverseerCollectionMessageHandler ocmh;
+ private final TimeSource timeSource;
public MoveReplicaCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
+ this.timeSource = ocmh.cloudManager.getTimeSource();
}
@Override
@@ -158,11 +161,11 @@ public class MoveReplicaCmd implements Cmd{
return;
}
- TimeOut timeOut = new TimeOut(20L, TimeUnit.SECONDS);
+ TimeOut timeOut = new TimeOut(20L, TimeUnit.SECONDS, timeSource);
while (!timeOut.hasTimedOut()) {
coll = ocmh.zkStateReader.getClusterState().getCollection(coll.getName());
if (coll.getReplica(replica.getName()) != null) {
- Thread.sleep(100);
+ timeOut.sleep(100);
} else {
break;
}
@@ -233,7 +236,7 @@ public class MoveReplicaCmd implements Cmd{
private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async,
DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception {
- String newCoreName = Assign.buildCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), coll, slice.getName(), replica.getType());
+ String newCoreName = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), coll, slice.getName(), replica.getType());
ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(),
SHARD_ID_PROP, slice.getName(),
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 3b65d6f..d1bb13a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -80,7 +80,7 @@ public class Overseer implements SolrCloseable {
enum LeaderStatus {DONT_KNOW, NO, YES}
private class ClusterStateUpdater implements Runnable, Closeable {
-
+
private final ZkStateReader reader;
private final SolrZkClient zkClient;
private final String myId;
@@ -88,7 +88,7 @@ public class Overseer implements SolrCloseable {
private final ZkDistributedQueue stateUpdateQueue;
//TODO remove in 9.0, we do not push message into this queue anymore
//Internal queue where overseer stores events that have not yet been published into cloudstate
- //If Overseer dies while extracting the main queue a new overseer will start from this queue
+ //If Overseer dies while extracting the main queue a new overseer will start from this queue
private final ZkDistributedQueue workQueue;
// Internal map which holds the information about running tasks.
private final DistributedMap runningMap;
@@ -120,7 +120,7 @@ public class Overseer implements SolrCloseable {
public Stats getWorkQueueStats() {
return workQueue.getZkStats();
}
-
+
@Override
public void run() {
@@ -442,7 +442,7 @@ public class Overseer implements SolrCloseable {
}
- static class OverseerThread extends Thread implements Closeable {
+ public static class OverseerThread extends Thread implements Closeable {
protected volatile boolean isClosed;
private Closeable thread;
@@ -466,9 +466,9 @@ public class Overseer implements SolrCloseable {
public boolean isClosed() {
return this.isClosed;
}
-
+
}
-
+
private OverseerThread ccThread;
private OverseerThread updaterThread;
@@ -478,7 +478,7 @@ public class Overseer implements SolrCloseable {
private final ZkStateReader reader;
private final ShardHandler shardHandler;
-
+
private final UpdateShardHandler updateShardHandler;
private final String adminPath;
@@ -505,7 +505,7 @@ public class Overseer implements SolrCloseable {
this.stats = new Stats();
this.config = config;
}
-
+
public synchronized void start(String id) {
this.id = id;
closed = false;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index af04453..60b098a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -36,7 +36,10 @@ import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
-import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -66,6 +69,7 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.SuppressForbidden;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
@@ -74,6 +78,7 @@ import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.RTimer;
import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -144,8 +149,10 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
ShardHandlerFactory shardHandlerFactory;
String adminPath;
ZkStateReader zkStateReader;
+ SolrCloudManager cloudManager;
String myId;
Stats stats;
+ TimeSource timeSource;
// Set that tracks collections that are currently being processed by a running task.
// This is used for handling mutual exclusion of the tasks.
@@ -183,6 +190,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
this.myId = myId;
this.stats = stats;
this.overseer = overseer;
+ this.cloudManager = overseer.getSolrCloudManager();
+ this.timeSource = cloudManager.getTimeSource();
this.isClosed = false;
commandMap = new ImmutableMap.Builder<CollectionAction, Cmd>()
.put(REPLACENODE, new ReplaceNodeCmd(this))
@@ -230,7 +239,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
CollectionAction action = getCollectionAction(operation);
Cmd command = commandMap.get(action);
if (command != null) {
- command.call(zkStateReader.getClusterState(), message, results);
+ command.call(cloudManager.getClusterStateProvider().getClusterState(), message, results);
} else {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation);
@@ -424,9 +433,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
}
boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
- TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS);
+ TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS, timeSource);
while (! timeout.hasTimedOut()) {
- Thread.sleep(100);
+ timeout.sleep(100);
DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName);
if (docCollection == null) { // someone already deleted the collection
return true;
@@ -466,7 +475,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
boolean firstLoop = true;
// wait for a while until the state format changes
- TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
while (! timeout.hasTimedOut()) {
DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
if (collection == null) {
@@ -484,7 +493,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, MIGRATESTATEFORMAT.toLower(), COLLECTION_PROP, collectionName);
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
}
- Thread.sleep(100);
+ timeout.sleep(100);
}
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not migrate state format for collection: " + collectionName);
}
@@ -643,16 +652,16 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
validateConfigOrThrowSolrException(configName);
boolean isLegacyCloud = Overseer.isLegacy(zkStateReader);
- createConfNode(configName, collectionName, isLegacyCloud);
+ createConfNode(cloudManager.getDistribStateManager(), configName, collectionName, isLegacyCloud);
reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
}
overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
- TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
boolean areChangesVisible = true;
while (!timeout.hasTimedOut()) {
- DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
+ DocCollection collection = cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName);
areChangesVisible = true;
for (Map.Entry<String,Object> updateEntry : message.getProperties().entrySet()) {
String updateKey = updateEntry.getKey();
@@ -664,7 +673,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
}
}
if (areChangesVisible) break;
- Thread.sleep(100);
+ timeout.sleep(100);
}
if (!areChangesVisible)
@@ -681,7 +690,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
Map<String, Replica> result = new HashMap<>();
- TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
while (true) {
DocCollection coll = zkStateReader.getClusterState().getCollection(collectionName);
for (String coreName : coreNames) {
@@ -747,8 +756,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
}
- void validateConfigOrThrowSolrException(String configName) throws KeeperException, InterruptedException {
- boolean isValid = zkStateReader.getZkClient().exists(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName, true);
+ void validateConfigOrThrowSolrException(String configName) throws IOException, KeeperException, InterruptedException {
+ boolean isValid = cloudManager.getDistribStateManager().hasData(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName);
if(!isValid) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find the specified config set: " + configName);
}
@@ -758,16 +767,16 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
* This doesn't validate the config (path) itself and is just responsible for creating the confNode.
* That check should be done before the config node is created.
*/
- void createConfNode(String configName, String coll, boolean isLegacyCloud) throws KeeperException, InterruptedException {
+ public static void createConfNode(DistribStateManager stateManager, String configName, String coll, boolean isLegacyCloud) throws IOException, AlreadyExistsException, BadVersionException, KeeperException, InterruptedException {
if (configName != null) {
String collDir = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll;
log.debug("creating collections conf node {} ", collDir);
byte[] data = Utils.toJSON(makeMap(ZkController.CONFIGNAME_PROP, configName));
- if (zkStateReader.getZkClient().exists(collDir, true)) {
- zkStateReader.getZkClient().setData(collDir, data, true);
+ if (stateManager.hasData(collDir)) {
+ stateManager.setData(collDir, data, -1);
} else {
- zkStateReader.getZkClient().makePath(collDir, data, true);
+ stateManager.makePath(collDir, data, CreateMode.PERSISTENT, false);
}
} else {
if(isLegacyCloud){
@@ -776,7 +785,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
throw new SolrException(ErrorCode.BAD_REQUEST,"Unable to get config name");
}
}
-
}
private void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
@@ -972,8 +980,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
);
}
- public final PolicyHelper.SessionRef policySessionRef = new PolicyHelper.SessionRef();
-
@Override
public void close() throws IOException {
this.isClosed = true;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
index 039ab5c..9c9a5c9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
@@ -223,7 +223,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
try {
List<ReplicaPosition> replicaPositions = Assign.identifyNodes(
- ocmh, clusterState,
+ ocmh.cloudManager, clusterState,
nodeList, restoreCollectionName,
message, sliceNames,
numNrtReplicas, numTlogReplicas, numPullReplicas);