You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/23 10:30:51 UTC
[21/41] lucene-solr:jira/solr-11702: SOLR-11817: Move Collections API
classes to their own package
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
deleted file mode 100644
index 426c879..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ /dev/null
@@ -1,1003 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang.StringUtils;
-import org.apache.solr.client.solrj.SolrResponse;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.cloud.DistributedQueue;
-import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
-import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
-import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.common.SolrCloseable;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkConfigManager;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionAdminParams;
-import org.apache.solr.common.params.CollectionParams.CollectionAction;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.SuppressForbidden;
-import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.handler.component.ShardHandler;
-import org.apache.solr.handler.component.ShardHandlerFactory;
-import org.apache.solr.handler.component.ShardRequest;
-import org.apache.solr.handler.component.ShardResponse;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-import org.apache.solr.util.RTimer;
-import org.apache.solr.util.TimeOut;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
-import static org.apache.solr.common.cloud.DocCollection.SNITCH;
-import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-import static org.apache.solr.common.util.Utils.makeMap;
-
-/**
- * A {@link OverseerMessageHandler} that handles Collections API related
- * overseer messages.
- */
-public class OverseerCollectionMessageHandler implements OverseerMessageHandler, SolrCloseable {
-
- // ---- String keys and defaults shared by the Collections API commands ----
- // NOTE(review): the meaning of most keys below is inferred from their names and values only;
- // confirm against the commands that read them before relying on it.
- public static final String NUM_SLICES = "numShards";
-
- static final boolean CREATE_NODE_SET_SHUFFLE_DEFAULT = true;
- public static final String CREATE_NODE_SET_SHUFFLE = CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM;
- public static final String CREATE_NODE_SET_EMPTY = "EMPTY";
- public static final String CREATE_NODE_SET = CollectionAdminParams.CREATE_NODE_SET_PARAM;
-
- public static final String ROUTER = "router";
-
- public static final String SHARDS_PROP = "shards";
-
- public static final String REQUESTID = "requestid";
-
- public static final String COLL_CONF = "collection.configName";
-
- // Prefix identifying message keys that carry "property.key=value" pairs.
- public static final String COLL_PROP_PREFIX = "property.";
-
- public static final String ONLY_IF_DOWN = "onlyIfDown";
-
- public static final String SHARD_UNIQUE = "shardUnique";
-
- public static final String ONLY_ACTIVE_NODES = "onlyactivenodes";
-
- static final String SKIP_CREATE_REPLICA_IN_CLUSTER_STATE = "skipCreateReplicaInClusterState";
-
- // Unmodifiable map of collection-level property names to values.
- // NOTE(review): presumably the defaults applied when a request omits the property
- // (null meaning "no default") - confirm against the code that consumes this map.
- public static final Map<String, Object> COLL_PROPS = Collections.unmodifiableMap(makeMap(
- ROUTER, DocRouter.DEFAULT_NAME,
- ZkStateReader.REPLICATION_FACTOR, "1",
- ZkStateReader.NRT_REPLICAS, "1",
- ZkStateReader.TLOG_REPLICAS, "0",
- ZkStateReader.PULL_REPLICAS, "0",
- ZkStateReader.MAX_SHARDS_PER_NODE, "1",
- ZkStateReader.AUTO_ADD_REPLICAS, "false",
- DocCollection.RULE, null,
- POLICY, null,
- SNITCH, null));
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- // Collaborators and settings. All are constructor arguments except cloudManager, which is
- // obtained from the overseer, and timeSource, which is obtained from cloudManager.
- Overseer overseer;
- ShardHandlerFactory shardHandlerFactory;
- String adminPath;
- ZkStateReader zkStateReader;
- SolrCloudManager cloudManager;
- String myId;
- Stats stats;
- TimeSource timeSource;
-
- // Used for handling mutual exclusion of the tasks.
- // NOTE(review): this comment used to describe a Set of collections currently being processed;
- // the field is a LockTree, so that wording looked stale - confirm against the lock's users.
-
- final private LockTree lockTree = new LockTree();
- // Executor backed by a SynchronousQueue, i.e. tasks are handed straight to a thread, not queued.
- // NOTE(review): 5 and 10 are presumably core/max pool sizes and 0L ms the keep-alive, as in
- // java.util.concurrent.ThreadPoolExecutor - confirm against MDCAwareThreadPoolExecutor.
- ExecutorService tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS,
- new SynchronousQueue<>(),
- new DefaultSolrThreadFactory("OverseerCollectionMessageHandlerThreadFactory"));
-
- /**
-  * Shared random source. When the {@code tests.seed} system property is set, the generator is
-  * seeded from that string's hash code so randomized choices are reproducible for a given test
-  * seed; otherwise a default-seeded generator is used.
-  */
- static final Random RANDOM;
- static {
-   final String testSeed = System.getProperty("tests.seed");
-   RANDOM = (testSeed != null) ? new Random(testSeed.hashCode()) : new Random();
- }
-
- // Dispatch table from Collections API action to the command implementing it; assigned once
- // in the constructor from an ImmutableMap builder.
- final Map<CollectionAction, Cmd> commandMap;
-
- // Set to false by the constructor.
- // NOTE(review): presumably flipped when the handler is closed - the code doing so is not shown here.
- private volatile boolean isClosed;
-
- public OverseerCollectionMessageHandler(ZkStateReader zkStateReader, String myId,
- final ShardHandlerFactory shardHandlerFactory,
- String adminPath,
- Stats stats,
- Overseer overseer,
- OverseerNodePrioritizer overseerPrioritizer) {
- this.zkStateReader = zkStateReader;
- this.shardHandlerFactory = shardHandlerFactory;
- this.adminPath = adminPath;
- this.myId = myId;
- this.stats = stats;
- this.overseer = overseer;
- this.cloudManager = overseer.getSolrCloudManager();
- this.timeSource = cloudManager.getTimeSource();
- this.isClosed = false;
- commandMap = new ImmutableMap.Builder<CollectionAction, Cmd>()
- .put(REPLACENODE, new ReplaceNodeCmd(this))
- .put(DELETENODE, new DeleteNodeCmd(this))
- .put(BACKUP, new BackupCmd(this))
- .put(RESTORE, new RestoreCmd(this))
- .put(CREATESNAPSHOT, new CreateSnapshotCmd(this))
- .put(DELETESNAPSHOT, new DeleteSnapshotCmd(this))
- .put(SPLITSHARD, new SplitShardCmd(this))
- .put(ADDROLE, new OverseerRoleCmd(this, ADDROLE, overseerPrioritizer))
- .put(REMOVEROLE, new OverseerRoleCmd(this, REMOVEROLE, overseerPrioritizer))
- .put(MOCK_COLL_TASK, this::mockOperation)
- .put(MOCK_SHARD_TASK, this::mockOperation)
- .put(MOCK_REPLICA_TASK, this::mockOperation)
- .put(MIGRATESTATEFORMAT, this::migrateStateFormat)
- .put(CREATESHARD, new CreateShardCmd(this))
- .put(MIGRATE, new MigrateCmd(this))
- .put(CREATE, new CreateCollectionCmd(this))
- .put(MODIFYCOLLECTION, this::modifyCollection)
- .put(ADDREPLICAPROP, this::processReplicaAddPropertyCommand)
- .put(DELETEREPLICAPROP, this::processReplicaDeletePropertyCommand)
- .put(BALANCESHARDUNIQUE, this::balanceProperty)
- .put(REBALANCELEADERS, this::processRebalanceLeaders)
- .put(RELOAD, this::reloadCollection)
- .put(DELETE, new DeleteCollectionCmd(this))
- .put(CREATEALIAS, new CreateAliasCmd(this))
- .put(DELETEALIAS, new DeleteAliasCmd(this))
- .put(ROUTEDALIAS_CREATECOLL, new RoutedAliasCreateCollectionCmd(this))
- .put(OVERSEERSTATUS, new OverseerStatusCmd(this))
- .put(DELETESHARD, new DeleteShardCmd(this))
- .put(DELETEREPLICA, new DeleteReplicaCmd(this))
- .put(ADDREPLICA, new AddReplicaCmd(this))
- .put(MOVEREPLICA, new MoveReplicaCmd(this))
- .put(UTILIZENODE, new UtilizeNodeCmd(this))
- .build()
- ;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public SolrResponse processMessage(ZkNodeProps message, String operation) {
- log.debug("OverseerCollectionMessageHandler.processMessage : {} , {}", operation, message);
-
- NamedList results = new NamedList();
- try {
- CollectionAction action = getCollectionAction(operation);
- Cmd command = commandMap.get(action);
- if (command != null) {
- command.call(cloudManager.getClusterStateProvider().getClusterState(), message, results);
- } else {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
- + operation);
- }
- } catch (Exception e) {
- String collName = message.getStr("collection");
- if (collName == null) collName = message.getStr(NAME);
-
- if (collName == null) {
- SolrException.log(log, "Operation " + operation + " failed", e);
- } else {
- SolrException.log(log, "Collection: " + collName + " operation: " + operation
- + " failed", e);
- }
-
- results.add("Operation " + operation + " caused exception:", e);
- SimpleOrderedMap nl = new SimpleOrderedMap();
- nl.add("msg", e.getMessage());
- nl.add("rspCode", e instanceof SolrException ? ((SolrException)e).code() : -1);
- results.add("exception", nl);
- }
- return new OverseerSolrResponse(results);
- }
-
- @SuppressForbidden(reason = "Needs currentTimeMillis for mock requests")
- private void mockOperation(ClusterState state, ZkNodeProps message, NamedList results) throws InterruptedException {
- //only for test purposes
- Thread.sleep(message.getInt("sleep", 1));
- log.info("MOCK_TASK_EXECUTED time {} data {}", System.currentTimeMillis(), Utils.toJSONString(message));
- results.add("MOCK_FINISHED", System.currentTimeMillis());
- }
-
- private CollectionAction getCollectionAction(String operation) {
- CollectionAction action = CollectionAction.get(operation);
- if (action == null) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
- }
- return action;
- }
-
- private void reloadCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
-
- String asyncId = message.getStr(ASYNC);
- Map<String, String> requestMap = null;
- if (asyncId != null) {
- requestMap = new HashMap<>();
- }
- collectionCmd(message, params, results, Replica.State.ACTIVE, asyncId, requestMap);
- }
-
- @SuppressWarnings("unchecked")
- private void processRebalanceLeaders(ClusterState clusterState, ZkNodeProps message, NamedList results)
- throws Exception {
- checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
- CORE_NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
-
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(COLLECTION_PROP, message.getStr(COLLECTION_PROP));
- params.set(SHARD_ID_PROP, message.getStr(SHARD_ID_PROP));
- params.set(REJOIN_AT_HEAD_PROP, message.getStr(REJOIN_AT_HEAD_PROP));
- params.set(CoreAdminParams.ACTION, CoreAdminAction.REJOINLEADERELECTION.toString());
- params.set(CORE_NAME_PROP, message.getStr(CORE_NAME_PROP));
- params.set(CORE_NODE_NAME_PROP, message.getStr(CORE_NODE_NAME_PROP));
- params.set(ELECTION_NODE_PROP, message.getStr(ELECTION_NODE_PROP));
- params.set(BASE_URL_PROP, message.getStr(BASE_URL_PROP));
-
- String baseUrl = message.getStr(BASE_URL_PROP);
- ShardRequest sreq = new ShardRequest();
- sreq.nodeName = message.getStr(ZkStateReader.CORE_NAME_PROP);
- // yes, they must use same admin handler path everywhere...
- params.set("qt", adminPath);
- sreq.purpose = ShardRequest.PURPOSE_PRIVATE;
- sreq.shards = new String[] {baseUrl};
- sreq.actualShards = sreq.shards;
- sreq.params = params;
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
- shardHandler.submit(sreq, baseUrl, sreq.params);
- }
-
- @SuppressWarnings("unchecked")
- private void processReplicaAddPropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results)
- throws Exception {
- checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP);
- SolrZkClient zkClient = zkStateReader.getZkClient();
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICAPROP.toLower());
- propMap.putAll(message.getProperties());
- ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(Utils.toJSON(m));
- }
-
- private void processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results)
- throws Exception {
- checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
- SolrZkClient zkClient = zkStateReader.getZkClient();
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, DELETEREPLICAPROP.toLower());
- propMap.putAll(message.getProperties());
- ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(Utils.toJSON(m));
- }
-
- private void balanceProperty(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
- if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) || StringUtils.isBlank(message.getStr(PROPERTY_PROP))) {
- throw new SolrException(ErrorCode.BAD_REQUEST,
- "The '" + COLLECTION_PROP + "' and '" + PROPERTY_PROP +
- "' parameters are required for the BALANCESHARDUNIQUE operation, no action taken");
- }
- SolrZkClient zkClient = zkStateReader.getZkClient();
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, BALANCESHARDUNIQUE.toLower());
- propMap.putAll(message.getProperties());
- inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
- }
-
- /**
- * Walks the tree of collection status to verify that any replicas not reporting a "down" status is
- * on a live node, if any replicas reporting their status as "active" but the node is not live is
- * marked as "down"; used by CLUSTERSTATUS.
- * @param liveNodes List of currently live node names.
- * @param collectionProps Map of collection status information pulled directly from ZooKeeper.
- */
-
- @SuppressWarnings("unchecked")
- protected void crossCheckReplicaStateWithLiveNodes(List<String> liveNodes, NamedList<Object> collectionProps) {
- Iterator<Map.Entry<String,Object>> colls = collectionProps.iterator();
- while (colls.hasNext()) {
- Map.Entry<String,Object> next = colls.next();
- Map<String,Object> collMap = (Map<String,Object>)next.getValue();
- Map<String,Object> shards = (Map<String,Object>)collMap.get("shards");
- for (Object nextShard : shards.values()) {
- Map<String,Object> shardMap = (Map<String,Object>)nextShard;
- Map<String,Object> replicas = (Map<String,Object>)shardMap.get("replicas");
- for (Object nextReplica : replicas.values()) {
- Map<String,Object> replicaMap = (Map<String,Object>)nextReplica;
- if (Replica.State.getState((String) replicaMap.get(ZkStateReader.STATE_PROP)) != Replica.State.DOWN) {
- // not down, so verify the node is live
- String node_name = (String)replicaMap.get(ZkStateReader.NODE_NAME_PROP);
- if (!liveNodes.contains(node_name)) {
- // node is not live, so this replica is actually down
- replicaMap.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
- }
- }
- }
- }
- }
- }
-
- /**
-  * Get collection status from cluster state.
-  * Can return collection status by given shard name.
-  *
-  * <p>When shards are requested, the "shards" entry of the supplied {@code collection} map is
-  * replaced in place by a map holding only the requested shards. The replacement happens only
-  * after every requested shard has been found, so a failed lookup leaves the map untouched.
-  *
-  * @param collection collection map parsed from JSON-serialized {@link ClusterState}
-  * @param name collection name
-  * @param requestedShards a set of shards to be returned in the status.
-  *                        An empty or null values indicates <b>all</b> shards.
-  * @return map of collection properties (the same instance that was passed in)
-  * @throws SolrException with {@code BAD_REQUEST} when the collection map is null or a
-  *                       requested shard does not exist
-  */
- @SuppressWarnings("unchecked")
- private Map<String, Object> getCollectionStatus(Map<String, Object> collection, String name, Set<String> requestedShards) {
-   if (collection == null) {
-     throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + name + " not found");
-   }
-   if (requestedShards == null || requestedShards.isEmpty()) {
-     return collection;
-   }
-
-   Map<String, Object> shards = (Map<String, Object>) collection.get("shards");
-   Map<String, Object> selected = new HashMap<>();
-   for (String selectedShard : requestedShards) {
-     if (!shards.containsKey(selectedShard)) {
-       throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + name + " shard: " + selectedShard + " not found");
-     }
-     selected.put(selectedShard, shards.get(selectedShard));
-   }
-   // Publish the filtered view once, after validation, rather than on every iteration.
-   collection.put("shards", selected);
-   return collection;
- }
-
- @SuppressWarnings("unchecked")
- void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
- throws Exception {
- ((DeleteReplicaCmd) commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, message, results, onComplete);
-
- }
-
- /**
-  * Polls the cluster state every 100 ms until the given replica has disappeared or the
-  * timeout expires.
-  *
-  * <p>The collection is looked up with {@code getCollectionOrNull} so that a collection that
-  * has been deleted in the meantime counts as "replica gone" instead of raising an error.
-  *
-  * @param collectionName collection holding the replica
-  * @param shard name of the slice holding the replica
-  * @param replicaName name of the replica to wait for
-  * @param timeoutms maximum time to wait, in milliseconds
-  * @return {@code true} if the collection, slice or replica is no longer present,
-  *         {@code false} if the replica still exists after the timeout
-  */
- boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
-   TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS, timeSource);
-   while (!timeout.hasTimedOut()) {
-     timeout.sleep(100);
-     DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collectionName);
-     if (docCollection == null) { // someone already deleted the collection
-       return true;
-     }
-     Slice slice = docCollection.getSlice(shard);
-     if (slice == null || slice.getReplica(replicaName) == null) {
-       return true;
-     }
-   }
-   // replica still exists after the timeout
-   return false;
- }
-
- void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws Exception {
- ZkNodeProps m = new ZkNodeProps(
- Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
- ZkStateReader.CORE_NAME_PROP, core,
- ZkStateReader.NODE_NAME_PROP, replica.getStr(ZkStateReader.NODE_NAME_PROP),
- ZkStateReader.COLLECTION_PROP, collectionName,
- ZkStateReader.CORE_NODE_NAME_PROP, replicaName,
- ZkStateReader.BASE_URL_PROP, replica.getStr(ZkStateReader.BASE_URL_PROP));
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
- }
-
- void checkRequired(ZkNodeProps message, String... props) {
- for (String prop : props) {
- if(message.get(prop) == null){
- throw new SolrException(ErrorCode.BAD_REQUEST, StrUtils.join(Arrays.asList(props),',') +" are required params" );
- }
- }
-
- }
-
- //TODO should we not remove in the next release ?
- /**
-  * Queues a MIGRATESTATEFORMAT state update for the collection (once) and then polls, for up
-  * to 30 seconds, until the collection reports state format 2.
-  *
-  * @throws SolrException {@code BAD_REQUEST} when the collection lookup yields null,
-  *                       {@code SERVER_ERROR} when the format has not changed within the timeout
-  */
- private void migrateStateFormat(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-   final String collectionName = message.getStr(COLLECTION_PROP);
-
-   boolean migrationQueued = false;
-   // wait for a while until the state format changes
-   final TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
-   while (!timeout.hasTimedOut()) {
-     final DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
-     // NOTE(review): getCollection may itself throw for a missing collection, which would
-     // make this branch unreachable - confirm against ClusterState.
-     if (collection == null) {
-       throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collectionName + " not found");
-     }
-     if (collection.getStateFormat() == 2) {
-       // Done.
-       results.add("success", new SimpleOrderedMap<>());
-       return;
-     }
-
-     if (!migrationQueued) {
-       // Actually queue the migration command; later iterations only poll.
-       migrationQueued = true;
-       final ZkNodeProps migrateMessage =
-           new ZkNodeProps(Overseer.QUEUE_OPERATION, MIGRATESTATEFORMAT.toLower(), COLLECTION_PROP, collectionName);
-       Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(migrateMessage));
-     }
-     timeout.sleep(100);
-   }
-   throw new SolrException(ErrorCode.SERVER_ERROR, "Could not migrate state format for collection: " + collectionName);
- }
-
- void commit(NamedList results, String slice, Replica parentShardLeader) {
- log.debug("Calling soft commit to make sub shard updates visible");
- String coreUrl = new ZkCoreNodeProps(parentShardLeader).getCoreUrl();
- // HttpShardHandler is hard coded to send a QueryRequest hence we go direct
- // and we force open a searcher so that we have documents to show upon switching states
- UpdateResponse updateResponse = null;
- try {
- updateResponse = softCommit(coreUrl);
- processResponse(results, null, coreUrl, updateResponse, slice, Collections.emptySet());
- } catch (Exception e) {
- processResponse(results, e, coreUrl, updateResponse, slice, Collections.emptySet());
- throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to call distrib softCommit on: " + coreUrl, e);
- }
- }
-
-
- static UpdateResponse softCommit(String url) throws SolrServerException, IOException {
-
- try (HttpSolrClient client = new HttpSolrClient.Builder(url)
- .withConnectionTimeout(30000)
- .withSocketTimeout(120000)
- .build()) {
- UpdateRequest ureq = new UpdateRequest();
- ureq.setParams(new ModifiableSolrParams());
- ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true, true);
- return ureq.process(client);
- }
- }
-
- /**
-  * Polls the cluster state (up to 320 times, one second apart) for a replica of the given
-  * collection whose node name and core name match the arguments, and returns that replica's
-  * name.
-  *
-  * <p>If the thread is interrupted while waiting, the interrupt flag is restored and the wait
-  * ends immediately; continuing would make every later sleep fail at once and turn the
-  * remaining retries into a busy loop. Replicas lacking a node name or core name are skipped
-  * rather than causing a {@link NullPointerException}.
-  *
-  * @param collectionName collection to search
-  * @param msgNodeName node name the replica must have
-  * @param msgCore core name the replica must have
-  * @return the name of the matching replica
-  * @throws SolrException with {@code SERVER_ERROR} if no matching replica is found
-  */
- String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) {
-   int retryCount = 320;
-   while (retryCount-- > 0) {
-     final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collectionName);
-     if (docCollection != null && docCollection.getSlicesMap() != null) {
-       Map<String,Slice> slicesMap = docCollection.getSlicesMap();
-       for (Slice slice : slicesMap.values()) {
-         for (Replica replica : slice.getReplicas()) {
-           // TODO: for really large clusters, we could 'index' on this
-
-           String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
-           String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-
-           if (nodeName != null && core != null
-               && nodeName.equals(msgNodeName) && core.equals(msgCore)) {
-             return replica.getName();
-           }
-         }
-       }
-     }
-     try {
-       Thread.sleep(1000);
-     } catch (InterruptedException e) {
-       // Restore the flag for callers and stop waiting instead of spinning through the retries.
-       Thread.currentThread().interrupt();
-       break;
-     }
-   }
-   throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
- }
-
- void waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
- log.debug("Waiting for slice {} of collection {} to be available", sliceName, collectionName);
- RTimer timer = new RTimer();
- int retryCount = 320;
- while (retryCount-- > 0) {
- DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
- if (collection == null) {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Unable to find collection: " + collectionName + " in clusterstate");
- }
- Slice slice = collection.getSlice(sliceName);
- if (slice != null) {
- log.debug("Waited for {}ms for slice {} of collection {} to be available",
- timer.getTime(), sliceName, collectionName);
- return;
- }
- Thread.sleep(1000);
- }
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Could not find new slice " + sliceName + " in collection " + collectionName
- + " even after waiting for " + timer.getTime() + "ms"
- );
- }
-
- /**
-  * Computes the overlap of two ranges.
-  *
-  * @return {@code null} when either range is null or they do not overlap; the contained range
-  *         when one is a subset of the other; otherwise a new range from {@code b.min} to
-  *         {@code a.max} when {@code b} includes {@code a.max}, else from {@code a.min} to
-  *         {@code b.max}
-  */
- DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
-   if (a == null || b == null) {
-     return null;
-   }
-   if (!a.overlaps(b)) {
-     return null;
-   }
-   if (a.isSubsetOf(b)) {
-     return a;
-   }
-   if (b.isSubsetOf(a)) {
-     return b;
-   }
-   // Partial overlap: pick the bounds according to which end of a lies inside b.
-   return b.includes(a.max)
-       ? new DocRouter.Range(b.min, a.max)
-       : new DocRouter.Range(a.min, b.max);
- }
-
- /**
-  * Convenience overload that submits the request using this handler's admin path and
-  * ZooKeeper state reader.
-  */
- void sendShardRequest(String nodeName, ModifiableSolrParams params,
-     ShardHandler shardHandler, String asyncId,
-     Map<String, String> requestMap) {
-   sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap, adminPath, zkStateReader);
- }
-
- /**
-  * Submits a private core admin request to the base URL of the given node.
-  *
-  * <p>When {@code asyncId} is non-null, a per-request async id (the given id followed by a
-  * nanosecond timestamp) is set on {@code params} and recorded in {@code requestMap} under the
-  * node name, so {@code requestMap} must be non-null in that case.
-  *
-  * @param nodeName node to send the request to
-  * @param params request parameters; modified in place ("qt" and possibly the async id)
-  * @param shardHandler handler used to submit the request
-  * @param asyncId async id of the enclosing operation, or null for a synchronous call
-  * @param requestMap node name to per-request async id; only written when asyncId is non-null
-  * @param adminPath core admin handler path, sent as the "qt" parameter
-  * @param zkStateReader used to resolve the node name to its base URL
-  */
- public static void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler,
-     String asyncId, Map<String, String> requestMap, String adminPath,
-     ZkStateReader zkStateReader) {
-   if (asyncId != null) {
-     String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
-     params.set(ASYNC, coreAdminAsyncId);
-     requestMap.put(nodeName, coreAdminAsyncId);
-   }
-
-   ShardRequest sreq = new ShardRequest();
-   params.set("qt", adminPath);
-   // Named constant instead of the literal 1, matching processRebalanceLeaders.
-   sreq.purpose = ShardRequest.PURPOSE_PRIVATE;
-   String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
-   sreq.shards = new String[]{replica};
-   sreq.actualShards = sreq.shards;
-   sreq.nodeName = nodeName;
-   sreq.params = params;
-
-   shardHandler.submit(sreq, replica, sreq.params);
- }
-
- /**
-  * Copies every message entry whose key starts with {@code COLL_PROP_PREFIX} into
-  * {@code params}, keeping the key unchanged.
-  */
- void addPropertyParams(ZkNodeProps message, ModifiableSolrParams params) {
-   for (String key : message.keySet()) {
-     if (!key.startsWith(COLL_PROP_PREFIX)) {
-       continue; // not a property.key=value pair
-     }
-     params.set(key, message.getStr(key));
-   }
- }
-
- /**
-  * Copies every message entry whose key starts with {@code COLL_PROP_PREFIX} into
-  * {@code map}, keeping the key unchanged and storing the value as a string.
-  */
- void addPropertyParams(ZkNodeProps message, Map<String, Object> map) {
-   for (String key : message.keySet()) {
-     if (!key.startsWith(COLL_PROP_PREFIX)) {
-       continue; // not a property.key=value pair
-     }
-     map.put(key, message.getStr(key));
-   }
- }
-
-
- /**
-  * Applies a MODIFYCOLLECTION message: optionally switches the collection to a new config set
-  * (and reloads it), queues the property changes on the overseer state update queue, then
-  * polls for up to 30 seconds until the changes are visible in the cluster state.
-  *
-  * <p>The config name entry is removed from the message before it is queued. Visibility is
-  * checked with a null-safe comparison, so a property that is not present in the collection
-  * state yet counts as "not visible" instead of raising a {@link NullPointerException}.
-  *
-  * @throws SolrException {@code BAD_REQUEST} when the named config set does not exist,
-  *                       {@code SERVER_ERROR} when the changes are not visible within the timeout
-  */
- private void modifyCollection(ClusterState clusterState, ZkNodeProps message, NamedList results)
-     throws Exception {
-
-   final String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
-   //the rest of the processing is based on writing cluster state properties
-   //remove the property here to avoid any errors down the pipeline due to this property appearing
-   String configName = (String) message.getProperties().remove(COLL_CONF);
-
-   if (configName != null) {
-     validateConfigOrThrowSolrException(configName);
-
-     boolean isLegacyCloud = Overseer.isLegacy(zkStateReader);
-     createConfNode(cloudManager.getDistribStateManager(), configName, collectionName, isLegacyCloud);
-     reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
-   }
-
-   Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
-
-   TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
-   boolean areChangesVisible = true;
-   while (!timeout.hasTimedOut()) {
-     DocCollection collection = cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName);
-     areChangesVisible = true;
-     for (Map.Entry<String,Object> updateEntry : message.getProperties().entrySet()) {
-       String updateKey = updateEntry.getKey();
-       // The collection name and the queue operation are routing data, not collection properties.
-       if (updateKey.equals(ZkStateReader.COLLECTION_PROP) || updateKey.equals(Overseer.QUEUE_OPERATION)) {
-         continue;
-       }
-       Object current = collection.get(updateKey);
-       Object expected = updateEntry.getValue();
-       boolean matches = (current == null) ? expected == null : current.equals(expected);
-       if (!matches) {
-         areChangesVisible = false;
-         break;
-       }
-     }
-     if (areChangesVisible) break;
-     timeout.sleep(100);
-   }
-
-   if (!areChangesVisible)
-     throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not modify collection " + message);
- }
-
- void cleanupCollection(String collectionName, NamedList results) throws Exception {
- log.error("Cleaning up collection [" + collectionName + "]." );
- Map<String, Object> props = makeMap(
- Overseer.QUEUE_OPERATION, DELETE.toLower(),
- NAME, collectionName);
- commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
- }
-
- Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
- Map<String, Replica> result = new HashMap<>();
- TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
- while (true) {
- DocCollection coll = zkStateReader.getClusterState().getCollection(collectionName);
- for (String coreName : coreNames) {
- if (result.containsKey(coreName)) continue;
- for (Slice slice : coll.getSlices()) {
- for (Replica replica : slice.getReplicas()) {
- if (coreName.equals(replica.getStr(ZkStateReader.CORE_NAME_PROP))) {
- result.put(coreName, replica);
- break;
- }
- }
- }
- }
-
- if (result.size() == coreNames.size()) {
- return result;
- } else {
- log.debug("Expecting {} cores but found {}", coreNames, result);
- }
- if (timeout.hasTimedOut()) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + coll);
- }
-
- Thread.sleep(100);
- }
- }
-
- ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
- throws Exception {
-
- return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete);
- }
-
- /**
-  * Same as the seven-argument overload, with an empty set of tolerated exceptions.
-  */
- void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
-     String asyncId, Map<String, String> requestMap) {
-   processResponses(
-       results, shardHandler, abortOnError, msgOnError, asyncId, requestMap, Collections.emptySet());
- }
-
- /**
-  * Consumes every completed (or failed) response from {@code shardHandler} and records it in
-  * {@code results}.
-  *
-  * @param abortOnError   when true, the first response carrying an exception drains the remaining
-  *                       responses and raises a {@link SolrException} with {@code msgOnError}
-  * @param asyncId        when non-null, the pending async core admin calls in {@code requestMap}
-  *                       are awaited afterwards and the map is cleared
-  * @param okayExceptions root throwable names that are not treated as failures
-  */
- void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
-     String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
-   for (ShardResponse srsp = shardHandler.takeCompletedOrError();
-        srsp != null;
-        srsp = shardHandler.takeCompletedOrError()) {
-     processResponse(results, srsp, okayExceptions);
-     Throwable exception = srsp.getException();
-     if (exception == null || !abortOnError) {
-       continue;
-     }
-     // drain pending requests before giving up
-     while (shardHandler.takeCompletedOrError() != null) {
-       // discard
-     }
-     throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception);
-   }
-
-   if (asyncId == null) {
-     return;
-   }
-   // The request is async: wait for the core admin calls to complete before returning.
-   waitForAsyncCallsToComplete(requestMap, results);
-   requestMap.clear();
- }
-
-
- /**
-  * Verifies that the named config set has data under {@code ZkConfigManager.CONFIGS_ZKNODE}.
-  *
-  * @throws SolrException with {@code BAD_REQUEST} when no data is found for the config set
-  */
- void validateConfigOrThrowSolrException(String configName) throws IOException, KeeperException, InterruptedException {
-   String configPath = ZkConfigManager.CONFIGS_ZKNODE + "/" + configName;
-   if (cloudManager.getDistribStateManager().hasData(configPath)) {
-     return;
-   }
-   throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find the specified config set: " + configName);
- }
-
- /**
-  * Writes the config name into the collection's node, creating the node when it has no data yet.
-  * The config (path) itself is not validated here; callers are expected to do that beforehand.
-  *
-  * @param configName    name to store; when null nothing is written
-  * @param isLegacyCloud when {@code configName} is null: true only logs a warning, false throws
-  * @throws SolrException with {@code BAD_REQUEST} when {@code configName} is null and
-  *                       {@code isLegacyCloud} is false
-  */
- public static void createConfNode(DistribStateManager stateManager, String configName, String coll, boolean isLegacyCloud) throws IOException, AlreadyExistsException, BadVersionException, KeeperException, InterruptedException {
-   if (configName == null) {
-     if (!isLegacyCloud) {
-       throw new SolrException(ErrorCode.BAD_REQUEST,"Unable to get config name");
-     }
-     log.warn("Could not obtain config name");
-     return;
-   }
-
-   String collDir = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll;
-   log.debug("creating collections conf node {} ", collDir);
-   byte[] data = Utils.toJSON(makeMap(ZkController.CONFIGNAME_PROP, configName));
-   boolean nodeHasData = stateManager.hasData(collDir);
-   if (nodeHasData) {
-     stateManager.setData(collDir, data, -1);
-   } else {
-     stateManager.makePath(collDir, data, CreateMode.PERSISTENT, false);
-   }
- }
-
- /** Convenience overload of {@code collectionCmd} that tolerates no exceptions. */
- private void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
-     NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap) {
-   Set<String> noOkayExceptions = Collections.emptySet();
-   collectionCmd(message, params, results, stateMatcher, asyncId, requestMap, noOkayExceptions);
- }
-
-
- /**
-  * Sends {@code params} to the matching replicas of every slice of the collection named by
-  * {@code NAME} in {@code message}, then processes the responses without aborting on errors.
-  *
-  * @param stateMatcher   when non-null, only replicas in this state receive the request
-  * @param okayExceptions root throwable names that are not treated as failures
-  */
- void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
-     NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
-   log.info("Executing Collection Cmd : " + params);
-   String collectionName = message.getStr(NAME);
-   ShardHandler handler = shardHandlerFactory.getShardHandler();
-
-   ClusterState currentState = zkStateReader.getClusterState();
-   DocCollection collection = currentState.getCollection(collectionName);
-   for (Slice shard : collection.getSlices()) {
-     sliceCmd(currentState, params, stateMatcher, shard, handler, asyncId, requestMap);
-   }
-
-   processResponses(results, handler, false, null, asyncId, requestMap, okayExceptions);
- }
-
- /**
-  * Sends a copy of {@code params} (with the core name set) to every replica of {@code slice}
-  * that sits on a live node and, when {@code stateMatcher} is non-null, is in that state.
-  */
- void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
-     Slice slice, ShardHandler shardHandler, String asyncId, Map<String, String> requestMap) {
-   for (Replica replica : slice.getReplicas()) {
-     if (!clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))) {
-       continue;
-     }
-     if (stateMatcher != null
-         && Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) != stateMatcher) {
-       continue;
-     }
-
-     // For thread safety, each request gets its own simple clone of the params.
-     ModifiableSolrParams perReplicaParams = new ModifiableSolrParams();
-     perReplicaParams.add(params);
-     perReplicaParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
-
-     String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
-     sendShardRequest(nodeName, perReplicaParams, shardHandler, asyncId, requestMap);
-   }
- }
-
- /** Unpacks a {@link ShardResponse} and records it in {@code results}. */
- private void processResponse(NamedList results, ShardResponse srsp, Set<String> okayExceptions) {
-   processResponse(
-       results,
-       srsp.getException(),
-       srsp.getNodeName(),
-       srsp.getSolrResponse(),
-       srsp.getShard(),
-       okayExceptions);
- }
-
- /**
-  * Records one shard outcome in {@code results}: under "failure" when {@code e} is set and is not
-  * a {@link RemoteSolrException} whose root throwable is listed in {@code okayExceptions},
-  * otherwise under "success". The respective section is created on first use.
-  */
- @SuppressWarnings("unchecked")
- private void processResponse(NamedList results, Throwable e, String nodeName, SolrResponse solrResponse, String shard, Set<String> okayExceptions) {
-   String rootThrowable = (e instanceof RemoteSolrException)
-       ? ((RemoteSolrException) e).getRootThrowable()
-       : null;
-   boolean tolerated = rootThrowable != null && okayExceptions.contains(rootThrowable);
-
-   if (e == null || tolerated) {
-     SimpleOrderedMap successes = (SimpleOrderedMap) results.get("success");
-     if (successes == null) {
-       successes = new SimpleOrderedMap();
-       results.add("success", successes);
-     }
-     successes.add(nodeName, solrResponse.getResponse());
-     return;
-   }
-
-   log.error("Error from shard: " + shard, e);
-
-   SimpleOrderedMap failures = (SimpleOrderedMap) results.get("failure");
-   if (failures == null) {
-     failures = new SimpleOrderedMap();
-     results.add("failure", failures);
-   }
-   failures.add(nodeName, e.getClass().getName() + ":" + e.getMessage());
- }
-
- /**
-  * Waits for every pending async core admin call in {@code requestMap} (node name to request id)
-  * and adds each outcome to {@code results} keyed by the request id.
-  */
- @SuppressWarnings("unchecked")
- private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
-   for (Map.Entry<String, String> pending : requestMap.entrySet()) {
-     String nodeName = pending.getKey();
-     String requestId = pending.getValue();
-     log.debug("I am Waiting for :{}/{}", nodeName, requestId);
-     results.add(requestId, waitForCoreAdminAsyncCallToComplete(nodeName, requestId));
-   }
- }
-
- /**
-  * Polls the core admin {@code REQUESTSTATUS} action on {@code nodeName} until the async request
-  * {@code requestId} reports a terminal status.
-  *
-  * @param nodeName  node whose base URL is queried
-  * @param requestId id of the async core admin request
-  * @return the status response once it reports "completed" or "failed"; a synthetic response with
-  *         {@code STATUS=failed} when the node's reply carries no response body
-  * @throws SolrException with {@code BAD_REQUEST} when the status stays "notfound" after the
-  *                       retries are exhausted, or when the status is missing or unrecognised
-  */
- private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
-   ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-   ModifiableSolrParams params = new ModifiableSolrParams();
-   params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());
-   params.set(CoreAdminParams.REQUESTID, requestId);
-   int counter = 0;
-   ShardRequest sreq;
-   do {
-     sreq = new ShardRequest();
-     params.set("qt", adminPath);
-     sreq.purpose = 1;
-     String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
-     sreq.shards = new String[] {replica};
-     sreq.actualShards = sreq.shards;
-     sreq.params = params;
-
-     shardHandler.submit(sreq, replica, sreq.params);
-
-     ShardResponse srsp;
-     do {
-       srsp = shardHandler.takeCompletedOrError();
-       if (srsp != null) {
-         NamedList results = new NamedList();
-         processResponse(results, srsp, Collections.emptySet());
-         if (srsp.getSolrResponse().getResponse() == null) {
-           NamedList response = new NamedList();
-           response.add("STATUS", "failed");
-           return response;
-         }
-
-         // Literal-first equals: a reply without a STATUS entry falls through to the final
-         // "invalid status" branch instead of failing with a NullPointerException.
-         String r = (String) srsp.getSolrResponse().getResponse().get("STATUS");
-         if ("running".equals(r)) {
-           log.debug("The task is still RUNNING, continuing to wait.");
-           try {
-             Thread.sleep(1000);
-           } catch (InterruptedException e) {
-             Thread.currentThread().interrupt();
-           }
-           continue;
-
-         } else if ("completed".equals(r)) {
-           log.debug("The task is COMPLETED, returning");
-           return srsp.getSolrResponse().getResponse();
-         } else if ("failed".equals(r)) {
-           // TODO: Improve this. Get more information.
-           log.debug("The task is FAILED, returning");
-           return srsp.getSolrResponse().getResponse();
-         } else if ("notfound".equals(r)) {
-           log.debug("The task is notfound, retry");
-           if (counter++ < 5) {
-             try {
-               Thread.sleep(1000);
-             } catch (InterruptedException e) {
-               // Keep the interrupt visible to callers, as the "running" branch does.
-               Thread.currentThread().interrupt();
-             }
-             // leave the inner loop so the outer loop submits a fresh status request
-             break;
-           }
-           throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request for requestId: " + requestId
-               + " " + r + ", retried " + counter + " times");
-         } else {
-           throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request " + r);
-         }
-       }
-     } while (srsp != null);
-   } while(true);
- }
-
- /** Returns the fixed display name of this message handler. */
- @Override
- public String getName() {
-   final String handlerName = "Overseer Collection Message Handler";
-   return handlerName;
- }
-
- /** Returns the timer name for {@code operation}: the operation prefixed with "collection_". */
- @Override
- public String getTimerName(String operation) {
-   final String timerName = "collection_" + operation;
-   return timerName;
- }
-
- /**
-  * Returns the task key of a message: its {@code COLLECTION_PROP} value when that key is present,
-  * otherwise its {@code NAME} value.
-  */
- @Override
- public String getTaskKey(ZkNodeProps message) {
-   if (message.containsKey(COLLECTION_PROP)) {
-     return message.getStr(COLLECTION_PROP);
-   }
-   return message.getStr(NAME);
- }
-
-
- // Id of the task batch that lockSession was created for; -1 until the first batch is seen.
- private long sessionId = -1;
- // Lock session shared by all tasks of the current batch.
- private LockTree.Session lockSession;
-
- /**
-  * Tries to lock the collection/shard/replica path addressed by {@code message} for the
-  * collection action named by its {@code Overseer.QUEUE_OPERATION} property.
-  *
-  * <p>A new lock session is created whenever {@code taskBatch} differs from the batch seen on the
-  * previous call; all tasks of one batch share the same session.
-  *
-  * @return whatever {@code LockTree.Session#lock} returns for the requested path
-  */
- @Override
- public Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch) {
-   if (lockSession == null || sessionId != taskBatch.getId()) {
-     //this is always called in the same thread.
-     //Each batch is supposed to have a new taskBatch
-     //So if taskBatch changes we must create a new Session
-     // also check if the running tasks are empty. If yes, clear lockTree
-     // this will ensure that locks are not 'leaked'
-     if (taskBatch.getRunningTasks() == 0) lockTree.clear();
-     lockSession = lockTree.getSession();
-     // Remember which batch the session belongs to; without this the comparison above never
-     // matches and every call would start a fresh session.
-     sessionId = taskBatch.getId();
-   }
-   return lockSession.lock(getCollectionAction(message.getStr(Overseer.QUEUE_OPERATION)),
-       Arrays.asList(
-           getTaskKey(message),
-           message.getStr(ZkStateReader.SHARD_ID_PROP),
-           message.getStr(ZkStateReader.REPLICA_PROP))
-
-   );
- }
-
-
- /** Marks this handler closed and shuts down {@code tpe} unless it is null or already shut down. */
- @Override
- public void close() throws IOException {
-   this.isClosed = true;
-   if (tpe == null || tpe.isShutdown()) {
-     return;
-   }
-   ExecutorUtil.shutdownAndAwaitTermination(tpe);
- }
-
- /** Reports whether {@link #close()} has been called on this handler. */
- @Override
- public boolean isClosed() {
-   return this.isClosed;
- }
-
- /** A single collection-level command that can be invoked with a message and a result list. */
- interface Cmd {
-   /**
-    * Runs the command.
-    *
-    * @param state   cluster state handed to the command
-    * @param message properties describing the request
-    * @param results list the command may add its outcome to
-    */
-   void call(
-       ClusterState state,
-       ZkNodeProps message,
-       NamedList results) throws Exception;
- }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/OverseerRoleCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerRoleCmd.java b/solr/core/src/java/org/apache/solr/cloud/OverseerRoleCmd.java
deleted file mode 100644
index 0f450bd..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerRoleCmd.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionParams.CollectionAction;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.Utils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
-
-/**
- * Adds a node to, or removes a node from, a role list stored in the {@code ZkStateReader.ROLES}
- * node, then triggers overseer re-prioritization on a separate thread.
- */
-public class OverseerRoleCmd implements Cmd {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private final OverseerCollectionMessageHandler ocmh;
-  private final CollectionAction operation;
-  private final OverseerNodePrioritizer overseerPrioritizer;
-
-  public OverseerRoleCmd(OverseerCollectionMessageHandler ocmh, CollectionAction operation, OverseerNodePrioritizer prioritizer) {
-    this.ocmh = ocmh;
-    this.operation = operation;
-    this.overseerPrioritizer = prioritizer;
-  }
-
-  /**
-   * Reads the "node" and "role" properties from {@code message}, updates the role's node list
-   * according to the configured operation and writes the roles back (creating the node when it
-   * did not exist).
-   */
-  @Override
-  @SuppressWarnings("unchecked")
-  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-    ZkStateReader zkStateReader = ocmh.zkStateReader;
-    SolrZkClient zkClient = zkStateReader.getZkClient();
-    String node = message.getStr("node");
-    String roleName = message.getStr("role");
-
-    final boolean rolesNodeExists = zkClient.exists(ZkStateReader.ROLES, true);
-    Map roles;
-    if (rolesNodeExists) {
-      roles = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true));
-    } else {
-      roles = new LinkedHashMap(1);
-    }
-
-    List nodeList = (List) roles.get(roleName);
-    if (nodeList == null) {
-      nodeList = new ArrayList();
-      roles.put(roleName, nodeList);
-    }
-
-    if (ADDROLE == operation) {
-      log.info("Overseer role added to {}", node);
-      if (!nodeList.contains(node)) {
-        nodeList.add(node);
-      }
-    } else if (REMOVEROLE == operation) {
-      log.info("Overseer role removed from {}", node);
-      nodeList.remove(node);
-    }
-
-    byte[] serializedRoles = Utils.toJSON(roles);
-    if (rolesNodeExists) {
-      zkClient.setData(ZkStateReader.ROLES, serializedRoles, true);
-    } else {
-      zkClient.create(ZkStateReader.ROLES, serializedRoles, CreateMode.PERSISTENT, true);
-    }
-
-    //if there are too many nodes this command may time out. And most likely dedicated
-    // overseers are created when there are too many nodes . So , do this operation in a separate thread
-    Runnable prioritize = () -> {
-      try {
-        overseerPrioritizer.prioritizeOverseerNodes(ocmh.myId);
-      } catch (Exception e) {
-        log.error("Error in prioritizing Overseer", e);
-      }
-    };
-    new Thread(prioritize).start();
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/OverseerStatusCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerStatusCmd.java b/solr/core/src/java/org/apache/solr/cloud/OverseerStatusCmd.java
deleted file mode 100644
index aba4872..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerStatusCmd.java
+++ /dev/null
@@ -1,112 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.codahale.metrics.Timer;
-import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.util.stats.MetricUtils;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Reports the overseer leader, the sizes of the overseer queues and the per-operation statistics
- * kept in {@code ocmh.stats}.
- */
-public class OverseerStatusCmd implements Cmd {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  // Key prefixes used to sort stats entries into sections; the prefix is stripped from the name.
-  private static final String COLLECTION_PREFIX = "collection_";
-  private static final String STATE_QUEUE_PREFIX = "/overseer/queue_";
-  private static final String WORK_QUEUE_PREFIX = "/overseer/queue-work_";
-  private static final String COLLECTION_QUEUE_PREFIX = "/overseer/collection-queue-work_";
-
-  private final OverseerCollectionMessageHandler ocmh;
-
-  public OverseerStatusCmd(OverseerCollectionMessageHandler ocmh) {
-    this.ocmh = ocmh;
-  }
-
-  /** Fills {@code results} with the leader, the queue sizes and the grouped operation stats. */
-  @Override
-  @SuppressWarnings("unchecked")
-  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-    ZkStateReader zkStateReader = ocmh.zkStateReader;
-    String leaderNode = OverseerTaskProcessor.getLeaderNode(zkStateReader.getZkClient());
-    results.add("leader", leaderNode);
-    addChildCount(results, zkStateReader, "/overseer/queue", "overseer_queue_size");
-    addChildCount(results, zkStateReader, "/overseer/queue-work", "overseer_work_queue_size");
-    addChildCount(results, zkStateReader, "/overseer/collection-queue-work", "overseer_collection_queue_size");
-
-    NamedList overseerStats = new NamedList();
-    NamedList collectionStats = new NamedList();
-    NamedList stateUpdateQueueStats = new NamedList();
-    NamedList workQueueStats = new NamedList();
-    NamedList collectionQueueStats = new NamedList();
-    Stats stats = ocmh.stats;
-    for (Map.Entry<String, Stats.Stat> entry : stats.getStats().entrySet()) {
-      String key = entry.getKey();
-      NamedList<Object> lst = new SimpleOrderedMap<>();
-      if (key.startsWith(COLLECTION_PREFIX)) {
-        collectionStats.add(key.substring(COLLECTION_PREFIX.length()), lst);
-        addRequestCounts(stats, key, lst);
-        List<Stats.FailedOp> failureDetails = stats.getFailureDetails(key);
-        if (failureDetails != null) {
-          List<SimpleOrderedMap<Object>> failures = new ArrayList<>();
-          for (Stats.FailedOp failedOp : failureDetails) {
-            SimpleOrderedMap<Object> fail = new SimpleOrderedMap<>();
-            fail.add("request", failedOp.req.getProperties());
-            fail.add("response", failedOp.resp.getResponse());
-            failures.add(fail);
-          }
-          lst.add("recent_failures", failures);
-        }
-      } else if (key.startsWith(STATE_QUEUE_PREFIX)) {
-        stateUpdateQueueStats.add(key.substring(STATE_QUEUE_PREFIX.length()), lst);
-      } else if (key.startsWith(WORK_QUEUE_PREFIX)) {
-        workQueueStats.add(key.substring(WORK_QUEUE_PREFIX.length()), lst);
-      } else if (key.startsWith(COLLECTION_QUEUE_PREFIX)) {
-        collectionQueueStats.add(key.substring(COLLECTION_QUEUE_PREFIX.length()), lst);
-      } else {
-        // overseer stats
-        overseerStats.add(key, lst);
-        addRequestCounts(stats, key, lst);
-      }
-      Timer timer = entry.getValue().requestTime;
-      MetricUtils.addMetrics(lst, timer);
-    }
-    results.add("overseer_operations", overseerStats);
-    results.add("collection_operations", collectionStats);
-    results.add("overseer_queue", stateUpdateQueueStats);
-    results.add("overseer_internal_queue", workQueueStats);
-    results.add("collection_queue", collectionQueueStats);
-  }
-
-  /** Adds the number of children of {@code path} to {@code results} under {@code name}. */
-  @SuppressWarnings("unchecked")
-  private static void addChildCount(NamedList results, ZkStateReader zkStateReader, String path, String name)
-      throws Exception {
-    Stat stat = new Stat();
-    zkStateReader.getZkClient().getData(path, null, stat, true);
-    results.add(name, stat.getNumChildren());
-  }
-
-  /** Adds the success count ("requests") and error count ("errors") of {@code key} to {@code lst}. */
-  private static void addRequestCounts(Stats stats, String key, NamedList<Object> lst) {
-    int successes = stats.getSuccessCount(key);
-    int errors = stats.getErrorCount(key);
-    lst.add("requests", successes);
-    lst.add("errors", errors);
-  }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
deleted file mode 100644
index e903091..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.solr.common.SolrCloseableLatch;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CollectionStateWatcher;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.params.CommonAdminParams;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-
-/**
- * Collection command that recreates every replica hosted on a source node on a target node and,
- * once all of them were created (and the required ones recovered), removes the replicas from the
- * source node. If anything fails, the replicas created on the target node are deleted again.
- */
-public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private final OverseerCollectionMessageHandler ocmh;
-
-  public ReplaceNodeCmd(OverseerCollectionMessageHandler ocmh) {
-    this.ocmh = ocmh;
-  }
-
-  /**
-   * Executes the node replacement.
-   *
-   * <p>Properties read from {@code message}: the source and target node (required), "async",
-   * "timeout" in seconds (default 10 minutes), "parallel" (default false) and
-   * {@code CommonAdminParams.WAIT_FOR_FINAL_STATE} (default false).
-   *
-   * @param results receives a "success" entry, or one or more "failure" entries
-   * @throws SolrException with {@code BAD_REQUEST} when source or target is missing or not live
-   */
-  @Override
-  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-    ZkStateReader zkStateReader = ocmh.zkStateReader;
-    String source = message.getStr(CollectionParams.SOURCE_NODE, message.getStr("source"));
-    String target = message.getStr(CollectionParams.TARGET_NODE, message.getStr("target"));
-    boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
-    if (source == null || target == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "sourceNode and targetNode are required params" );
-    }
-    String async = message.getStr("async");
-    int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
-    boolean parallel = message.getBool("parallel", false);
-    ClusterState clusterState = zkStateReader.getClusterState();
-
-    if (!clusterState.liveNodesContain(source)) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + source + " is not live");
-    }
-    if (!clusterState.liveNodesContain(target)) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + target + " is not live");
-    }
-    List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
-    // how many leaders are we moving? for these replicas we have to make sure that either:
-    // * another existing replica can become a leader, or
-    // * we wait until the newly created replica completes recovery (and can become the new leader)
-    // If waitForFinalState=true we wait for all replicas
-    int numLeaders = 0;
-    for (ZkNodeProps props : sourceReplicas) {
-      if (props.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
-        numLeaders++;
-      }
-    }
-    // map of collectionName_coreNodeName to watchers
-    Map<String, CollectionStateWatcher> watchers = new HashMap<>();
-    // same keys, mapped to the collection each watcher was registered on; needed to unregister it
-    Map<String, String> watcherCollections = new HashMap<>();
-    List<ZkNodeProps> createdReplicas = new ArrayList<>();
-
-    AtomicBoolean anyOneFailed = new AtomicBoolean(false);
-    SolrCloseableLatch countDownLatch = new SolrCloseableLatch(sourceReplicas.size(), ocmh);
-
-    SolrCloseableLatch replicasToRecover = new SolrCloseableLatch(numLeaders, ocmh);
-
-    for (ZkNodeProps sourceReplica : sourceReplicas) {
-      NamedList nl = new NamedList();
-      log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
-      ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target);
-      if(async!=null) msg.getProperties().put(ASYNC, async);
-      final ZkNodeProps addedReplica = ocmh.addReplica(clusterState,
-          msg, nl, () -> {
-            countDownLatch.countDown();
-            if (nl.get("failure") != null) {
-              String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
-                  " on node=%s", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
-              log.warn(errorString);
-              // one replica creation failed. Make the best attempt to
-              // delete all the replicas created so far in the target
-              // and exit
-              synchronized (results) {
-                results.add("failure", errorString);
-                anyOneFailed.set(true);
-              }
-            } else {
-              log.debug("Successfully created replica for collection={} shard={} on node={}",
-                  sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
-            }
-          });
-
-      if (addedReplica != null) {
-        createdReplicas.add(addedReplica);
-        if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
-          String shardName = sourceReplica.getStr(SHARD_ID_PROP);
-          String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
-          String collectionName = sourceReplica.getStr(COLLECTION_PROP);
-          String key = collectionName + "_" + replicaName;
-          CollectionStateWatcher watcher;
-          if (waitForFinalState) {
-            watcher = new ActiveReplicaWatcher(collectionName, null,
-                Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), replicasToRecover);
-          } else {
-            watcher = new LeaderRecoveryWatcher(collectionName, shardName, replicaName,
-                addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover);
-          }
-          watchers.put(key, watcher);
-          watcherCollections.put(key, collectionName);
-          log.debug("--- adding {}, {}", key, watcher);
-          zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
-        } else {
-          log.debug("--- not waiting for {}", addedReplica);
-        }
-      }
-    }
-
-    log.debug("Waiting for replicas to be added");
-    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
-      log.info("Timed out waiting for replicas to be added");
-      anyOneFailed.set(true);
-    } else {
-      log.debug("Finished waiting for replicas to be added");
-    }
-
-    // now wait for leader replicas to recover
-    log.debug("Waiting for " + numLeaders + " leader replicas to recover");
-    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
-      log.info("Timed out waiting for " + replicasToRecover.getCount() + " leader replicas to recover");
-      anyOneFailed.set(true);
-    } else {
-      log.debug("Finished waiting for leader replicas to recover");
-    }
-    // remove the watchers, we're done either way.
-    // Unregister from the collection each watcher was registered on; the map key
-    // (collection_replica) is not a collection name.
-    for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) {
-      zkStateReader.removeCollectionStateWatcher(watcherCollections.get(e.getKey()), e.getValue());
-    }
-    if (anyOneFailed.get()) {
-      log.info("Failed to create some replicas. Cleaning up all replicas on target node");
-      SolrCloseableLatch cleanupLatch = new SolrCloseableLatch(createdReplicas.size(), ocmh);
-      for (ZkNodeProps createdReplica : createdReplicas) {
-        NamedList deleteResult = new NamedList();
-        try {
-          ocmh.deleteReplica(zkStateReader.getClusterState(), createdReplica.plus("parallel", "true"), deleteResult, () -> {
-            cleanupLatch.countDown();
-            if (deleteResult.get("failure") != null) {
-              synchronized (results) {
-                results.add("failure", "Could not cleanup, because of : " + deleteResult.get("failure"));
-              }
-            }
-          });
-        } catch (KeeperException e) {
-          cleanupLatch.countDown();
-          log.warn("Error deleting replica ", e);
-        } catch (Exception e) {
-          log.warn("Error deleting replica ", e);
-          cleanupLatch.countDown();
-          throw e;
-        }
-      }
-      cleanupLatch.await(5, TimeUnit.MINUTES);
-      return;
-    }
-
-
-    // we have reached this far means all replicas could be recreated
-    //now cleanup the replicas in the source node
-    DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ocmh, source, async);
-    results.add("success", "REPLACENODE action completed successfully from : " + source + " to : " + target);
-  }
-
-  /**
-   * Collects a description of every replica in {@code state} whose node name equals
-   * {@code source}.
-   *
-   * @return one {@link ZkNodeProps} per replica carrying collection, shard, core name, replica
-   *         name, replica type, a leader flag and the node; empty when the node hosts no replicas
-   */
-  static List<ZkNodeProps> getReplicasOfNode(String source, ClusterState state) {
-    List<ZkNodeProps> sourceReplicas = new ArrayList<>();
-    for (Map.Entry<String, DocCollection> e : state.getCollectionsMap().entrySet()) {
-      for (Slice slice : e.getValue().getSlices()) {
-        for (Replica replica : slice.getReplicas()) {
-          if (source.equals(replica.getNodeName())) {
-            ZkNodeProps props = new ZkNodeProps(
-                COLLECTION_PROP, e.getKey(),
-                SHARD_ID_PROP, slice.getName(),
-                ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
-                ZkStateReader.REPLICA_PROP, replica.getName(),
-                ZkStateReader.REPLICA_TYPE, replica.getType().name(),
-                ZkStateReader.LEADER_PROP, String.valueOf(replica.equals(slice.getLeader())),
-                CoreAdminParams.NODE, source);
-            sourceReplicas.add(props);
-          }
-        }
-      }
-    }
-    return sourceReplicas;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
deleted file mode 100644
index 9c9a5c9..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-
-import java.lang.invoke.MethodHandles;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.solr.client.solrj.cloud.DistributedQueue;
-import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
-import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.ImplicitDocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.ReplicaPosition;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.backup.BackupManager;
-import org.apache.solr.core.backup.repository.BackupRepository;
-import org.apache.solr.handler.component.ShardHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROPS;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.RANDOM;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
-import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_TYPE;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-
- /**
-  * Overseer collection command that restores a collection from a backup.
-  * <p>
-  * The command reads the backup's properties and collection state, makes sure a config set is
-  * available, creates an empty (core-less) collection with the backed-up shard layout, adds one
-  * replica per shard, asks each of those cores to restore its index via {@code RESTORECORE}, and
-  * finally adds the remaining replicas of each requested type.
-  */
- public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
-   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-   private final OverseerCollectionMessageHandler ocmh;
-
-   public RestoreCmd(OverseerCollectionMessageHandler ocmh) {
-     this.ocmh = ocmh;
-   }
-
-   /**
-    * Runs the restore described by {@code message}.
-    *
-    * @param state cluster state handed in by the caller (the method re-reads the current state
-    *              from the {@link ZkStateReader} where it needs it)
-    * @param message request properties: target collection name, backup name, backup location and
-    *                repository, plus optional overrides such as replica counts, config name,
-    *                maxShardsPerNode and the async id
-    * @param results receives the results of adding the non-leader replicas
-    * @throws SolrException with {@code BAD_REQUEST} when the available nodes cannot hold the
-    *                       requested replicas, or when neither NRT nor TLOG replicas are requested
-    * @throws Exception on any failure of the underlying backup, ZooKeeper or core operations
-    */
-   @Override
-   public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-     // TODO maybe we can inherit createCollection's options/code
-
-     String restoreCollectionName = message.getStr(COLLECTION_PROP);
-     String backupName = message.getStr(NAME); // of backup
-     ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
-     String asyncId = message.getStr(ASYNC);
-     String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
-     Map<String, String> requestMap = new HashMap<>();
-
-     CoreContainer cc = ocmh.overseer.getZkController().getCoreContainer();
-     BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
-
-     URI location = repository.createURI(message.getStr(CoreAdminParams.BACKUP_LOCATION));
-     URI backupPath = repository.resolve(location, backupName);
-     ZkStateReader zkStateReader = ocmh.zkStateReader;
-     BackupManager backupMgr = new BackupManager(repository, zkStateReader);
-
-     Properties properties = backupMgr.readBackupProperties(location, backupName);
-     String backupCollection = properties.getProperty(BackupManager.COLLECTION_NAME_PROP);
-     DocCollection backupCollectionState = backupMgr.readCollectionState(location, backupName, backupCollection);
-
-     // Get the Solr nodes to restore a collection.
-     final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(
-         zkStateReader.getClusterState().getLiveNodes(), message, RANDOM);
-
-     int numShards = backupCollectionState.getActiveSlices().size();
-
-     // Replica counts come from the request when given, otherwise from the backed-up collection.
-     // replicationFactor is only consulted when no NRT count could be determined.
-     int numNrtReplicas = getInt(message, NRT_REPLICAS, backupCollectionState.getNumNrtReplicas(), 0);
-     if (numNrtReplicas == 0) {
-       numNrtReplicas = getInt(message, REPLICATION_FACTOR, backupCollectionState.getReplicationFactor(), 0);
-     }
-     int numTlogReplicas = getInt(message, TLOG_REPLICAS, backupCollectionState.getNumTlogReplicas(), 0);
-     int numPullReplicas = getInt(message, PULL_REPLICAS, backupCollectionState.getNumPullReplicas(), 0);
-     int totalReplicasPerShard = numNrtReplicas + numTlogReplicas + numPullReplicas;
-
-     int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, backupCollectionState.getMaxShardsPerNode());
-     int availableNodeCount = nodeList.size();
-     if ((numShards * totalReplicasPerShard) > (availableNodeCount * maxShardsPerNode)) {
-       throw new SolrException(ErrorCode.BAD_REQUEST,
-           String.format(Locale.ROOT, "Solr cloud with available number of nodes:%d is insufficient for"
-               + " restoring a collection with %d shards, total replicas per shard %d and maxShardsPerNode %d."
-               + " Consider increasing maxShardsPerNode value OR number of available nodes.",
-               availableNodeCount, numShards, totalReplicasPerShard, maxShardsPerNode));
-     }
-
-     //Upload the configs
-     String configName = (String) properties.get(COLL_CONF);
-     String restoreConfigName = message.getStr(COLL_CONF, configName);
-     if (zkStateReader.getConfigManager().configExists(restoreConfigName)) {
-       log.info("Using existing config {}", restoreConfigName);
-       //TODO add overwrite option?
-     } else {
-       log.info("Uploading config {}", restoreConfigName);
-       backupMgr.uploadConfigDir(location, backupName, configName, restoreConfigName);
-     }
-
-     log.info("Starting restore into collection={} with backup_name={} at location={}", restoreCollectionName, backupName,
-         location);
-
-     //Create core-less collection
-     {
-       Map<String, Object> propMap = new HashMap<>();
-       propMap.put(Overseer.QUEUE_OPERATION, CREATE.toString());
-       propMap.put("fromApi", "true"); // mostly true. Prevents autoCreated=true in the collection state.
-       if (properties.get(STATE_FORMAT) == null) {
-         propMap.put(STATE_FORMAT, "2");
-       }
-
-       // inherit settings from input API, defaulting to the backup's setting. Ex: replicationFactor
-       for (String collProp : COLL_PROPS.keySet()) {
-         Object val = message.getProperties().getOrDefault(collProp, backupCollectionState.get(collProp));
-         if (val != null) {
-           propMap.put(collProp, val);
-         }
-       }
-
-       propMap.put(NAME, restoreCollectionName);
-       propMap.put(CREATE_NODE_SET, CREATE_NODE_SET_EMPTY); //no cores
-       propMap.put(COLL_CONF, restoreConfigName);
-
-       // router.*
-       @SuppressWarnings("unchecked")
-       Map<String, Object> routerProps = (Map<String, Object>) backupCollectionState.getProperties().get(DocCollection.DOC_ROUTER);
-       for (Map.Entry<String, Object> pair : routerProps.entrySet()) {
-         propMap.put(DocCollection.DOC_ROUTER + "." + pair.getKey(), pair.getValue());
-       }
-
-       Set<String> sliceNames = backupCollectionState.getActiveSlicesMap().keySet();
-       if (backupCollectionState.getRouter() instanceof ImplicitDocRouter) {
-         propMap.put(SHARDS_PROP, StrUtils.join(sliceNames, ','));
-       } else {
-         propMap.put(NUM_SLICES, sliceNames.size());
-         // ClusterStateMutator.createCollection detects that "slices" is in fact a slice structure instead of a
-         // list of names, and if so uses this instead of building it. We clear the replica list.
-         Collection<Slice> backupSlices = backupCollectionState.getActiveSlices();
-         Map<String, Slice> newSlices = new LinkedHashMap<>(backupSlices.size());
-         for (Slice backupSlice : backupSlices) {
-           newSlices.put(backupSlice.getName(),
-               new Slice(backupSlice.getName(), Collections.emptyMap(), backupSlice.getProperties()));
-         }
-         propMap.put(SHARDS_PROP, newSlices);
-       }
-
-       ocmh.commandMap.get(CREATE).call(zkStateReader.getClusterState(), new ZkNodeProps(propMap), new NamedList());
-       // note: when createCollection() returns, the collection exists (no race)
-     }
-
-     DocCollection restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
-
-     DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
-
-     //Mark all shards in CONSTRUCTION STATE while we restore the data
-     {
-       //TODO might instead createCollection accept an initial state? Is there a race?
-       Map<String, Object> propMap = new HashMap<>();
-       propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
-       for (Slice shard : restoreCollection.getSlices()) {
-         propMap.put(shard.getName(), Slice.State.CONSTRUCTION.toString());
-       }
-       propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
-       inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
-     }
-
-     // TODO how do we leverage the RULE / SNITCH logic in createCollection?
-
-     ClusterState clusterState = zkStateReader.getClusterState();
-
-     List<String> sliceNames = new ArrayList<>();
-     restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
-     PolicyHelper.SessionWrapper sessionWrapper = null;
-
-     try {
-       List<ReplicaPosition> replicaPositions = Assign.identifyNodes(
-           ocmh.cloudManager, clusterState,
-           nodeList, restoreCollectionName,
-           message, sliceNames,
-           numNrtReplicas, numTlogReplicas, numPullReplicas);
-       sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
-       //Create one replica per shard and copy backed up data to it
-       for (Slice slice : restoreCollection.getSlices()) {
-         log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
-         HashMap<String, Object> propMap = new HashMap<>();
-         propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
-         propMap.put(COLLECTION_PROP, restoreCollectionName);
-         propMap.put(SHARD_ID_PROP, slice.getName());
-
-         // The first replica of a shard is NRT when any NRT replicas are requested, else TLOG.
-         if (numNrtReplicas >= 1) {
-           propMap.put(REPLICA_TYPE, Replica.Type.NRT.name());
-         } else if (numTlogReplicas >= 1) {
-           propMap.put(REPLICA_TYPE, Replica.Type.TLOG.name());
-         } else {
-           throw new SolrException(ErrorCode.BAD_REQUEST, "Unexpected number of replicas, replicationFactor, " +
-               Replica.Type.NRT + " or " + Replica.Type.TLOG + " must be greater than 0");
-         }
-
-         // Get the first node matching the shard to restore in
-         assignNodeForShard(replicaPositions, slice.getName(), propMap);
-
-         // add async param
-         if (asyncId != null) {
-           propMap.put(ASYNC, asyncId);
-         }
-         ocmh.addPropertyParams(message, propMap);
-
-         ocmh.addReplica(clusterState, new ZkNodeProps(propMap), new NamedList(), null);
-       }
-
-       //refresh the local copy of collection state
-       restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
-
-       //Copy data from backed up index to each replica
-       for (Slice slice : restoreCollection.getSlices()) {
-         ModifiableSolrParams params = new ModifiableSolrParams();
-         params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.RESTORECORE.toString());
-         params.set(NAME, "snapshot." + slice.getName());
-         params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.toASCIIString());
-         params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
-
-         ocmh.sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
-       }
-       ocmh.processResponses(new NamedList(), shardHandler, true, "Could not restore core", asyncId, requestMap);
-
-       //Mark all shards in ACTIVE STATE
-       {
-         HashMap<String, Object> propMap = new HashMap<>();
-         propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
-         propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
-         for (Slice shard : restoreCollection.getSlices()) {
-           propMap.put(shard.getName(), Slice.State.ACTIVE.toString());
-         }
-         inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
-       }
-
-       //refresh the local copy of collection state
-       restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
-
-       if (totalReplicasPerShard > 1) {
-         log.info("Adding replicas to restored collection={}", restoreCollection);
-         for (Slice slice : restoreCollection.getSlices()) {
-
-           //Add the remaining replicas for each shard, considering its type
-           int createdNrtReplicas = 0, createdTlogReplicas = 0, createdPullReplicas = 0;
-
-           // We already created either a NRT or a TLOG replica as leader. Account for it using the
-           // requested counts (the "created" counters are all still zero here); otherwise a TLOG
-           // leader would go uncounted and the loop below would create one TLOG replica too many
-           // and one PULL replica too few.
-           if (numNrtReplicas > 0) {
-             createdNrtReplicas++;
-           } else if (numTlogReplicas > 0) {
-             createdTlogReplicas++;
-           }
-
-           // Starts at 1 because the first replica of the shard already exists.
-           for (int i = 1; i < totalReplicasPerShard; i++) {
-             Replica.Type typeToCreate;
-             if (createdNrtReplicas < numNrtReplicas) {
-               createdNrtReplicas++;
-               typeToCreate = Replica.Type.NRT;
-             } else if (createdTlogReplicas < numTlogReplicas) {
-               createdTlogReplicas++;
-               typeToCreate = Replica.Type.TLOG;
-             } else {
-               createdPullReplicas++;
-               typeToCreate = Replica.Type.PULL;
-               assert createdPullReplicas <= numPullReplicas: "Unexpected number of replicas";
-             }
-
-             log.debug("Adding replica for shard={} collection={} of type {} ", slice.getName(), restoreCollection, typeToCreate);
-             HashMap<String, Object> propMap = new HashMap<>();
-             propMap.put(COLLECTION_PROP, restoreCollectionName);
-             propMap.put(SHARD_ID_PROP, slice.getName());
-             propMap.put(REPLICA_TYPE, typeToCreate.name());
-
-             // Get the first node matching the shard to restore in
-             assignNodeForShard(replicaPositions, slice.getName(), propMap);
-
-             // add async param
-             if (asyncId != null) {
-               propMap.put(ASYNC, asyncId);
-             }
-             ocmh.addPropertyParams(message, propMap);
-
-             ocmh.addReplica(zkStateReader.getClusterState(), new ZkNodeProps(propMap), results, null);
-           }
-         }
-       }
-
-       log.info("Completed restoring collection={} backupName={}", restoreCollection, backupName);
-     } finally {
-       if (sessionWrapper != null) sessionWrapper.release();
-     }
-   }
-
-   /**
-    * Picks the first remaining position computed for {@code shardName}, stores its node in
-    * {@code propMap} under {@link CoreAdminParams#NODE}, and removes that position from the list so
-    * it is not handed out again. {@code propMap} is left untouched when no position matches.
-    */
-   private static void assignNodeForShard(List<ReplicaPosition> replicaPositions, String shardName,
-                                          Map<String, Object> propMap) {
-     for (ReplicaPosition replicaPosition : replicaPositions) {
-       if (Objects.equals(replicaPosition.shard, shardName)) {
-         propMap.put(CoreAdminParams.NODE, replicaPosition.node);
-         // Returning right after the removal means the for-each iterator is never advanced again.
-         replicaPositions.remove(replicaPosition);
-         return;
-       }
-     }
-   }
-
-   /**
-    * Reads an integer property from {@code message}, using {@code count} as the fallback passed to
-    * the lookup and {@code defaultValue} when the lookup still yields {@code null}.
-    */
-   private int getInt(ZkNodeProps message, String propertyName, Integer count, int defaultValue) {
-     Integer value = message.getInt(propertyName, count);
-     return value!=null ? value:defaultValue;
-   }
- }