You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ro...@apache.org on 2014/12/01 18:25:47 UTC
svn commit: r1642718 [8/12] - in /lucene/dev/branches/lucene2878: ./
dev-tools/ dev-tools/eclipse/dot.settings/ dev-tools/idea/.idea/
dev-tools/idea/lucene/benchmark/src/ dev-tools/idea/lucene/highlighter/
dev-tools/maven/ dev-tools/maven/solr/webapp/ ...
Modified: lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/solr/collection1/conf/schema.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/solr/collection1/conf/schema.xml?rev=1642718&r1=1642717&r2=1642718&view=diff
==============================================================================
--- lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/solr/collection1/conf/schema.xml (original)
+++ lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/solr/collection1/conf/schema.xml Mon Dec 1 17:25:39 2014
@@ -361,7 +361,7 @@
documentation for more information on pattern and replacement
string syntax.
- http://docs.oracle.com/javase/7/docs/api/java/util/regex/package-summary.html
+ http://docs.oracle.com/javase/8/docs/api/java/util/regex/package-summary.html
-->
<filter class="solr.PatternReplaceFilterFactory" pattern="([^a-z])"
replacement="" replace="all" />
Modified: lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/uima-tokenizers-schema.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/uima-tokenizers-schema.xml?rev=1642718&r1=1642717&r2=1642718&view=diff
==============================================================================
--- lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/uima-tokenizers-schema.xml (original)
+++ lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/uima-tokenizers-schema.xml Mon Dec 1 17:25:39 2014
@@ -357,7 +357,7 @@
documentation for more information on pattern and replacement
string syntax.
- http://docs.oracle.com/javase/7/docs/api/java/util/regex/package-summary.html
+ http://docs.oracle.com/javase/8/docs/api/java/util/regex/package-summary.html
-->
<filter class="solr.PatternReplaceFilterFactory" pattern="([^a-z])"
replacement="" replace="all" />
Modified: lucene/dev/branches/lucene2878/solr/core/build.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2878/solr/core/build.xml?rev=1642718&r1=1642717&r2=1642718&view=diff
==============================================================================
--- lucene/dev/branches/lucene2878/solr/core/build.xml (original)
+++ lucene/dev/branches/lucene2878/solr/core/build.xml Mon Dec 1 17:25:39 2014
@@ -62,8 +62,8 @@
<target name="resolve" depends="ivy-availability-check,ivy-fail,ivy-configure">
<sequential>
- <ivy:retrieve conf="compile,compile.hadoop" type="jar,bundle" sync="${ivy.sync}" log="download-only"/>
- <ivy:retrieve conf="test,test.DfsMiniCluster" type="jar,bundle,test" sync="${ivy.sync}" log="download-only"
+ <ivy:retrieve conf="compile,compile.hadoop" type="jar,bundle" sync="${ivy.sync}" log="download-only" symlink="${ivy.symlink}"/>
+ <ivy:retrieve conf="test,test.DfsMiniCluster" type="jar,bundle,test" sync="${ivy.sync}" log="download-only" symlink="${ivy.symlink}"
pattern="${test.lib.dir}/[artifact]-[revision](-[classifier]).[ext]"/>
</sequential>
</target>
@@ -92,7 +92,7 @@
<target name="resolve-javacc" xmlns:ivy="antlib:org.apache.ivy.ant">
<!-- setup a "fake" JavaCC distribution folder in ${build.dir} to make JavaCC ANT task happy: -->
<ivy:retrieve organisation="net.java.dev.javacc" module="javacc" revision="5.0"
- inline="true" transitive="false" type="jar" sync="true"
+ inline="true" transitive="false" type="jar" sync="true" symlink="${ivy.symlink}"
pattern="${build.dir}/javacc/bin/lib/[artifact].[ext]"/>
</target>
Modified: lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1642718&r1=1642717&r2=1642718&view=diff
==============================================================================
--- lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Mon Dec 1 17:25:39 2014
@@ -90,6 +90,7 @@ public class DistributedQueue {
TreeMap<Long,String> orderedChildren = new TreeMap<>();
List<String> childNames = zookeeper.getChildren(dir, watcher, true);
+ stats.setQueueLength(childNames.size());
for (String childName : childNames) {
try {
// Check format
@@ -117,6 +118,7 @@ public class DistributedQueue {
throws KeeperException, InterruptedException {
List<String> childNames = zookeeper.getChildren(dir, null, true);
+ stats.setQueueLength(childNames.size());
for (String childName : childNames) {
if (childName != null) {
try {
Modified: lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1642718&r1=1642717&r2=1642718&view=diff
==============================================================================
--- lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Mon Dec 1 17:25:39 2014
@@ -1,10 +1,7 @@
package org.apache.solr.cloud;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
@@ -28,6 +25,7 @@ import org.apache.zookeeper.KeeperExcept
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -136,13 +134,12 @@ class ShardLeaderElectionContextBase ext
try {
RetryUtil.retryOnThrowable(NodeExistsException.class, 15000, 1000,
new RetryCmd() {
-
@Override
public void execute() throws Throwable {
- zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps),
- CreateMode.EPHEMERAL, true);
+ zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps), CreateMode.EPHEMERAL, true);
}
- });
+ }
+ );
} catch (Throwable t) {
if (t instanceof OutOfMemoryError) {
throw (OutOfMemoryError) t;
@@ -152,7 +149,7 @@ class ShardLeaderElectionContextBase ext
assert shardId != null;
ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION,
- Overseer.OverseerAction.LEADER.toLower(), ZkStateReader.SHARD_ID_PROP, shardId,
+ OverseerAction.LEADER.toLower(), ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.BASE_URL_PROP,
leaderProps.getProperties().get(ZkStateReader.BASE_URL_PROP),
ZkStateReader.CORE_NAME_PROP,
@@ -205,7 +202,7 @@ final class ShardLeaderElectionContext e
String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
// clear the leader in clusterstate
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.LEADER.toLower(),
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
collection);
Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
@@ -407,13 +404,19 @@ final class ShardLeaderElectionContext e
Slice slices = zkController.getClusterState().getSlice(collection, shardId);
int cnt = 0;
- while (true && !isClosed && !cc.isShutDown()) {
+ while (!isClosed && !cc.isShutDown()) {
// wait for everyone to be up
if (slices != null) {
int found = 0;
try {
found = zkClient.getChildren(shardsElectZkPath, null, true).size();
} catch (KeeperException e) {
+ if (e instanceof KeeperException.SessionExpiredException) {
+ // if the session has expired, then another election will be launched, so
+ // quit here
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "ZK session expired - cancelling election for " + collection + " " + shardId);
+ }
SolrException.log(log,
"Error checking for the number of election participants", e);
}
Modified: lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1642718&r1=1642717&r2=1642718&view=diff
==============================================================================
--- lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java Mon Dec 1 17:25:39 2014
@@ -17,46 +17,41 @@ package org.apache.solr.cloud;
* the License.
*/
-import static java.util.Collections.singletonMap;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
-import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_ACTIVE_NODES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.cloud.overseer.ClusterStateMutator;
+import org.apache.solr.cloud.overseer.CollectionMutator;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.cloud.overseer.ReplicaMutator;
+import org.apache.solr.cloud.overseer.SliceMutator;
+import org.apache.solr.cloud.overseer.ZkStateWriter;
+import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.RoutingRule;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
@@ -73,7 +68,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Cluster leader. Responsible node assignments, cluster state file?
+ * Cluster leader. Responsible for processing state updates, node assignments, creating/deleting
+ * collections, shards, replicas and setting various properties.
*/
public class Overseer implements Closeable {
public static final String QUEUE_OPERATION = "operation";
@@ -90,53 +86,12 @@ public class Overseer implements Closeab
@Deprecated
public static final String REMOVESHARD = "removeshard";
- /**
- * Enum of actions supported by the overseer only.
- *
- * There are other actions supported which are public and defined
- * in {@link org.apache.solr.common.params.CollectionParams.CollectionAction}
- */
- public static enum OverseerAction {
- LEADER,
- DELETECORE,
- ADDROUTINGRULE,
- REMOVEROUTINGRULE,
- UPDATESHARDSTATE,
- STATE,
- QUIT;
-
- public static OverseerAction get(String p) {
- if (p != null) {
- try {
- return OverseerAction.valueOf(p.toUpperCase(Locale.ROOT));
- } catch (Exception ex) {
- }
- }
- return null;
- }
-
- public boolean isEqual(String s) {
- return s != null && toString().equals(s.toUpperCase(Locale.ROOT));
- }
-
- public String toLower() {
- return toString().toLowerCase(Locale.ROOT);
- }
- }
-
-
public static final int STATE_UPDATE_DELAY = 1500; // delay between cloud state updates
private static Logger log = LoggerFactory.getLogger(Overseer.class);
static enum LeaderStatus {DONT_KNOW, NO, YES}
- public static final String preferredLeaderProp = COLL_PROP_PREFIX + "preferredleader";
-
- public static final Set<String> sliceUniqueBooleanProperties = ImmutableSet.of(preferredLeaderProp);
-
- private long lastUpdatedTime = 0;
-
private class ClusterStateUpdater implements Runnable, Closeable {
private final ZkStateReader reader;
@@ -159,10 +114,6 @@ public class Overseer implements Closeab
private Map clusterProps;
private boolean isClosed = false;
- private final Map<String, Object> updateNodes = new LinkedHashMap<>();
- private boolean isClusterStateModified = false;
-
-
public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
this.zkClient = reader.getZkClient();
this.zkStats = zkStats;
@@ -205,7 +156,9 @@ public class Overseer implements Closeab
reader.updateClusterState(true);
ClusterState clusterState = reader.getClusterState();
log.info("Replaying operations from work queue.");
-
+
+ ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
+
while (head != null) {
isLeader = amILeader();
if (LeaderStatus.NO == isLeader) {
@@ -213,33 +166,18 @@ public class Overseer implements Closeab
}
else if (LeaderStatus.YES == isLeader) {
final ZkNodeProps message = ZkNodeProps.load(head);
- final String operation = message.getStr(QUEUE_OPERATION);
- final TimerContext timerContext = stats.time(operation);
- try {
- clusterState = processMessage(clusterState, message, operation);
- stats.success(operation);
- } catch (Exception e) {
- // generally there is nothing we can do - in most cases, we have
- // an issue that will fail again on retry or we cannot communicate with a
- // ZooKeeper in which case another Overseer should take over
- // TODO: if ordering for the message is not important, we could
- // track retries and put it back on the end of the queue
- log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
- stats.error(operation);
- } finally {
- timerContext.stop();
- }
- updateZkStates(clusterState);
-
+ log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
+ clusterState = processQueueItem(message, clusterState, zkStateWriter);
workQueue.poll(); // poll-ing removes the element we got by peek-ing
}
else {
log.info("am_i_leader unclear {}", isLeader);
// re-peek below in case our 'head' value is out-of-date by now
}
-
head = workQueue.peek();
}
+ // force flush at the end of the loop
+ clusterState = zkStateWriter.writePendingUpdates();
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
@@ -259,8 +197,10 @@ public class Overseer implements Closeab
}
log.info("Starting to work on the main queue");
- int lastStateFormat = -1; // sentinel
try {
+ ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
+ ClusterState clusterState = null;
+ boolean refreshClusterState = true; // let's refresh in the first iteration
while (!this.isClosed) {
isLeader = amILeader();
if (LeaderStatus.NO == isLeader) {
@@ -289,56 +229,46 @@ public class Overseer implements Closeab
}
synchronized (reader.getUpdateLock()) {
try {
- reader.updateClusterState(true);
- ClusterState clusterState = reader.getClusterState();
+ if (refreshClusterState) {
+ reader.updateClusterState(true);
+ clusterState = reader.getClusterState();
+ refreshClusterState = false;
+
+ // if there were any errors while processing
+ // the state queue, items would have been left in the
+ // work queue so let's process those first
+ byte[] data = workQueue.peek();
+ boolean hadWorkItems = data != null;
+ while (data != null) {
+ final ZkNodeProps message = ZkNodeProps.load(data);
+ log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
+ clusterState = processQueueItem(message, clusterState, zkStateWriter);
+ workQueue.poll(); // poll-ing removes the element we got by peek-ing
+ data = workQueue.peek();
+ }
+ // force flush at the end of the loop
+ if (hadWorkItems) {
+ clusterState = zkStateWriter.writePendingUpdates();
+ }
+ }
while (head != null) {
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
- final String operation = message.getStr(QUEUE_OPERATION);
-
- // we batch updates for the main cluster state together (stateFormat=1)
- // but if we encounter a message for a collection with a stateFormat different than the last
- // then we stop batching at that point
- String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
- if (collection == null) collection = message.getStr("name");
- if (collection != null) {
- DocCollection docCollection = clusterState.getCollectionOrNull(collection);
- if (lastStateFormat != -1 && docCollection != null && docCollection.getStateFormat() != lastStateFormat) {
- lastStateFormat = docCollection.getStateFormat();
- break;
- }
- if (docCollection != null) {
- lastStateFormat = docCollection.getStateFormat();
- }
- }
-
- final TimerContext timerContext = stats.time(operation);
- try {
- clusterState = processMessage(clusterState, message, operation);
- stats.success(operation);
- } catch (Exception e) {
- // generally there is nothing we can do - in most cases, we have
- // an issue that will fail again on retry or we cannot communicate with
- // ZooKeeper in which case another Overseer should take over
- // TODO: if ordering for the message is not important, we could
- // track retries and put it back on the end of the queue
- log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
- stats.error(operation);
- } finally {
- timerContext.stop();
- }
+ log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
+ clusterState = processQueueItem(message, clusterState, zkStateWriter);
workQueue.offer(head.getBytes());
stateUpdateQueue.poll();
- if (isClosed || System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break;
- if(!updateNodes.isEmpty()) break;
+ if (isClosed) break;
// if an event comes in the next 100ms batch it together
head = stateUpdateQueue.peek(100);
}
- updateZkStates(clusterState);
+ // we should force write all pending updates because the next iteration might sleep until there
+ // are more items in the main queue
+ clusterState = zkStateWriter.writePendingUpdates();
// clean work queue
- while (workQueue.poll() != null) ;
+ while (workQueue.poll() != null);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
@@ -346,12 +276,13 @@ public class Overseer implements Closeab
return;
}
log.error("Exception in Overseer main queue loop", e);
+ refreshClusterState = true; // it might have been a bad version error
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
-
} catch (Exception e) {
log.error("Exception in Overseer main queue loop", e);
+ refreshClusterState = true; // it might have been a bad version error
}
}
@@ -368,57 +299,28 @@ public class Overseer implements Closeab
}
}
- private void updateZkStates(ClusterState clusterState) throws KeeperException, InterruptedException {
- TimerContext timerContext = stats.time("update_state");
- boolean success = false;
+ private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter) throws KeeperException, InterruptedException {
+ final String operation = message.getStr(QUEUE_OPERATION);
+ ZkWriteCommand zkWriteCommand = null;
+ final TimerContext timerContext = stats.time(operation);
try {
- if (!updateNodes.isEmpty()) {
- for (Entry<String,Object> e : updateNodes.entrySet()) {
- if (e.getValue() == null) {
- if (zkClient.exists(e.getKey(), true)) zkClient.delete(e.getKey(), 0, true);
- } else {
- byte[] data = ZkStateReader.toJSON(e.getValue());
- if (zkClient.exists(e.getKey(), true)) {
- log.info("going to update_collection {}", e.getKey());
- zkClient.setData(e.getKey(), data, true);
- } else {
- log.info("going to create_collection {}", e.getKey());
- String parentPath = e.getKey().substring(0, e.getKey().lastIndexOf('/'));
- if (!zkClient.exists(parentPath, true)) {
- // if the /collections/collection_name path doesn't exist then it means that
- // 1) the user invoked a DELETE collection API and the OverseerCollectionProcessor has deleted
- // this zk path.
- // 2) these are most likely old "state" messages which are only being processed now because
- // if they were new "state" messages then in legacy mode, a new collection would have been
- // created with stateFormat = 1 (which is the default state format)
- // 3) these can't be new "state" messages created for a new collection because
- // otherwise the OverseerCollectionProcessor would have already created this path
- // as part of the create collection API call -- which is the only way in which a collection
- // with stateFormat > 1 can possibly be created
- continue;
- }
- zkClient.create(e.getKey(), data, CreateMode.PERSISTENT, true);
- }
- }
- }
- updateNodes.clear();
- }
-
- if (isClusterStateModified) {
- lastUpdatedTime = System.nanoTime();
- zkClient.setData(ZkStateReader.CLUSTER_STATE,
- ZkStateReader.toJSON(clusterState), true);
- isClusterStateModified = false;
- }
- success = true;
+ zkWriteCommand = processMessage(clusterState, message, operation);
+ stats.success(operation);
+ } catch (Exception e) {
+ // generally there is nothing we can do - in most cases, we have
+ // an issue that will fail again on retry or we cannot communicate with a
+ // ZooKeeper in which case another Overseer should take over
+ // TODO: if ordering for the message is not important, we could
+ // track retries and put it back on the end of the queue
+ log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
+ stats.error(operation);
} finally {
timerContext.stop();
- if (success) {
- stats.success("update_state");
- } else {
- stats.error("update_state");
- }
}
+ if (zkWriteCommand != null) {
+ clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand);
+ }
+ return clusterState;
}
private void checkIfIamStillLeader() {
@@ -460,41 +362,32 @@ public class Overseer implements Closeab
}
}
- private ClusterState processMessage(ClusterState clusterState,
+ private ZkWriteCommand processMessage(ClusterState clusterState,
final ZkNodeProps message, final String operation) {
-
CollectionParams.CollectionAction collectionAction = CollectionParams.CollectionAction.get(operation);
if (collectionAction != null) {
switch (collectionAction) {
case CREATE:
- clusterState = buildCollection(clusterState, message);
- break;
+ return new ClusterStateMutator(getZkStateReader()).createCollection(clusterState, message);
case DELETE:
- clusterState = removeCollection(clusterState, message);
- break;
+ return new ClusterStateMutator(getZkStateReader()).deleteCollection(clusterState, message);
case CREATESHARD:
- clusterState = createShard(clusterState, message);
- break;
+ return new CollectionMutator(getZkStateReader()).createShard(clusterState, message);
case DELETESHARD:
- clusterState = removeShard(clusterState, message);
- break;
+ return new CollectionMutator(getZkStateReader()).deleteShard(clusterState, message);
case ADDREPLICA:
- clusterState = createReplica(clusterState, message);
- break;
+ return new SliceMutator(getZkStateReader()).addReplica(clusterState, message);
case CLUSTERPROP:
handleProp(message);
- break;
case ADDREPLICAPROP:
- clusterState = addReplicaProp(clusterState, message);
- break;
+ return new ReplicaMutator(getZkStateReader()).addReplicaProperty(clusterState, message);
case DELETEREPLICAPROP:
- clusterState = deleteReplicaProp(clusterState, message);
- break;
+ return new ReplicaMutator(getZkStateReader()).removeReplicaProperty(clusterState, message);
case BALANCESHARDUNIQUE:
- ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(this, clusterState, message);
+ ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(clusterState, message);
if (dProp.balanceProperty()) {
String collName = message.getStr(ZkStateReader.COLLECTION_PROP);
- clusterState = newState(clusterState, singletonMap(collName, dProp.getDocCollection()));
+ return new ZkWriteCommand(collName, dProp.getDocCollection());
}
break;
default:
@@ -506,27 +399,17 @@ public class Overseer implements Closeab
if (overseerAction != null) {
switch (overseerAction) {
case STATE:
- if (isLegacy(clusterProps)) {
- clusterState = updateState(clusterState, message);
- } else {
- clusterState = updateStateNew(clusterState, message);
- }
- break;
+ return new ReplicaMutator(getZkStateReader()).setState(clusterState, message);
case LEADER:
- clusterState = setShardLeader(clusterState, message);
- break;
+ return new SliceMutator(getZkStateReader()).setShardLeader(clusterState, message);
case DELETECORE:
- clusterState = removeCore(clusterState, message);
- break;
+ return new SliceMutator(getZkStateReader()).removeReplica(clusterState, message);
case ADDROUTINGRULE:
- clusterState = addRoutingRule(clusterState, message);
- break;
+ return new SliceMutator(getZkStateReader()).addRoutingRule(clusterState, message);
case REMOVEROUTINGRULE:
- clusterState = removeRoutingRule(clusterState, message);
- break;
+ return new SliceMutator(getZkStateReader()).removeRoutingRule(clusterState, message);
case UPDATESHARDSTATE:
- clusterState = updateShardState(clusterState, message);
- break;
+ return new SliceMutator(getZkStateReader()).updateShardState(clusterState, message);
case QUIT:
if (myId.equals(message.get("id"))) {
log.info("Quit command received {}", LeaderElector.getNodeName(myId));
@@ -545,14 +428,11 @@ public class Overseer implements Closeab
// specified in CollectionAction. See SOLR-6115. Remove this in 5.0
switch (operation) {
case OverseerCollectionProcessor.CREATECOLLECTION:
- clusterState = buildCollection(clusterState, message);
- break;
+ return new ClusterStateMutator(getZkStateReader()).createCollection(clusterState, message);
case REMOVECOLLECTION:
- clusterState = removeCollection(clusterState, message);
- break;
+ return new ClusterStateMutator(getZkStateReader()).deleteCollection(clusterState, message);
case REMOVESHARD:
- clusterState = removeShard(clusterState, message);
- break;
+ return new CollectionMutator(getZkStateReader()).deleteShard(clusterState, message);
default:
throw new RuntimeException("unknown operation:" + operation
+ " contents:" + message.getProperties());
@@ -560,125 +440,7 @@ public class Overseer implements Closeab
}
}
- return clusterState;
- }
-
- private ClusterState addReplicaProp(ClusterState clusterState, ZkNodeProps message) {
-
- if (checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) == false ||
- checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) == false ||
- checkKeyExistence(message, ZkStateReader.REPLICA_PROP) == false ||
- checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) == false ||
- checkKeyExistence(message, ZkStateReader.PROPERTY_VALUE_PROP) == false) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Overseer SETREPLICAPROPERTY requires " +
- ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " +
- ZkStateReader.REPLICA_PROP + " and " + ZkStateReader.PROPERTY_PROP + " and " +
- ZkStateReader.PROPERTY_VALUE_PROP + " no action taken.");
- }
-
- String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
- String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
- String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);
- String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
- if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) {
- property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property;
- }
- property = property.toLowerCase(Locale.ROOT);
- String propVal = message.getStr(ZkStateReader.PROPERTY_VALUE_PROP);
- String shardUnique = message.getStr(OverseerCollectionProcessor.SHARD_UNIQUE);
-
- boolean isUnique = false;
-
- if (sliceUniqueBooleanProperties.contains(property)) {
- if (StringUtils.isNotBlank(shardUnique) && Boolean.parseBoolean(shardUnique) == false) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Overseer SETREPLICAPROPERTY for " +
- property + " cannot have " + OverseerCollectionProcessor.SHARD_UNIQUE + " set to anything other than" +
- "'true'. No action taken");
- }
- isUnique = true;
- } else {
- isUnique = Boolean.parseBoolean(shardUnique);
- }
-
- Replica replica = clusterState.getReplica(collectionName, replicaName);
-
- if (replica == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection/slice/replica " +
- collectionName + "/" + sliceName + "/" + replicaName + " no action taken.");
- }
- log.info("Setting property " + property + " with value: " + propVal +
- " for collection: " + collectionName + ". Full message: " + message);
- if (StringUtils.equalsIgnoreCase(replica.getStr(property), propVal)) return clusterState; // already the value we're going to set
-
- // OK, there's no way we won't change the cluster state now
- Map<String,Replica> replicas = clusterState.getSlice(collectionName, sliceName).getReplicasCopy();
- if (isUnique == false) {
- replicas.get(replicaName).getProperties().put(property, propVal);
- } else { // Set prop for this replica, but remove it for all others.
- for (Replica rep : replicas.values()) {
- if (rep.getName().equalsIgnoreCase(replicaName)) {
- rep.getProperties().put(property, propVal);
- } else {
- rep.getProperties().remove(property);
- }
- }
- }
- Slice newSlice = new Slice(sliceName, replicas, clusterState.getSlice(collectionName, sliceName).shallowCopy());
- return updateSlice(clusterState, collectionName, newSlice);
- }
-
- private ClusterState deleteReplicaProp(ClusterState clusterState, ZkNodeProps message) {
-
- if (checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) == false ||
- checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) == false ||
- checkKeyExistence(message, ZkStateReader.REPLICA_PROP) == false ||
- checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) == false) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Overseer DELETEREPLICAPROPERTY requires " +
- ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " +
- ZkStateReader.REPLICA_PROP + " and " + ZkStateReader.PROPERTY_PROP + " no action taken.");
- }
- String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
- String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
- String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);
- String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
- if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) {
- property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property;
- }
-
- Replica replica = clusterState.getReplica(collectionName, replicaName);
-
- if (replica == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection/slice/replica " +
- collectionName + "/" + sliceName + "/" + replicaName + " no action taken.");
- }
-
- log.info("Deleting property " + property + " for collection: " + collectionName +
- " slice " + sliceName + " replica " + replicaName + ". Full message: " + message);
- String curProp = replica.getStr(property);
- if (curProp == null) return clusterState; // not there anyway, nothing to do.
-
- Map<String, Replica> replicas = clusterState.getSlice(collectionName, sliceName).getReplicasCopy();
- replica = replicas.get(replicaName);
- replica.getProperties().remove(property);
- Slice newSlice = new Slice(sliceName, replicas, clusterState.getSlice(collectionName, sliceName).shallowCopy());
- return updateSlice(clusterState, collectionName, newSlice);
- }
-
- private ClusterState setShardLeader(ClusterState clusterState, ZkNodeProps message) {
- StringBuilder sb = new StringBuilder();
- String baseUrl = message.getStr(ZkStateReader.BASE_URL_PROP);
- String coreName = message.getStr(ZkStateReader.CORE_NAME_PROP);
- sb.append(baseUrl);
- if (baseUrl != null && !baseUrl.endsWith("/")) sb.append("/");
- sb.append(coreName == null ? "" : coreName);
- if (!(sb.substring(sb.length() - 1).equals("/"))) sb.append("/");
- clusterState = setShardLeader(clusterState,
- message.getStr(ZkStateReader.COLLECTION_PROP),
- message.getStr(ZkStateReader.SHARD_ID_PROP),
- sb.length() > 0 ? sb.toString() : null);
- return clusterState;
+ return ZkStateWriter.NO_OP;
}
private void handleProp(ZkNodeProps message) {
@@ -700,179 +462,6 @@ public class Overseer implements Closeab
}
}
- private ClusterState createReplica(ClusterState clusterState, ZkNodeProps message) {
- log.info("createReplica() {} ", message);
- String coll = message.getStr(ZkStateReader.COLLECTION_PROP);
- if (!checkCollectionKeyExistence(message)) return clusterState;
- String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
- DocCollection collection = clusterState.getCollection(coll);
- Slice sl = collection.getSlice(slice);
- if(sl == null){
- log.error("Invalid Collection/Slice {}/{} ",coll,slice);
- return clusterState;
- }
-
- String coreNodeName = Assign.assignNode(coll, clusterState);
- Replica replica = new Replica(coreNodeName,
- makeMap(
- ZkStateReader.CORE_NAME_PROP, message.getStr(ZkStateReader.CORE_NAME_PROP),
- ZkStateReader.BASE_URL_PROP,message.getStr(ZkStateReader.BASE_URL_PROP),
- ZkStateReader.STATE_PROP,message.getStr(ZkStateReader.STATE_PROP)));
- sl.getReplicasMap().put(coreNodeName, replica);
- return newState(clusterState, singletonMap(coll, collection));
- }
-
- private ClusterState buildCollection(ClusterState clusterState, ZkNodeProps message) {
- String collection = message.getStr("name");
- log.info("building a new collection: " + collection);
- if(clusterState.hasCollection(collection) ){
- log.warn("Collection {} already exists. exit" ,collection);
- return clusterState;
- }
-
- ArrayList<String> shardNames = new ArrayList<>();
-
- if(ImplicitDocRouter.NAME.equals( message.getStr("router.name",DocRouter.DEFAULT_NAME))){
- getShardNames(shardNames,message.getStr("shards",DocRouter.DEFAULT_NAME));
- } else {
- int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, -1);
- if(numShards<1) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"numShards is a required parameter for 'compositeId' router");
- getShardNames(numShards, shardNames);
- }
-
- return createCollection(clusterState, collection, shardNames, message);
- }
-
- private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) {
- String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
- if (!checkCollectionKeyExistence(message)) return clusterState;
- log.info("Update shard state invoked for collection: " + collection + " with message: " + message);
- for (String key : message.keySet()) {
- if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
- if (QUEUE_OPERATION.equals(key)) continue;
-
- Slice slice = clusterState.getSlice(collection, key);
- if (slice == null) {
- throw new RuntimeException("Overseer.updateShardState unknown collection: " + collection + " slice: " + key);
- }
- log.info("Update shard state " + key + " to " + message.getStr(key));
- Map<String, Object> props = slice.shallowCopy();
- if (Slice.RECOVERY.equals(props.get(Slice.STATE)) && Slice.ACTIVE.equals(message.getStr(key))) {
- props.remove(Slice.PARENT);
- }
- props.put(Slice.STATE, message.getStr(key));
- Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
- clusterState = updateSlice(clusterState, collection, newSlice);
- }
-
- return clusterState;
- }
-
- private ClusterState addRoutingRule(ClusterState clusterState, ZkNodeProps message) {
- String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
- if (!checkCollectionKeyExistence(message)) return clusterState;
- String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
- String routeKey = message.getStr("routeKey");
- String range = message.getStr("range");
- String targetCollection = message.getStr("targetCollection");
- String targetShard = message.getStr("targetShard");
- String expireAt = message.getStr("expireAt");
-
- Slice slice = clusterState.getSlice(collection, shard);
- if (slice == null) {
- throw new RuntimeException("Overseer.addRoutingRule unknown collection: " + collection + " slice:" + shard);
- }
-
- Map<String, RoutingRule> routingRules = slice.getRoutingRules();
- if (routingRules == null)
- routingRules = new HashMap<>();
- RoutingRule r = routingRules.get(routeKey);
- if (r == null) {
- Map<String, Object> map = new HashMap<>();
- map.put("routeRanges", range);
- map.put("targetCollection", targetCollection);
- map.put("expireAt", expireAt);
- RoutingRule rule = new RoutingRule(routeKey, map);
- routingRules.put(routeKey, rule);
- } else {
- // add this range
- Map<String, Object> map = r.shallowCopy();
- map.put("routeRanges", map.get("routeRanges") + "," + range);
- map.put("expireAt", expireAt);
- routingRules.put(routeKey, new RoutingRule(routeKey, map));
- }
-
- Map<String, Object> props = slice.shallowCopy();
- props.put("routingRules", routingRules);
-
- Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
- clusterState = updateSlice(clusterState, collection, newSlice);
- return clusterState;
- }
-
- private boolean checkCollectionKeyExistence(ZkNodeProps message) {
- return checkKeyExistence(message, ZkStateReader.COLLECTION_PROP);
- }
-
- private boolean checkKeyExistence(ZkNodeProps message, String key) {
- String value = message.getStr(key);
- if (value == null || value.trim().length() == 0) {
- log.error("Skipping invalid Overseer message because it has no " + key + " specified: " + message);
- return false;
- }
- return true;
- }
-
- private ClusterState removeRoutingRule(ClusterState clusterState, ZkNodeProps message) {
- String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
- if (!checkCollectionKeyExistence(message)) return clusterState;
- String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
- String routeKeyStr = message.getStr("routeKey");
-
- log.info("Overseer.removeRoutingRule invoked for collection: " + collection
- + " shard: " + shard + " routeKey: " + routeKeyStr);
-
- Slice slice = clusterState.getSlice(collection, shard);
- if (slice == null) {
- log.warn("Unknown collection: " + collection + " shard: " + shard);
- return clusterState;
- }
- Map<String, RoutingRule> routingRules = slice.getRoutingRules();
- if (routingRules != null) {
- routingRules.remove(routeKeyStr); // no rules left
- Map<String, Object> props = slice.shallowCopy();
- props.put("routingRules", routingRules);
- Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
- clusterState = updateSlice(clusterState, collection, newSlice);
- }
-
- return clusterState;
- }
-
- private ClusterState createShard(ClusterState clusterState, ZkNodeProps message) {
- String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
- if (!checkCollectionKeyExistence(message)) return clusterState;
- String shardId = message.getStr(ZkStateReader.SHARD_ID_PROP);
- Slice slice = clusterState.getSlice(collection, shardId);
- if (slice == null) {
- Map<String, Replica> replicas = Collections.EMPTY_MAP;
- Map<String, Object> sliceProps = new HashMap<>();
- String shardRange = message.getStr(ZkStateReader.SHARD_RANGE_PROP);
- String shardState = message.getStr(ZkStateReader.SHARD_STATE_PROP);
- String shardParent = message.getStr(ZkStateReader.SHARD_PARENT_PROP);
- sliceProps.put(Slice.RANGE, shardRange);
- sliceProps.put(Slice.STATE, shardState);
- if (shardParent != null) {
- sliceProps.put(Slice.PARENT, shardParent);
- }
- slice = new Slice(shardId, replicas, sliceProps);
- clusterState = updateSlice(clusterState, collection, slice);
- } else {
- log.error("Unable to create Shard: " + shardId + " because it already exists in collection: " + collection);
- }
- return clusterState;
- }
-
private LeaderStatus amILeader() {
TimerContext timerContext = stats.time("am_i_leader");
boolean success = true;
@@ -907,525 +496,7 @@ public class Overseer implements Closeab
return LeaderStatus.NO;
}
- private ClusterState updateStateNew(ClusterState clusterState, ZkNodeProps message) {
- String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
- if (!checkCollectionKeyExistence(message)) return clusterState;
- String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
-
- if(collection==null || sliceName == null){
- log.error("Invalid collection and slice {}", message);
- return clusterState;
- }
- Slice slice = clusterState.getSlice(collection, sliceName);
- if(slice == null){
- log.error("No such slice exists {}", message);
- return clusterState;
- }
-
- return updateState(clusterState, message);
- }
-
- /**
- * Try to assign core to the cluster.
- */
- private ClusterState updateState(ClusterState clusterState, final ZkNodeProps message) {
- final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
- if (!checkCollectionKeyExistence(message)) return clusterState;
- Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null);
- log.info("Update state numShards={} message={}", numShards, message);
-
- List<String> shardNames = new ArrayList<>();
-
- //collection does not yet exist, create placeholders if num shards is specified
- boolean collectionExists = clusterState.hasCollection(collection);
- if (!collectionExists && numShards!=null) {
- getShardNames(numShards, shardNames);
- clusterState = createCollection(clusterState, collection, shardNames, message);
- }
- String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
-
- String coreNodeName = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
- if (coreNodeName == null) {
- coreNodeName = getAssignedCoreNodeName(clusterState, message);
- if (coreNodeName != null) {
- log.info("node=" + coreNodeName + " is already registered");
- } else {
- // if coreNodeName is null, auto assign one
- coreNodeName = Assign.assignNode(collection, clusterState);
- }
- message.getProperties().put(ZkStateReader.CORE_NODE_NAME_PROP,
- coreNodeName);
- }
-
- // use the provided non null shardId
- if (sliceName == null) {
- //get shardId from ClusterState
- sliceName = getAssignedId(clusterState, coreNodeName, message);
- if (sliceName != null) {
- log.info("shard=" + sliceName + " is already registered");
- }
- }
- if(sliceName == null) {
- //request new shardId
- if (collectionExists) {
- // use existing numShards
- numShards = clusterState.getCollection(collection).getSlices().size();
- log.info("Collection already exists with " + ZkStateReader.NUM_SHARDS_PROP + "=" + numShards);
- }
- sliceName = Assign.assignShard(collection, clusterState, numShards);
- log.info("Assigning new node to shard shard=" + sliceName);
- }
-
- Slice slice = clusterState.getSlice(collection, sliceName);
-
- Map<String,Object> replicaProps = new LinkedHashMap<>();
-
- replicaProps.putAll(message.getProperties());
- // System.out.println("########## UPDATE MESSAGE: " + JSONUtil.toJSON(message));
- if (slice != null) {
- Replica oldReplica = slice.getReplicasMap().get(coreNodeName);
- if (oldReplica != null) {
- if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
- replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
- }
- // Move custom props over.
- for (Map.Entry<String, Object> ent : oldReplica.getProperties().entrySet()) {
- if (ent.getKey().startsWith(COLL_PROP_PREFIX)) {
- replicaProps.put(ent.getKey(), ent.getValue());
- }
- }
- }
- }
-
- // we don't put these in the clusterstate
- replicaProps.remove(ZkStateReader.NUM_SHARDS_PROP);
- replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP);
- replicaProps.remove(ZkStateReader.SHARD_ID_PROP);
- replicaProps.remove(ZkStateReader.COLLECTION_PROP);
- replicaProps.remove(QUEUE_OPERATION);
-
- // remove any props with null values
- Set<Entry<String,Object>> entrySet = replicaProps.entrySet();
- List<String> removeKeys = new ArrayList<>();
- for (Entry<String,Object> entry : entrySet) {
- if (entry.getValue() == null) {
- removeKeys.add(entry.getKey());
- }
- }
- for (String removeKey : removeKeys) {
- replicaProps.remove(removeKey);
- }
- replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP);
- // remove shard specific properties
- String shardRange = (String) replicaProps.remove(ZkStateReader.SHARD_RANGE_PROP);
- String shardState = (String) replicaProps.remove(ZkStateReader.SHARD_STATE_PROP);
- String shardParent = (String) replicaProps.remove(ZkStateReader.SHARD_PARENT_PROP);
-
-
- Replica replica = new Replica(coreNodeName, replicaProps);
-
- // TODO: where do we get slice properties in this message? or should there be a separate create-slice message if we want that?
-
- Map<String,Object> sliceProps = null;
- Map<String,Replica> replicas;
-
- if (slice != null) {
- clusterState = checkAndCompleteShardSplit(clusterState, collection, coreNodeName, sliceName, replicaProps);
- // get the current slice again because it may have been updated due to checkAndCompleteShardSplit method
- slice = clusterState.getSlice(collection, sliceName);
- sliceProps = slice.getProperties();
- replicas = slice.getReplicasCopy();
- } else {
- replicas = new HashMap<>(1);
- sliceProps = new HashMap<>();
- sliceProps.put(Slice.RANGE, shardRange);
- sliceProps.put(Slice.STATE, shardState);
- sliceProps.put(Slice.PARENT, shardParent);
- }
-
- replicas.put(replica.getName(), replica);
- slice = new Slice(sliceName, replicas, sliceProps);
-
- ClusterState newClusterState = updateSlice(clusterState, collection, slice);
- return newClusterState;
- }
-
-
-
- private ClusterState checkAndCompleteShardSplit(ClusterState state, String collection, String coreNodeName, String sliceName, Map<String,Object> replicaProps) {
- Slice slice = state.getSlice(collection, sliceName);
- Map<String, Object> sliceProps = slice.getProperties();
- String sliceState = slice.getState();
- if (Slice.RECOVERY.equals(sliceState)) {
- log.info("Shard: {} is in recovery state", sliceName);
- // is this replica active?
- if (ZkStateReader.ACTIVE.equals(replicaProps.get(ZkStateReader.STATE_PROP))) {
- log.info("Shard: {} is in recovery state and coreNodeName: {} is active", sliceName, coreNodeName);
- // are all other replicas also active?
- boolean allActive = true;
- for (Entry<String, Replica> entry : slice.getReplicasMap().entrySet()) {
- if (coreNodeName.equals(entry.getKey())) continue;
- if (!Slice.ACTIVE.equals(entry.getValue().getStr(Slice.STATE))) {
- allActive = false;
- break;
- }
- }
- if (allActive) {
- log.info("Shard: {} - all replicas are active. Finding status of fellow sub-shards", sliceName);
- // find out about other sub shards
- Map<String, Slice> allSlicesCopy = new HashMap<>(state.getSlicesMap(collection));
- List<Slice> subShardSlices = new ArrayList<>();
- outer:
- for (Entry<String, Slice> entry : allSlicesCopy.entrySet()) {
- if (sliceName.equals(entry.getKey()))
- continue;
- Slice otherSlice = entry.getValue();
- if (Slice.RECOVERY.equals(otherSlice.getState())) {
- if (slice.getParent() != null && slice.getParent().equals(otherSlice.getParent())) {
- log.info("Shard: {} - Fellow sub-shard: {} found", sliceName, otherSlice.getName());
- // this is a fellow sub shard so check if all replicas are active
- for (Entry<String, Replica> sliceEntry : otherSlice.getReplicasMap().entrySet()) {
- if (!ZkStateReader.ACTIVE.equals(sliceEntry.getValue().getStr(ZkStateReader.STATE_PROP))) {
- allActive = false;
- break outer;
- }
- }
- log.info("Shard: {} - Fellow sub-shard: {} has all replicas active", sliceName, otherSlice.getName());
- subShardSlices.add(otherSlice);
- }
- }
- }
- if (allActive) {
- // hurray, all sub shard replicas are active
- log.info("Shard: {} - All replicas across all fellow sub-shards are now ACTIVE. Preparing to switch shard states.", sliceName);
- String parentSliceName = (String) sliceProps.remove(Slice.PARENT);
-
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
- propMap.put(parentSliceName, Slice.INACTIVE);
- propMap.put(sliceName, Slice.ACTIVE);
- for (Slice subShardSlice : subShardSlices) {
- propMap.put(subShardSlice.getName(), Slice.ACTIVE);
- }
- propMap.put(ZkStateReader.COLLECTION_PROP, collection);
- ZkNodeProps m = new ZkNodeProps(propMap);
- state = updateShardState(state, m);
- }
- }
- }
- }
- return state;
- }
-
- private ClusterState createCollection(ClusterState state, String collectionName, List<String> shards , ZkNodeProps message) {
- log.info("Create collection {} with shards {}", collectionName, shards);
-
- Map<String, Object> routerSpec = DocRouter.getRouterSpec(message);
- String routerName = routerSpec.get("name") == null ? DocRouter.DEFAULT_NAME : (String) routerSpec.get("name");
- DocRouter router = DocRouter.getDocRouter(routerName);
-
- List<DocRouter.Range> ranges = router.partitionRange(shards.size(), router.fullRange());
-
-
-
- Map<String, Slice> newSlices = new LinkedHashMap<>();
-
- for (int i = 0; i < shards.size(); i++) {
- String sliceName = shards.get(i);
-
- Map<String, Object> sliceProps = new LinkedHashMap<>(1);
- sliceProps.put(Slice.RANGE, ranges == null? null: ranges.get(i));
-
- newSlices.put(sliceName, new Slice(sliceName, null, sliceProps));
- }
-
- // TODO: fill in with collection properties read from the /collections/<collectionName> node
- Map<String,Object> collectionProps = new HashMap<>();
-
- for (Entry<String, Object> e : OverseerCollectionProcessor.COLL_PROPS.entrySet()) {
- Object val = message.get(e.getKey());
- if(val == null){
- val = OverseerCollectionProcessor.COLL_PROPS.get(e.getKey());
- }
- if(val != null) collectionProps.put(e.getKey(),val);
- }
- collectionProps.put(DocCollection.DOC_ROUTER, routerSpec);
-
- if (message.getStr("fromApi") == null) {
- collectionProps.put("autoCreated", "true");
- }
-
- String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null
- : ZkStateReader.getCollectionPath(collectionName);
-
- DocCollection newCollection = new DocCollection(collectionName,
- newSlices, collectionProps, router, -1, znode);
-
- isClusterStateModified = true;
-
- log.info("state version {} {}", collectionName, newCollection.getStateFormat());
-
- return newState(state, singletonMap(newCollection.getName(), newCollection));
- }
-
- /*
- * Return an already assigned id or null if not assigned
- */
- private String getAssignedId(final ClusterState state, final String nodeName,
- final ZkNodeProps coreState) {
- Collection<Slice> slices = state.getSlices(coreState.getStr(ZkStateReader.COLLECTION_PROP));
- if (slices != null) {
- for (Slice slice : slices) {
- if (slice.getReplicasMap().get(nodeName) != null) {
- return slice.getName();
- }
- }
- }
- return null;
- }
-
- private String getAssignedCoreNodeName(ClusterState state, ZkNodeProps message) {
- Collection<Slice> slices = state.getSlices(message.getStr(ZkStateReader.COLLECTION_PROP));
- if (slices != null) {
- for (Slice slice : slices) {
- for (Replica replica : slice.getReplicas()) {
- String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
- String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-
- String msgNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
- String msgCore = message.getStr(ZkStateReader.CORE_NAME_PROP);
-
- if (nodeName.equals(msgNodeName) && core.equals(msgCore)) {
- return replica.getName();
- }
- }
- }
- }
- return null;
- }
-
- private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
- // System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
- // System.out.println("Updating slice:" + slice);
- DocCollection newCollection = null;
- DocCollection coll = state.getCollectionOrNull(collectionName) ;
- Map<String,Slice> slices;
-
- if (coll == null) {
- // when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself
- // without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router.
- slices = new LinkedHashMap<>(1);
- slices.put(slice.getName(), slice);
- Map<String,Object> props = new HashMap<>(1);
- props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME));
- newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter());
- } else {
- slices = new LinkedHashMap<>(coll.getSlicesMap()); // make a shallow copy
- slices.put(slice.getName(), slice);
- newCollection = coll.copyWithSlices(slices);
- }
-
- // System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections));
-
- return newState(state, singletonMap(collectionName, newCollection));
- }
-
- private ClusterState setShardLeader(ClusterState state, String collectionName, String sliceName, String leaderUrl) {
- DocCollection coll = state.getCollectionOrNull(collectionName);
-
- if(coll == null) {
- log.error("Could not mark shard leader for non existing collection:" + collectionName);
- return state;
- }
-
- Map<String, Slice> slices = coll.getSlicesMap();
- // make a shallow copy and add it to the new collection
- slices = new LinkedHashMap<>(slices);
-
- Slice slice = slices.get(sliceName);
- if (slice == null) {
- slice = coll.getSlice(sliceName);
- }
-
- if (slice == null) {
- log.error("Could not mark leader for non existing/active slice:" + sliceName);
- return state;
- } else {
- // TODO: consider just putting the leader property on the shard, not on individual replicas
-
- Replica oldLeader = slice.getLeader();
-
- final Map<String,Replica> newReplicas = new LinkedHashMap<>();
-
- for (Replica replica : slice.getReplicas()) {
-
- // TODO: this should only be calculated once and cached somewhere?
- String coreURL = ZkCoreNodeProps.getCoreUrl(replica.getStr(ZkStateReader.BASE_URL_PROP), replica.getStr(ZkStateReader.CORE_NAME_PROP));
-
- if (replica == oldLeader && !coreURL.equals(leaderUrl)) {
- Map<String,Object> replicaProps = new LinkedHashMap<>(replica.getProperties());
- replicaProps.remove(Slice.LEADER);
- replica = new Replica(replica.getName(), replicaProps);
- } else if (coreURL.equals(leaderUrl)) {
- Map<String,Object> replicaProps = new LinkedHashMap<>(replica.getProperties());
- replicaProps.put(Slice.LEADER, "true"); // TODO: allow booleans instead of strings
- replica = new Replica(replica.getName(), replicaProps);
- }
-
- newReplicas.put(replica.getName(), replica);
- }
-
- Map<String,Object> newSliceProps = slice.shallowCopy();
- newSliceProps.put(Slice.REPLICAS, newReplicas);
- Slice newSlice = new Slice(slice.getName(), newReplicas, slice.getProperties());
- slices.put(newSlice.getName(), newSlice);
- }
-
-
- DocCollection newCollection = coll.copyWithSlices(slices);
- return newState(state, singletonMap(collectionName, newCollection));
- }
-
- private ClusterState newState(ClusterState state, Map<String, DocCollection> colls) {
- for (Entry<String, DocCollection> e : colls.entrySet()) {
- DocCollection c = e.getValue();
- if (c == null) {
- isClusterStateModified = true;
- state = state.copyWith(singletonMap(e.getKey(), (DocCollection) null));
- updateNodes.put(ZkStateReader.getCollectionPath(e.getKey()) ,null);
- continue;
- }
-
- if (c.getStateFormat() > 1) {
- updateNodes.put(ZkStateReader.getCollectionPath(c.getName()),
- new ClusterState(-1, Collections.<String>emptySet(), singletonMap(c.getName(), c)));
- } else {
- isClusterStateModified = true;
- }
- state = state.copyWith(singletonMap(e.getKey(), c));
-
- }
- return state;
- }
-
- /*
- * Remove collection from cloudstate
- */
- private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) {
- final String collection = message.getStr("name");
- if (!checkKeyExistence(message, "name")) return clusterState;
- DocCollection coll = clusterState.getCollectionOrNull(collection);
- if(coll == null) return clusterState;
-
- isClusterStateModified = true;
- if (coll.getStateFormat() > 1) {
- try {
- log.info("Deleting state for collection : {}", collection);
- zkClient.delete(ZkStateReader.getCollectionPath(collection), -1, true);
- } catch (Exception e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to remove collection state :" + collection);
- }
- }
- return newState(clusterState, singletonMap(coll.getName(),(DocCollection) null));
- }
- /*
- * Remove collection slice from cloudstate
- */
- private ClusterState removeShard(final ClusterState clusterState, ZkNodeProps message) {
- final String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
- final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
- if (!checkCollectionKeyExistence(message)) return clusterState;
-
- log.info("Removing collection: " + collection + " shard: " + sliceId + " from clusterstate");
-
- DocCollection coll = clusterState.getCollection(collection);
-
- Map<String, Slice> newSlices = new LinkedHashMap<>(coll.getSlicesMap());
- newSlices.remove(sliceId);
-
- DocCollection newCollection = coll.copyWithSlices(newSlices);
- return newState(clusterState, singletonMap(collection,newCollection));
- }
-
- /*
- * Remove core from cloudstate
- */
- private ClusterState removeCore(final ClusterState clusterState, ZkNodeProps message) {
- final String cnn = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
- final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
- if (!checkCollectionKeyExistence(message)) return clusterState;
-
- DocCollection coll = clusterState.getCollectionOrNull(collection) ;
- if (coll == null) {
- // TODO: log/error that we didn't find it?
- // just in case, remove the zk collection node
- try {
- zkClient.clean("/collections/" + collection);
- } catch (InterruptedException e) {
- SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e);
- Thread.currentThread().interrupt();
- } catch (KeeperException e) {
- SolrException.log(log, "Problem cleaning up collection in zk:" + collection, e);
- }
- return clusterState;
- }
-
- Map<String, Slice> newSlices = new LinkedHashMap<>();
- boolean lastSlice = false;
- for (Slice slice : coll.getSlices()) {
- Replica replica = slice.getReplica(cnn);
- if (replica != null) {
- Map<String, Replica> newReplicas = slice.getReplicasCopy();
- newReplicas.remove(cnn);
- // TODO TODO TODO!!! if there are no replicas left for the slice, and the slice has no hash range, remove it
- // if (newReplicas.size() == 0 && slice.getRange() == null) {
- // if there are no replicas left for the slice remove it
- if (newReplicas.size() == 0) {
- slice = null;
- lastSlice = true;
- } else {
- slice = new Slice(slice.getName(), newReplicas, slice.getProperties());
- }
- }
-
- if (slice != null) {
- newSlices.put(slice.getName(), slice);
- }
- }
-
- if (lastSlice) {
- // remove all empty pre allocated slices
- for (Slice slice : coll.getSlices()) {
- if (slice.getReplicas().size() == 0) {
- newSlices.remove(slice.getName());
- }
- }
- }
-
- // if there are no slices left in the collection, remove it?
- if (newSlices.size() == 0) {
-
- // TODO: it might be better logically to have this in ZkController
- // but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
- // ZkController out of the Overseer.
- try {
- zkClient.clean("/collections/" + collection);
- } catch (InterruptedException e) {
- SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e);
- Thread.currentThread().interrupt();
- } catch (KeeperException e) {
- SolrException.log(log, "Problem cleaning up collection in zk:" + collection, e);
- }
- return newState(clusterState,singletonMap(collection, (DocCollection) null));
-
- } else {
- DocCollection newCollection = coll.copyWithSlices(newSlices);
- return newState(clusterState,singletonMap(collection,newCollection));
- }
-
- }
-
- @Override
+ @Override
public void close() {
this.isClosed = true;
}
@@ -1433,7 +504,6 @@ public class Overseer implements Closeab
}
// Class to encapsulate processing replica properties that have at most one replica hosting a property per slice.
private class ExclusiveSliceProperty {
- private ClusterStateUpdater updater;
private ClusterState clusterState;
private final boolean onlyActiveNodes;
private final String property;
@@ -1455,8 +525,7 @@ public class Overseer implements Closeab
private int assigned = 0;
- ExclusiveSliceProperty(ClusterStateUpdater updater, ClusterState clusterState, ZkNodeProps message) {
- this.updater = updater;
+ ExclusiveSliceProperty(ClusterState clusterState, ZkNodeProps message) {
this.clusterState = clusterState;
String tmp = message.getStr(ZkStateReader.PROPERTY_PROP);
if (StringUtils.startsWith(tmp, OverseerCollectionProcessor.COLL_PROP_PREFIX) == false) {
@@ -1473,7 +542,7 @@ public class Overseer implements Closeab
Boolean shardUnique = Boolean.parseBoolean(message.getStr(SHARD_UNIQUE));
if (shardUnique == false &&
- Overseer.sliceUniqueBooleanProperties.contains(this.property) == false) {
+ SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(this.property) == false) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that"
+ " the property be a pre-defined property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true' " +
" Property: " + this.property + " shardUnique: " + Boolean.toString(shardUnique));
@@ -1717,7 +786,8 @@ public class Overseer implements Closeab
balanceUnassignedReplicas();
for (Slice newSlice : changedSlices.values()) {
- clusterState = updater.updateSlice(clusterState, collectionName, newSlice);
+ DocCollection docCollection = CollectionMutator.updateSlice(collectionName, clusterState.getCollection(collectionName), newSlice);
+ clusterState = ClusterStateMutator.newState(clusterState, collectionName, docCollection);
}
return true;
}
@@ -1733,28 +803,6 @@ public class Overseer implements Closeab
}
}
- static void getShardNames(Integer numShards, List<String> shardNames) {
- if(numShards == null)
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "numShards" + " is a required param");
- for (int i = 0; i < numShards; i++) {
- final String sliceName = "shard" + (i + 1);
- shardNames.add(sliceName);
- }
-
- }
-
- static void getShardNames(List<String> shardNames, String shards) {
- if(shards ==null)
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "shards" + " is a required param");
- for (String s : shards.split(",")) {
- if(s ==null || s.trim().isEmpty()) continue;
- shardNames.add(s.trim());
- }
- if(shardNames.isEmpty())
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "shards" + " is a required param");
-
- }
-
class OverseerThread extends Thread implements Closeable {
protected volatile boolean isClosed;
@@ -1846,7 +894,10 @@ public class Overseer implements Closeab
ccThread.start();
arfoThread.start();
}
-
+
+ public Stats getStats() {
+ return stats;
+ }
/**
* For tests.
@@ -1964,6 +1015,7 @@ public class Overseer implements Closeab
static final int MAX_STORED_FAILURES = 10;
final Map<String, Stat> stats = new ConcurrentHashMap<>();
+ private volatile int queueLength;
public Map<String, Stat> getStats() {
return stats;
@@ -2034,6 +1086,14 @@ public class Overseer implements Closeab
return ret;
}
}
+
+ public int getQueueLength() {
+ return queueLength;
+ }
+
+ public void setQueueLength(int queueLength) {
+ this.queueLength = queueLength;
+ }
}
public static class Stat {
Modified: lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1642718&r1=1642717&r2=1642718&view=diff
==============================================================================
--- lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Mon Dec 1 17:25:39 2014
@@ -68,6 +68,8 @@ import org.apache.solr.client.solrj.resp
import org.apache.solr.cloud.Assign.Node;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.Overseer.LeaderStatus;
+import org.apache.solr.cloud.overseer.ClusterStateMutator;
+import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
@@ -467,7 +469,7 @@ public class OverseerCollectionProcessor
}
//now ask the current leader to QUIT , so that the designate can takeover
Overseer.getInQueue(zkStateReader.getZkClient()).offer(
- ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.QUIT.toLower(),
+ ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
"id",getLeaderId(zkStateReader.getZkClient()))));
}
@@ -698,7 +700,7 @@ public class OverseerCollectionProcessor
SolrZkClient zkClient = zkStateReader.getZkClient();
DistributedQueue inQueue = Overseer.getInQueue(zkClient);
Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.LEADER.toLower());
+ propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower());
propMap.put(COLLECTION_PROP, collectionName);
propMap.put(SHARD_ID_PROP, shardId);
propMap.put(BASE_URL_PROP, baseURL);
@@ -1148,7 +1150,7 @@ public class OverseerCollectionProcessor
private void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException {
ZkNodeProps m = new ZkNodeProps(
- Overseer.QUEUE_OPERATION, Overseer.OverseerAction.DELETECORE.toLower(),
+ Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
ZkStateReader.CORE_NAME_PROP, core,
ZkStateReader.NODE_NAME_PROP, replica.getStr(ZkStateReader.NODE_NAME_PROP),
ZkStateReader.COLLECTION_PROP, collectionName,
@@ -1760,7 +1762,7 @@ public class OverseerCollectionProcessor
log.info("Replication factor is 1 so switching shard states");
DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.UPDATESHARDSTATE.toLower());
+ propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
propMap.put(slice, Slice.INACTIVE);
for (String subSlice : subSlices) {
propMap.put(subSlice, Slice.ACTIVE);
@@ -1772,7 +1774,7 @@ public class OverseerCollectionProcessor
log.info("Requesting shard state be set to 'recovery'");
DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.UPDATESHARDSTATE.toLower());
+ propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
for (String subSlice : subSlices) {
propMap.put(subSlice, Slice.RECOVERY);
}
@@ -2062,7 +2064,7 @@ public class OverseerCollectionProcessor
completeAsyncRequest(asyncId, requestMap, results);
ZkNodeProps m = new ZkNodeProps(
- Overseer.QUEUE_OPERATION, Overseer.OverseerAction.ADDROUTINGRULE.toLower(),
+ Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
COLLECTION_PROP, sourceCollection.getName(),
SHARD_ID_PROP, sourceSlice.getName(),
"routeKey", SolrIndexSplitter.getRouteKey(splitKey) + "!",
@@ -2315,10 +2317,10 @@ public class OverseerCollectionProcessor
String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
List<String> shardNames = new ArrayList<>();
if(ImplicitDocRouter.NAME.equals(router)){
- Overseer.getShardNames(shardNames, message.getStr("shards",null));
+ ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
numSlices = shardNames.size();
} else {
- Overseer.getShardNames(numSlices,shardNames);
+ ClusterStateMutator.getShardNames(numSlices, shardNames);
}
if (numSlices == null ) {