You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by va...@apache.org on 2018/01/16 20:39:49 UTC
[08/15] lucene-solr:branch_7x: SOLR-11817: Move Collections API
classes to its own package
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
new file mode 100644
index 0000000..0b2afd9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -0,0 +1,1010 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang.StringUtils;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.LockTree;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.OverseerMessageHandler;
+import org.apache.solr.cloud.OverseerNodePrioritizer;
+import org.apache.solr.cloud.OverseerSolrResponse;
+import org.apache.solr.cloud.OverseerTaskProcessor;
+import org.apache.solr.cloud.Stats;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrCloseable;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.SuppressForbidden;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.handler.component.ShardResponse;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.RTimer;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
+import static org.apache.solr.common.cloud.DocCollection.SNITCH;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.Utils.makeMap;
+
+/**
+ * A {@link OverseerMessageHandler} that handles Collections API related
+ * overseer messages.
+ */
+public class OverseerCollectionMessageHandler implements OverseerMessageHandler, SolrCloseable {
+
  /** Message property carrying the number of shards ({@code numShards}). */
  public static final String NUM_SLICES = "numShards";

  // Node-set selection parameters; the names mirror CollectionAdminParams.
  // NOTE(review): CREATE_NODE_SET_SHUFFLE_DEFAULT is presumably the value used when the
  // message does not specify CREATE_NODE_SET_SHUFFLE — confirm in the consuming commands.
  public static final boolean CREATE_NODE_SET_SHUFFLE_DEFAULT = true;
  public static final String CREATE_NODE_SET_SHUFFLE = CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM;
  // Special CREATE_NODE_SET value "EMPTY"; its exact meaning is defined by the commands that read it.
  public static final String CREATE_NODE_SET_EMPTY = "EMPTY";
  public static final String CREATE_NODE_SET = CollectionAdminParams.CREATE_NODE_SET_PARAM;

  /** Message/collection property naming the document router. */
  public static final String ROUTER = "router";

  /** Property name {@code shards}. */
  public static final String SHARDS_PROP = "shards";

  /** Property name {@code requestid}. */
  public static final String REQUESTID = "requestid";

  /** Config set name property; MODIFYCOLLECTION strips it from the message and handles it separately. */
  public static final String COLL_CONF = "collection.configName";

  /** Prefix of message keys that addPropertyParams copies verbatim into core parameters. */
  public static final String COLL_PROP_PREFIX = "property.";

  public static final String ONLY_IF_DOWN = "onlyIfDown";

  public static final String SHARD_UNIQUE = "shardUnique";

  public static final String ONLY_ACTIVE_NODES = "onlyactivenodes";

  static final String SKIP_CREATE_REPLICA_IN_CLUSTER_STATE = "skipCreateReplicaInClusterState";

  /**
   * Unmodifiable map of collection-level property names to their default values.
   * A {@code null} value presumably means "no default" — TODO confirm against the readers of this map.
   * The argument order to makeMap is kept as is in case the map's iteration order matters.
   */
  public static final Map<String, Object> COLL_PROPS = Collections.unmodifiableMap(makeMap(
      ROUTER, DocRouter.DEFAULT_NAME,
      ZkStateReader.REPLICATION_FACTOR, "1",
      ZkStateReader.NRT_REPLICAS, "1",
      ZkStateReader.TLOG_REPLICAS, "0",
      ZkStateReader.PULL_REPLICAS, "0",
      ZkStateReader.MAX_SHARDS_PER_NODE, "1",
      ZkStateReader.AUTO_ADD_REPLICAS, "false",
      DocCollection.RULE, null,
      POLICY, null,
      SNITCH, null));
+
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  // Collaborators supplied to (or derived in) the constructor. They are package-private;
  // presumably the *Cmd classes constructed with `this` read them — verify before narrowing.
  Overseer overseer;
  ShardHandlerFactory shardHandlerFactory;
  // Sent as the "qt" parameter on outgoing core-admin shard requests.
  String adminPath;
  ZkStateReader zkStateReader;
  SolrCloudManager cloudManager;
  String myId;
  Stats stats;
  // Taken from cloudManager; every TimeOut created by this handler is driven by it.
  TimeSource timeSource;

  // Lock tree for running tasks, presumably used for mutual exclusion between tasks that
  // touch the same collection/shard/replica (see LockTree). The older comment here called
  // this a "Set", which no longer matches the type.

  final private LockTree lockTree = new LockTree();
  // Executor with 5 core / 10 max threads, 0 ms keep-alive, and a SynchronousQueue (direct
  // hand-off, no task buffering) — assuming ThreadPoolExecutor argument order; confirm
  // rejection behaviour in MDCAwareThreadPoolExecutor.
  ExecutorService tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS,
      new SynchronousQueue<>(),
      new DefaultSolrThreadFactory("OverseerCollectionMessageHandlerThreadFactory"));
+
+ protected static final Random RANDOM;
+ static {
+ // We try to make things reproducible in the context of our tests by initializing the random instance
+ // based on the current seed
+ String seed = System.getProperty("tests.seed");
+ if (seed == null) {
+ RANDOM = new Random();
+ } else {
+ RANDOM = new Random(seed.hashCode());
+ }
+ }
+
  /** Immutable dispatch table from collection action to its handler; built once in the constructor. */
  final Map<CollectionAction, Cmd> commandMap;

  // Closed flag; volatile so a change is visible across threads. Presumably set by close(),
  // which lies outside this chunk.
  private volatile boolean isClosed;
+
+ public OverseerCollectionMessageHandler(ZkStateReader zkStateReader, String myId,
+ final ShardHandlerFactory shardHandlerFactory,
+ String adminPath,
+ Stats stats,
+ Overseer overseer,
+ OverseerNodePrioritizer overseerPrioritizer) {
+ this.zkStateReader = zkStateReader;
+ this.shardHandlerFactory = shardHandlerFactory;
+ this.adminPath = adminPath;
+ this.myId = myId;
+ this.stats = stats;
+ this.overseer = overseer;
+ this.cloudManager = overseer.getSolrCloudManager();
+ this.timeSource = cloudManager.getTimeSource();
+ this.isClosed = false;
+ commandMap = new ImmutableMap.Builder<CollectionAction, Cmd>()
+ .put(REPLACENODE, new ReplaceNodeCmd(this))
+ .put(DELETENODE, new DeleteNodeCmd(this))
+ .put(BACKUP, new BackupCmd(this))
+ .put(RESTORE, new RestoreCmd(this))
+ .put(CREATESNAPSHOT, new CreateSnapshotCmd(this))
+ .put(DELETESNAPSHOT, new DeleteSnapshotCmd(this))
+ .put(SPLITSHARD, new SplitShardCmd(this))
+ .put(ADDROLE, new OverseerRoleCmd(this, ADDROLE, overseerPrioritizer))
+ .put(REMOVEROLE, new OverseerRoleCmd(this, REMOVEROLE, overseerPrioritizer))
+ .put(MOCK_COLL_TASK, this::mockOperation)
+ .put(MOCK_SHARD_TASK, this::mockOperation)
+ .put(MOCK_REPLICA_TASK, this::mockOperation)
+ .put(MIGRATESTATEFORMAT, this::migrateStateFormat)
+ .put(CREATESHARD, new CreateShardCmd(this))
+ .put(MIGRATE, new MigrateCmd(this))
+ .put(CREATE, new CreateCollectionCmd(this))
+ .put(MODIFYCOLLECTION, this::modifyCollection)
+ .put(ADDREPLICAPROP, this::processReplicaAddPropertyCommand)
+ .put(DELETEREPLICAPROP, this::processReplicaDeletePropertyCommand)
+ .put(BALANCESHARDUNIQUE, this::balanceProperty)
+ .put(REBALANCELEADERS, this::processRebalanceLeaders)
+ .put(RELOAD, this::reloadCollection)
+ .put(DELETE, new DeleteCollectionCmd(this))
+ .put(CREATEALIAS, new CreateAliasCmd(this))
+ .put(DELETEALIAS, new DeleteAliasCmd(this))
+ .put(ROUTEDALIAS_CREATECOLL, new RoutedAliasCreateCollectionCmd(this))
+ .put(OVERSEERSTATUS, new OverseerStatusCmd(this))
+ .put(DELETESHARD, new DeleteShardCmd(this))
+ .put(DELETEREPLICA, new DeleteReplicaCmd(this))
+ .put(ADDREPLICA, new AddReplicaCmd(this))
+ .put(MOVEREPLICA, new MoveReplicaCmd(this))
+ .put(UTILIZENODE, new UtilizeNodeCmd(this))
+ .build()
+ ;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public SolrResponse processMessage(ZkNodeProps message, String operation) {
+ log.debug("OverseerCollectionMessageHandler.processMessage : {} , {}", operation, message);
+
+ NamedList results = new NamedList();
+ try {
+ CollectionAction action = getCollectionAction(operation);
+ Cmd command = commandMap.get(action);
+ if (command != null) {
+ command.call(cloudManager.getClusterStateProvider().getClusterState(), message, results);
+ } else {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ + operation);
+ }
+ } catch (Exception e) {
+ String collName = message.getStr("collection");
+ if (collName == null) collName = message.getStr(NAME);
+
+ if (collName == null) {
+ SolrException.log(log, "Operation " + operation + " failed", e);
+ } else {
+ SolrException.log(log, "Collection: " + collName + " operation: " + operation
+ + " failed", e);
+ }
+
+ results.add("Operation " + operation + " caused exception:", e);
+ SimpleOrderedMap nl = new SimpleOrderedMap();
+ nl.add("msg", e.getMessage());
+ nl.add("rspCode", e instanceof SolrException ? ((SolrException)e).code() : -1);
+ results.add("exception", nl);
+ }
+ return new OverseerSolrResponse(results);
+ }
+
+ @SuppressForbidden(reason = "Needs currentTimeMillis for mock requests")
+ private void mockOperation(ClusterState state, ZkNodeProps message, NamedList results) throws InterruptedException {
+ //only for test purposes
+ Thread.sleep(message.getInt("sleep", 1));
+ log.info("MOCK_TASK_EXECUTED time {} data {}", System.currentTimeMillis(), Utils.toJSONString(message));
+ results.add("MOCK_FINISHED", System.currentTimeMillis());
+ }
+
+ private CollectionAction getCollectionAction(String operation) {
+ CollectionAction action = CollectionAction.get(operation);
+ if (action == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
+ }
+ return action;
+ }
+
+ private void reloadCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
+
+ String asyncId = message.getStr(ASYNC);
+ Map<String, String> requestMap = null;
+ if (asyncId != null) {
+ requestMap = new HashMap<>();
+ }
+ collectionCmd(message, params, results, Replica.State.ACTIVE, asyncId, requestMap);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void processRebalanceLeaders(ClusterState clusterState, ZkNodeProps message, NamedList results)
+ throws Exception {
+ checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
+ CORE_NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(COLLECTION_PROP, message.getStr(COLLECTION_PROP));
+ params.set(SHARD_ID_PROP, message.getStr(SHARD_ID_PROP));
+ params.set(REJOIN_AT_HEAD_PROP, message.getStr(REJOIN_AT_HEAD_PROP));
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.REJOINLEADERELECTION.toString());
+ params.set(CORE_NAME_PROP, message.getStr(CORE_NAME_PROP));
+ params.set(CORE_NODE_NAME_PROP, message.getStr(CORE_NODE_NAME_PROP));
+ params.set(ELECTION_NODE_PROP, message.getStr(ELECTION_NODE_PROP));
+ params.set(BASE_URL_PROP, message.getStr(BASE_URL_PROP));
+
+ String baseUrl = message.getStr(BASE_URL_PROP);
+ ShardRequest sreq = new ShardRequest();
+ sreq.nodeName = message.getStr(ZkStateReader.CORE_NAME_PROP);
+ // yes, they must use same admin handler path everywhere...
+ params.set("qt", adminPath);
+ sreq.purpose = ShardRequest.PURPOSE_PRIVATE;
+ sreq.shards = new String[] {baseUrl};
+ sreq.actualShards = sreq.shards;
+ sreq.params = params;
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ shardHandler.submit(sreq, baseUrl, sreq.params);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void processReplicaAddPropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results)
+ throws Exception {
+ checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP);
+ SolrZkClient zkClient = zkStateReader.getZkClient();
+ DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
+ Map<String, Object> propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICAPROP.toLower());
+ propMap.putAll(message.getProperties());
+ ZkNodeProps m = new ZkNodeProps(propMap);
+ inQueue.offer(Utils.toJSON(m));
+ }
+
+ private void processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results)
+ throws Exception {
+ checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
+ SolrZkClient zkClient = zkStateReader.getZkClient();
+ DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
+ Map<String, Object> propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, DELETEREPLICAPROP.toLower());
+ propMap.putAll(message.getProperties());
+ ZkNodeProps m = new ZkNodeProps(propMap);
+ inQueue.offer(Utils.toJSON(m));
+ }
+
+ private void balanceProperty(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+ if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) || StringUtils.isBlank(message.getStr(PROPERTY_PROP))) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "The '" + COLLECTION_PROP + "' and '" + PROPERTY_PROP +
+ "' parameters are required for the BALANCESHARDUNIQUE operation, no action taken");
+ }
+ SolrZkClient zkClient = zkStateReader.getZkClient();
+ DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
+ Map<String, Object> propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, BALANCESHARDUNIQUE.toLower());
+ propMap.putAll(message.getProperties());
+ inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
+ }
+
+ /**
+ * Walks the tree of collection status to verify that any replicas not reporting a "down" status is
+ * on a live node, if any replicas reporting their status as "active" but the node is not live is
+ * marked as "down"; used by CLUSTERSTATUS.
+ * @param liveNodes List of currently live node names.
+ * @param collectionProps Map of collection status information pulled directly from ZooKeeper.
+ */
+
+ @SuppressWarnings("unchecked")
+ protected void crossCheckReplicaStateWithLiveNodes(List<String> liveNodes, NamedList<Object> collectionProps) {
+ Iterator<Map.Entry<String,Object>> colls = collectionProps.iterator();
+ while (colls.hasNext()) {
+ Map.Entry<String,Object> next = colls.next();
+ Map<String,Object> collMap = (Map<String,Object>)next.getValue();
+ Map<String,Object> shards = (Map<String,Object>)collMap.get("shards");
+ for (Object nextShard : shards.values()) {
+ Map<String,Object> shardMap = (Map<String,Object>)nextShard;
+ Map<String,Object> replicas = (Map<String,Object>)shardMap.get("replicas");
+ for (Object nextReplica : replicas.values()) {
+ Map<String,Object> replicaMap = (Map<String,Object>)nextReplica;
+ if (Replica.State.getState((String) replicaMap.get(ZkStateReader.STATE_PROP)) != Replica.State.DOWN) {
+ // not down, so verify the node is live
+ String node_name = (String)replicaMap.get(ZkStateReader.NODE_NAME_PROP);
+ if (!liveNodes.contains(node_name)) {
+ // node is not live, so this replica is actually down
+ replicaMap.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Get collection status from cluster state.
+ * Can return collection status by given shard name.
+ *
+ *
+ * @param collection collection map parsed from JSON-serialized {@link ClusterState}
+ * @param name collection name
+ * @param requestedShards a set of shards to be returned in the status.
+ * An empty or null values indicates <b>all</b> shards.
+ * @return map of collection properties
+ */
+ @SuppressWarnings("unchecked")
+ private Map<String, Object> getCollectionStatus(Map<String, Object> collection, String name, Set<String> requestedShards) {
+ if (collection == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + name + " not found");
+ }
+ if (requestedShards == null || requestedShards.isEmpty()) {
+ return collection;
+ } else {
+ Map<String, Object> shards = (Map<String, Object>) collection.get("shards");
+ Map<String, Object> selected = new HashMap<>();
+ for (String selectedShard : requestedShards) {
+ if (!shards.containsKey(selectedShard)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + name + " shard: " + selectedShard + " not found");
+ }
+ selected.put(selectedShard, shards.get(selectedShard));
+ collection.put("shards", selected);
+ }
+ return collection;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
+ throws Exception {
+ ((DeleteReplicaCmd) commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, message, results, onComplete);
+
+ }
+
+ boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
+ TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS, timeSource);
+ while (! timeout.hasTimedOut()) {
+ timeout.sleep(100);
+ DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName);
+ if (docCollection == null) { // someone already deleted the collection
+ return true;
+ }
+ Slice slice = docCollection.getSlice(shard);
+ if(slice == null || slice.getReplica(replicaName) == null) {
+ return true;
+ }
+ }
+ // replica still exists after the timeout
+ return false;
+ }
+
+ void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws Exception {
+ ZkNodeProps m = new ZkNodeProps(
+ Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
+ ZkStateReader.CORE_NAME_PROP, core,
+ ZkStateReader.NODE_NAME_PROP, replica.getStr(ZkStateReader.NODE_NAME_PROP),
+ ZkStateReader.COLLECTION_PROP, collectionName,
+ ZkStateReader.CORE_NODE_NAME_PROP, replicaName,
+ ZkStateReader.BASE_URL_PROP, replica.getStr(ZkStateReader.BASE_URL_PROP));
+ Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+ }
+
+ void checkRequired(ZkNodeProps message, String... props) {
+ for (String prop : props) {
+ if(message.get(prop) == null){
+ throw new SolrException(ErrorCode.BAD_REQUEST, StrUtils.join(Arrays.asList(props),',') +" are required params" );
+ }
+ }
+
+ }
+
+ //TODO should we not remove in the next release ?
+ private void migrateStateFormat(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+ final String collectionName = message.getStr(COLLECTION_PROP);
+
+ boolean firstLoop = true;
+ // wait for a while until the state format changes
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+ while (! timeout.hasTimedOut()) {
+ DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
+ if (collection == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collectionName + " not found");
+ }
+ if (collection.getStateFormat() == 2) {
+ // Done.
+ results.add("success", new SimpleOrderedMap<>());
+ return;
+ }
+
+ if (firstLoop) {
+ // Actually queue the migration command.
+ firstLoop = false;
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, MIGRATESTATEFORMAT.toLower(), COLLECTION_PROP, collectionName);
+ Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+ }
+ timeout.sleep(100);
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Could not migrate state format for collection: " + collectionName);
+ }
+
+ void commit(NamedList results, String slice, Replica parentShardLeader) {
+ log.debug("Calling soft commit to make sub shard updates visible");
+ String coreUrl = new ZkCoreNodeProps(parentShardLeader).getCoreUrl();
+ // HttpShardHandler is hard coded to send a QueryRequest hence we go direct
+ // and we force open a searcher so that we have documents to show upon switching states
+ UpdateResponse updateResponse = null;
+ try {
+ updateResponse = softCommit(coreUrl);
+ processResponse(results, null, coreUrl, updateResponse, slice, Collections.emptySet());
+ } catch (Exception e) {
+ processResponse(results, e, coreUrl, updateResponse, slice, Collections.emptySet());
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to call distrib softCommit on: " + coreUrl, e);
+ }
+ }
+
+
+ static UpdateResponse softCommit(String url) throws SolrServerException, IOException {
+
+ try (HttpSolrClient client = new HttpSolrClient.Builder(url)
+ .withConnectionTimeout(30000)
+ .withSocketTimeout(120000)
+ .build()) {
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.setParams(new ModifiableSolrParams());
+ ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true, true);
+ return ureq.process(client);
+ }
+ }
+
+ String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) {
+ int retryCount = 320;
+ while (retryCount-- > 0) {
+ final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collectionName);
+ if (docCollection != null && docCollection.getSlicesMap() != null) {
+ Map<String,Slice> slicesMap = docCollection.getSlicesMap();
+ for (Slice slice : slicesMap.values()) {
+ for (Replica replica : slice.getReplicas()) {
+ // TODO: for really large clusters, we could 'index' on this
+
+ String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
+ String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+
+ if (nodeName.equals(msgNodeName) && core.equals(msgCore)) {
+ return replica.getName();
+ }
+ }
+ }
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
+ }
+
+ void waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
+ log.debug("Waiting for slice {} of collection {} to be available", sliceName, collectionName);
+ RTimer timer = new RTimer();
+ int retryCount = 320;
+ while (retryCount-- > 0) {
+ DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
+ if (collection == null) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Unable to find collection: " + collectionName + " in clusterstate");
+ }
+ Slice slice = collection.getSlice(sliceName);
+ if (slice != null) {
+ log.debug("Waited for {}ms for slice {} of collection {} to be available",
+ timer.getTime(), sliceName, collectionName);
+ return;
+ }
+ Thread.sleep(1000);
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Could not find new slice " + sliceName + " in collection " + collectionName
+ + " even after waiting for " + timer.getTime() + "ms"
+ );
+ }
+
+ DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
+ if (a == null || b == null || !a.overlaps(b)) {
+ return null;
+ } else if (a.isSubsetOf(b))
+ return a;
+ else if (b.isSubsetOf(a))
+ return b;
+ else if (b.includes(a.max)) {
+ return new DocRouter.Range(b.min, a.max);
+ } else {
+ return new DocRouter.Range(a.min, b.max);
+ }
+ }
+
+ void sendShardRequest(String nodeName, ModifiableSolrParams params,
+ ShardHandler shardHandler, String asyncId,
+ Map<String, String> requestMap) {
+ sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap, adminPath, zkStateReader);
+
+ }
+
+ public static void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler,
+ String asyncId, Map<String, String> requestMap, String adminPath,
+ ZkStateReader zkStateReader) {
+ if (asyncId != null) {
+ String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
+ params.set(ASYNC, coreAdminAsyncId);
+ requestMap.put(nodeName, coreAdminAsyncId);
+ }
+
+ ShardRequest sreq = new ShardRequest();
+ params.set("qt", adminPath);
+ sreq.purpose = 1;
+ String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
+ sreq.shards = new String[]{replica};
+ sreq.actualShards = sreq.shards;
+ sreq.nodeName = nodeName;
+ sreq.params = params;
+
+ shardHandler.submit(sreq, replica, sreq.params);
+ }
+
+ void addPropertyParams(ZkNodeProps message, ModifiableSolrParams params) {
+ // Now add the property.key=value pairs
+ for (String key : message.keySet()) {
+ if (key.startsWith(COLL_PROP_PREFIX)) {
+ params.set(key, message.getStr(key));
+ }
+ }
+ }
+
+ void addPropertyParams(ZkNodeProps message, Map<String, Object> map) {
+ // Now add the property.key=value pairs
+ for (String key : message.keySet()) {
+ if (key.startsWith(COLL_PROP_PREFIX)) {
+ map.put(key, message.getStr(key));
+ }
+ }
+ }
+
+
+ private void modifyCollection(ClusterState clusterState, ZkNodeProps message, NamedList results)
+ throws Exception {
+
+ final String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
+ //the rest of the processing is based on writing cluster state properties
+ //remove the property here to avoid any errors down the pipeline due to this property appearing
+ String configName = (String) message.getProperties().remove(COLL_CONF);
+
+ if(configName != null) {
+ validateConfigOrThrowSolrException(configName);
+
+ boolean isLegacyCloud = Overseer.isLegacy(zkStateReader);
+ createConfNode(cloudManager.getDistribStateManager(), configName, collectionName, isLegacyCloud);
+ reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
+ }
+
+ overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+ boolean areChangesVisible = true;
+ while (!timeout.hasTimedOut()) {
+ DocCollection collection = cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName);
+ areChangesVisible = true;
+ for (Map.Entry<String,Object> updateEntry : message.getProperties().entrySet()) {
+ String updateKey = updateEntry.getKey();
+ if (!updateKey.equals(ZkStateReader.COLLECTION_PROP)
+ && !updateKey.equals(Overseer.QUEUE_OPERATION)
+ && !collection.get(updateKey).equals(updateEntry.getValue())){
+ areChangesVisible = false;
+ break;
+ }
+ }
+ if (areChangesVisible) break;
+ timeout.sleep(100);
+ }
+
+ if (!areChangesVisible)
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not modify collection " + message);
+ }
+
+ void cleanupCollection(String collectionName, NamedList results) throws Exception {
+ log.error("Cleaning up collection [" + collectionName + "]." );
+ Map<String, Object> props = makeMap(
+ Overseer.QUEUE_OPERATION, DELETE.toLower(),
+ NAME, collectionName);
+ commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
+ }
+
+ Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
+ Map<String, Replica> result = new HashMap<>();
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+ while (true) {
+ DocCollection coll = zkStateReader.getClusterState().getCollection(collectionName);
+ for (String coreName : coreNames) {
+ if (result.containsKey(coreName)) continue;
+ for (Slice slice : coll.getSlices()) {
+ for (Replica replica : slice.getReplicas()) {
+ if (coreName.equals(replica.getStr(ZkStateReader.CORE_NAME_PROP))) {
+ result.put(coreName, replica);
+ break;
+ }
+ }
+ }
+ }
+
+ if (result.size() == coreNames.size()) {
+ return result;
+ } else {
+ log.debug("Expecting {} cores but found {}", coreNames, result);
+ }
+ if (timeout.hasTimedOut()) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state.");
+ }
+
+ Thread.sleep(100);
+ }
+ }
+
+ ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
+ throws Exception {
+
+ return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete);
+ }
+
+ void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
+ String asyncId, Map<String, String> requestMap) {
+ processResponses(results, shardHandler, abortOnError, msgOnError, asyncId, requestMap, Collections.emptySet());
+ }
+
+  /**
+   * Takes every completed shard response from {@code shardHandler} and records it in {@code results}.
+   * <p>
+   * When {@code abortOnError} is set and a response carries an exception, all remaining responses are taken
+   * and discarded, then a SERVER_ERROR {@link SolrException} with {@code msgOnError} wrapping that exception
+   * is thrown. When {@code asyncId} is non-null, it then waits for each async core-admin request in
+   * {@code requestMap}, adds their statuses to {@code results}, and clears the map.
+   *
+   * @param okayExceptions root-throwable names of remote exceptions to record as successes
+   */
+  void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
+                        String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
+    for (ShardResponse response = shardHandler.takeCompletedOrError();
+         response != null;
+         response = shardHandler.takeCompletedOrError()) {
+      processResponse(results, response, okayExceptions);
+      Throwable failure = response.getException();
+      if (abortOnError && failure != null) {
+        // drain pending requests before bailing out
+        while (shardHandler.takeCompletedOrError() != null) {
+          // discard
+        }
+        throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, failure);
+      }
+    }
+
+    // async requests: wait for the core admin side to finish before returning
+    if (asyncId != null) {
+      waitForAsyncCallsToComplete(requestMap, results);
+      requestMap.clear();
+    }
+  }
+
+
+  /**
+   * Checks that the config set {@code configName} has data under {@link ZkConfigManager#CONFIGS_ZKNODE}.
+   *
+   * @throws SolrException (BAD_REQUEST) if the distributed state manager reports no data for it
+   */
+  void validateConfigOrThrowSolrException(String configName) throws IOException, KeeperException, InterruptedException {
+    String configPath = ZkConfigManager.CONFIGS_ZKNODE + "/" + configName;
+    if (!cloudManager.getDistribStateManager().hasData(configPath)) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find the specified config set: " + configName);
+    }
+  }
+
+  /**
+   * Stores {@code configName} (as JSON under {@link ZkController#CONFIGNAME_PROP}) in the collection's node
+   * under {@link ZkStateReader#COLLECTIONS_ZKNODE}. The node's data is overwritten if
+   * {@code stateManager.hasData} reports it present; otherwise the path is created as a persistent node.
+   * <p>
+   * This does not validate the config (path) itself; that check must be done before calling this.
+   * When {@code configName} is null, a warning is logged if {@code isLegacyCloud} is true, otherwise a
+   * BAD_REQUEST {@link SolrException} is thrown.
+   */
+  public static void createConfNode(DistribStateManager stateManager, String configName, String coll, boolean isLegacyCloud) throws IOException, AlreadyExistsException, BadVersionException, KeeperException, InterruptedException {
+    if (configName == null) {
+      if (!isLegacyCloud) {
+        throw new SolrException(ErrorCode.BAD_REQUEST,"Unable to get config name");
+      }
+      log.warn("Could not obtain config name");
+      return;
+    }
+
+    String collDir = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll;
+    log.debug("creating collections conf node {} ", collDir);
+    byte[] data = Utils.toJSON(makeMap(ZkController.CONFIGNAME_PROP, configName));
+    if (!stateManager.hasData(collDir)) {
+      stateManager.makePath(collDir, data, CreateMode.PERSISTENT, false);
+    } else {
+      stateManager.setData(collDir, data, -1);
+    }
+  }
+
+  /**
+   * Same as the overload that takes {@code okayExceptions}, with no remote exception treated as acceptable.
+   */
+  private void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
+                             NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap) {
+    Set<String> noOkayExceptions = Collections.emptySet();
+    collectionCmd(message, params, results, stateMatcher, asyncId, requestMap, noOkayExceptions);
+  }
+
+
+  /**
+   * Sends {@code params} to every live replica of the collection named by {@code message}'s NAME (only
+   * replicas in {@code stateMatcher} state when it is non-null) and records the responses in {@code results}.
+   * Errors do not abort; responses whose remote root throwable is in {@code okayExceptions} count as successes.
+   */
+  void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
+                     NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
+    log.info("Executing Collection Cmd : " + params);
+    String name = message.getStr(NAME);
+    ShardHandler handler = shardHandlerFactory.getShardHandler();
+
+    ClusterState currentState = zkStateReader.getClusterState();
+    DocCollection collection = currentState.getCollection(name);
+
+    for (Slice each : collection.getSlices()) {
+      sliceCmd(currentState, params, stateMatcher, each, handler, asyncId, requestMap);
+    }
+
+    processResponses(results, handler, false, null, asyncId, requestMap, okayExceptions);
+  }
+
+  /**
+   * Submits a core-admin request built from {@code params} to each replica of {@code slice} whose node is
+   * live and, when {@code stateMatcher} is non-null, whose state equals it. CORE is set per replica.
+   */
+  void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
+                Slice slice, ShardHandler shardHandler, String asyncId, Map<String, String> requestMap) {
+    for (Replica replica : slice.getReplicas()) {
+      String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
+      if (!clusterState.liveNodesContain(nodeName)) {
+        continue;
+      }
+      if (stateMatcher != null
+          && Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) != stateMatcher) {
+        continue;
+      }
+
+      // per-replica copy (simple clone for thread safety) so each request carries its own CORE value
+      ModifiableSolrParams perCoreParams = new ModifiableSolrParams();
+      perCoreParams.add(params);
+      perCoreParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
+
+      sendShardRequest(nodeName, perCoreParams, shardHandler, asyncId, requestMap);
+    }
+  }
+
+  /** Unpacks {@code srsp} and records it in {@code results} via the field-level overload. */
+  private void processResponse(NamedList results, ShardResponse srsp, Set<String> okayExceptions) {
+    processResponse(results, srsp.getException(), srsp.getNodeName(), srsp.getSolrResponse(),
+        srsp.getShard(), okayExceptions);
+  }
+
+  /**
+   * Records one shard response under {@code results}' "failure" or "success" section, creating the section
+   * if needed. A response counts as a failure when {@code e} is non-null, unless {@code e} is a
+   * {@link RemoteSolrException} whose root throwable is listed in {@code okayExceptions}.
+   * Failures are logged and stored as "ExceptionClass:message"; successes store the response body.
+   */
+  @SuppressWarnings("unchecked")
+  private void processResponse(NamedList results, Throwable e, String nodeName, SolrResponse solrResponse, String shard, Set<String> okayExceptions) {
+    String rootThrowable = (e instanceof RemoteSolrException)
+        ? ((RemoteSolrException) e).getRootThrowable()
+        : null;
+    boolean isFailure = e != null && (rootThrowable == null || !okayExceptions.contains(rootThrowable));
+
+    if (isFailure) {
+      log.error("Error from shard: " + shard, e);
+    }
+
+    String sectionName = isFailure ? "failure" : "success";
+    SimpleOrderedMap section = (SimpleOrderedMap) results.get(sectionName);
+    if (section == null) {
+      section = new SimpleOrderedMap();
+      results.add(sectionName, section);
+    }
+
+    if (isFailure) {
+      section.add(nodeName, e.getClass().getName() + ":" + e.getMessage());
+    } else {
+      section.add(nodeName, solrResponse.getResponse());
+    }
+  }
+
+  /**
+   * For each (node name -> async request id) entry, blocks until that request finishes and adds its final
+   * status response to {@code results} under the request id.
+   */
+  @SuppressWarnings("unchecked")
+  private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
+    for (Map.Entry<String, String> pending : requestMap.entrySet()) {
+      String node = pending.getKey();
+      String requestId = pending.getValue();
+      log.debug("I am Waiting for :{}/{}", node, requestId);
+      results.add(requestId, waitForCoreAdminAsyncCallToComplete(node, requestId));
+    }
+  }
+
+ private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());
+ params.set(CoreAdminParams.REQUESTID, requestId);
+ int counter = 0;
+ ShardRequest sreq;
+ do {
+ sreq = new ShardRequest();
+ params.set("qt", adminPath);
+ sreq.purpose = 1;
+ String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
+ sreq.shards = new String[] {replica};
+ sreq.actualShards = sreq.shards;
+ sreq.params = params;
+
+ shardHandler.submit(sreq, replica, sreq.params);
+
+ ShardResponse srsp;
+ do {
+ srsp = shardHandler.takeCompletedOrError();
+ if (srsp != null) {
+ NamedList results = new NamedList();
+ processResponse(results, srsp, Collections.emptySet());
+ if (srsp.getSolrResponse().getResponse() == null) {
+ NamedList response = new NamedList();
+ response.add("STATUS", "failed");
+ return response;
+ }
+
+ String r = (String) srsp.getSolrResponse().getResponse().get("STATUS");
+ if (r.equals("running")) {
+ log.debug("The task is still RUNNING, continuing to wait.");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ continue;
+
+ } else if (r.equals("completed")) {
+ log.debug("The task is COMPLETED, returning");
+ return srsp.getSolrResponse().getResponse();
+ } else if (r.equals("failed")) {
+ // TODO: Improve this. Get more information.
+ log.debug("The task is FAILED, returning");
+ return srsp.getSolrResponse().getResponse();
+ } else if (r.equals("notfound")) {
+ log.debug("The task is notfound, retry");
+ if (counter++ < 5) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ break;
+ }
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request for requestId: " + requestId + "" + srsp.getSolrResponse().getResponse().get("STATUS") +
+ "retried " + counter + "times");
+ } else {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request " + srsp.getSolrResponse().getResponse().get("STATUS"));
+ }
+ }
+ } while (srsp != null);
+ } while(true);
+ }
+
+  /** Human-readable name of this message handler. */
+  @Override
+  public String getName() {
+    final String handlerName = "Overseer Collection Message Handler";
+    return handlerName;
+  }
+
+ @Override
+ public String getTimerName(String operation) {
+ return "collection_" + operation;
+ }
+
+  /**
+   * Key used to lock a task: the message's COLLECTION_PROP when present, otherwise its NAME.
+   */
+  @Override
+  public String getTaskKey(ZkNodeProps message) {
+    if (message.containsKey(COLLECTION_PROP)) {
+      return message.getStr(COLLECTION_PROP);
+    }
+    return message.getStr(NAME);
+  }
+
+
+  // Id of the task batch that lockSession was created for; -1 until the first session is created.
+  private long sessionId = -1;
+  private LockTree.Session lockSession;
+
+  /**
+   * Attempts to lock the collection/shard/replica path targeted by {@code message} for its operation.
+   * <p>
+   * One {@link LockTree.Session} is used for all tasks of a batch (identified by
+   * {@link OverseerTaskProcessor.TaskBatch#getId()}); a new session is started when the batch id changes.
+   * When a new session is started and no tasks are running, the lock tree is cleared first so locks are
+   * not leaked.
+   *
+   * @return the lock returned by the session; presumably {@code null} when it cannot be acquired
+   *         (TODO confirm against {@link LockTree.Session#lock})
+   */
+  @Override
+  public Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch) {
+    if (lockSession == null || sessionId != taskBatch.getId()) {
+      // This is always called in the same thread. Each batch is supposed to have a new taskBatch,
+      // so if the taskBatch changes we must create a new Session.
+      // Also check if the running tasks are empty. If yes, clear lockTree;
+      // this will ensure that locks are not 'leaked'.
+      if (taskBatch.getRunningTasks() == 0) {
+        lockTree.clear();
+      }
+      lockSession = lockTree.getSession();
+      // Remember which batch this session belongs to. Without this assignment the check above is true on
+      // every call, so each task would get a fresh session (and a possible lockTree.clear()).
+      sessionId = taskBatch.getId();
+    }
+    return lockSession.lock(getCollectionAction(message.getStr(Overseer.QUEUE_OPERATION)),
+        Arrays.asList(
+            getTaskKey(message),
+            message.getStr(ZkStateReader.SHARD_ID_PROP),
+            message.getStr(ZkStateReader.REPLICA_PROP)));
+  }
+
+  /**
+   * Marks this handler closed and, if the executor {@code tpe} exists and is not yet shut down,
+   * shuts it down and waits for termination.
+   */
+  @Override
+  public void close() throws IOException {
+    this.isClosed = true;
+    if (tpe != null && !tpe.isShutdown()) {
+      ExecutorUtil.shutdownAndAwaitTermination(tpe);
+    }
+  }
+
+  /** Reports the {@code isClosed} flag, which {@link #close()} sets. */
+  @Override
+  public boolean isClosed() {
+    return this.isClosed;
+  }
+
+ /**
+ * One Collections API operation run by this handler. Implementations are looked up in
+ * {@code commandMap} by action and invoked through {@link #call}.
+ */
+ protected interface Cmd {
+ /**
+ * Runs the operation.
+ *
+ * @param state cluster state supplied by the caller
+ * @param message properties of the operation to run
+ * @param results collector that the operation adds its output to
+ * @throws Exception any failure is propagated to the caller
+ */
+ void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerRoleCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerRoleCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerRoleCmd.java
new file mode 100644
index 0000000..16f9327
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerRoleCmd.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.cloud.OverseerNodePrioritizer;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
+
+/**
+ * Collections API command that adds a node to (ADDROLE) or removes it from (REMOVEROLE) a role list stored
+ * in the {@link ZkStateReader#ROLES} znode, then re-prioritizes overseer nodes in the background.
+ */
+public class OverseerRoleCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+  private final CollectionAction operation;
+  private final OverseerNodePrioritizer overseerPrioritizer;
+
+  /**
+   * @param ocmh        handler providing the ZooKeeper state reader and this overseer's id
+   * @param operation   ADDROLE or REMOVEROLE; any other action leaves the role lists unchanged
+   * @param prioritizer re-prioritizes overseer nodes after the roles have been written
+   */
+  public OverseerRoleCmd(OverseerCollectionMessageHandler ocmh, CollectionAction operation, OverseerNodePrioritizer prioritizer) {
+    this.ocmh = ocmh;
+    this.operation = operation;
+    this.overseerPrioritizer = prioritizer;
+  }
+
+  /**
+   * Updates the list stored under the message's "role" key in the ROLES znode with the message's "node",
+   * creating the znode when it does not exist yet, then starts a thread that re-prioritizes overseer nodes.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
+    String node = message.getStr("node");
+    String roleName = message.getStr("role");
+
+    boolean rolesNodeExists = zkClient.exists(ZkStateReader.ROLES, true);
+    Map roles = rolesNodeExists
+        ? (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true))
+        : new LinkedHashMap(1);
+
+    List nodeList = (List) roles.get(roleName);
+    if (nodeList == null) {
+      nodeList = new ArrayList();
+      roles.put(roleName, nodeList);
+    }
+
+    if (operation == ADDROLE) {
+      log.info("Overseer role added to {}", node);
+      if (!nodeList.contains(node)) {
+        nodeList.add(node);
+      }
+    } else if (operation == REMOVEROLE) {
+      log.info("Overseer role removed from {}", node);
+      nodeList.remove(node);
+    }
+
+    byte[] rolesJson = Utils.toJSON(roles);
+    if (rolesNodeExists) {
+      zkClient.setData(ZkStateReader.ROLES, rolesJson, true);
+    } else {
+      zkClient.create(ZkStateReader.ROLES, rolesJson, CreateMode.PERSISTENT, true);
+    }
+
+    // With many nodes this command may time out, and dedicated overseers are most likely used when there
+    // are many nodes, so the prioritization runs on a separate thread.
+    Runnable prioritize = () -> {
+      try {
+        overseerPrioritizer.prioritizeOverseerNodes(ocmh.myId);
+      } catch (Exception e) {
+        log.error("Error in prioritizing Overseer", e);
+      }
+    };
+    new Thread(prioritize).start();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java
new file mode 100644
index 0000000..6f0bbfd
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java
@@ -0,0 +1,113 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.codahale.metrics.Timer;
+import org.apache.solr.cloud.OverseerTaskProcessor;
+import org.apache.solr.cloud.Stats;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.util.stats.MetricUtils;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Collections API command that reports the overseer leader, overseer queue sizes and overseer statistics.
+ */
+public class OverseerStatusCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // Prefixes of keys in the overseer Stats map; each selects the section a stat is reported under.
+  private static final String COLLECTION_OP_PREFIX = "collection_";
+  private static final String STATE_UPDATE_QUEUE_PREFIX = "/overseer/queue_";
+  private static final String WORK_QUEUE_PREFIX = "/overseer/queue-work_";
+  private static final String COLLECTION_QUEUE_PREFIX = "/overseer/collection-queue-work_";
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public OverseerStatusCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  /**
+   * Adds to {@code results} the overseer leader, the child counts of the three overseer queue znodes, and
+   * the handler's {@link Stats} grouped into overseer, collection and queue sections.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    results.add("leader", OverseerTaskProcessor.getLeaderNode(zkStateReader.getZkClient()));
+    results.add("overseer_queue_size", countChildren(zkStateReader, "/overseer/queue"));
+    results.add("overseer_work_queue_size", countChildren(zkStateReader, "/overseer/queue-work"));
+    results.add("overseer_collection_queue_size", countChildren(zkStateReader, "/overseer/collection-queue-work"));
+
+    NamedList overseerStats = new NamedList();
+    NamedList collectionStats = new NamedList();
+    NamedList stateUpdateQueueStats = new NamedList();
+    NamedList workQueueStats = new NamedList();
+    NamedList collectionQueueStats = new NamedList();
+    Stats stats = ocmh.stats;
+    for (Map.Entry<String, Stats.Stat> entry : stats.getStats().entrySet()) {
+      String key = entry.getKey();
+      NamedList<Object> lst = new SimpleOrderedMap<>();
+      if (key.startsWith(COLLECTION_OP_PREFIX)) {
+        collectionStats.add(key.substring(COLLECTION_OP_PREFIX.length()), lst);
+        addRequestCounts(stats, key, lst);
+        addRecentFailures(stats, key, lst);
+      } else if (key.startsWith(STATE_UPDATE_QUEUE_PREFIX)) {
+        stateUpdateQueueStats.add(key.substring(STATE_UPDATE_QUEUE_PREFIX.length()), lst);
+      } else if (key.startsWith(WORK_QUEUE_PREFIX)) {
+        workQueueStats.add(key.substring(WORK_QUEUE_PREFIX.length()), lst);
+      } else if (key.startsWith(COLLECTION_QUEUE_PREFIX)) {
+        collectionQueueStats.add(key.substring(COLLECTION_QUEUE_PREFIX.length()), lst);
+      } else {
+        // overseer stats
+        overseerStats.add(key, lst);
+        addRequestCounts(stats, key, lst);
+      }
+      Timer timer = entry.getValue().requestTime;
+      MetricUtils.addMetrics(lst, timer);
+    }
+    results.add("overseer_operations", overseerStats);
+    results.add("collection_operations", collectionStats);
+    results.add("overseer_queue", stateUpdateQueueStats);
+    results.add("overseer_internal_queue", workQueueStats);
+    results.add("collection_queue", collectionQueueStats);
+  }
+
+  /** Reads the znode at {@code path} and returns its child count from the populated {@link Stat}. */
+  private static int countChildren(ZkStateReader zkStateReader, String path) throws Exception {
+    Stat stat = new Stat();
+    zkStateReader.getZkClient().getData(path, null, stat, true);
+    return stat.getNumChildren();
+  }
+
+  /** Adds the success ("requests") and error ("errors") counts recorded for {@code key}. */
+  private static void addRequestCounts(Stats stats, String key, NamedList<Object> lst) {
+    int successes = stats.getSuccessCount(key);
+    int errors = stats.getErrorCount(key);
+    lst.add("requests", successes);
+    lst.add("errors", errors);
+  }
+
+  /** Adds "recent_failures" (request properties and response of each failed op) when any are recorded. */
+  private static void addRecentFailures(Stats stats, String key, NamedList<Object> lst) {
+    List<Stats.FailedOp> failureDetails = stats.getFailureDetails(key);
+    if (failureDetails == null) {
+      return;
+    }
+    List<SimpleOrderedMap<Object>> failures = new ArrayList<>();
+    for (Stats.FailedOp failedOp : failureDetails) {
+      SimpleOrderedMap<Object> fail = new SimpleOrderedMap<>();
+      fail.add("request", failedOp.req.getProperties());
+      fail.add("response", failedOp.resp.getResponse());
+      failures.add(fail);
+    }
+    lst.add("recent_failures", failures);
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
new file mode 100644
index 0000000..35d2379
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.solr.cloud.ActiveReplicaWatcher;
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+/**
+ * Collections API command that re-creates every replica of a source node on a target node and, if all of
+ * them were created, removes the replicas from the source node.
+ */
+public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public ReplaceNodeCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  /**
+   * Adds a replica on the target node for each replica on the source node.
+   * <p>
+   * For source replicas that are leaders (or for all of them when {@code waitForFinalState} is true), it also
+   * waits on a collection state watcher ({@link LeaderRecoveryWatcher} or {@link ActiveReplicaWatcher}).
+   * If any addition fails or any wait times out, the replicas created on the target are deleted and the
+   * command returns with "failure" entries in {@code results}. Otherwise the source replicas are cleaned up
+   * and a "success" entry is added.
+   *
+   * @throws SolrException (BAD_REQUEST) if source/target are missing or not live
+   */
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    String source = message.getStr(CollectionParams.SOURCE_NODE, message.getStr("source"));
+    String target = message.getStr(CollectionParams.TARGET_NODE, message.getStr("target"));
+    boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
+    if (source == null || target == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "sourceNode and targetNode are required params" );
+    }
+    String async = message.getStr("async");
+    int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
+    boolean parallel = message.getBool("parallel", false);
+    ClusterState clusterState = zkStateReader.getClusterState();
+
+    if (!clusterState.liveNodesContain(source)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + source + " is not live");
+    }
+    if (!clusterState.liveNodesContain(target)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + target + " is not live");
+    }
+    List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
+    // how many leaders are we moving? for these replicas we have to make sure that either:
+    // * another existing replica can become a leader, or
+    // * we wait until the newly created replica completes recovery (and can become the new leader)
+    // If waitForFinalState=true we wait for all replicas
+    int numLeaders = 0;
+    for (ZkNodeProps props : sourceReplicas) {
+      if (props.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
+        numLeaders++;
+      }
+    }
+    // map of collectionName_coreNodeName to watchers
+    Map<String, CollectionStateWatcher> watchers = new HashMap<>();
+    // map of collectionName_coreNodeName to the collection the watcher was registered on; the key above is
+    // not a collection name, so this is needed to unregister the watcher again
+    Map<String, String> watchedCollections = new HashMap<>();
+    List<ZkNodeProps> createdReplicas = new ArrayList<>();
+
+    AtomicBoolean anyOneFailed = new AtomicBoolean(false);
+    SolrCloseableLatch countDownLatch = new SolrCloseableLatch(sourceReplicas.size(), ocmh);
+
+    SolrCloseableLatch replicasToRecover = new SolrCloseableLatch(numLeaders, ocmh);
+
+    for (ZkNodeProps sourceReplica : sourceReplicas) {
+      NamedList nl = new NamedList();
+      log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
+      ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target);
+      if (async != null) msg.getProperties().put(ASYNC, async);
+      final ZkNodeProps addedReplica = ocmh.addReplica(clusterState,
+          msg, nl, () -> {
+            try {
+              if (nl.get("failure") != null) {
+                String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
+                    " on node=%s", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
+                log.warn(errorString);
+                // one replica creation failed. Make the best attempt to
+                // delete all the replicas created so far in the target
+                // and exit
+                synchronized (results) {
+                  results.add("failure", errorString);
+                  anyOneFailed.set(true);
+                }
+              } else {
+                log.debug("Successfully created replica for collection={} shard={} on node={}",
+                    sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
+              }
+            } finally {
+              // count down only after anyOneFailed is updated, so the thread waiting on the latch cannot
+              // miss a failure and go on to delete the source replicas
+              countDownLatch.countDown();
+            }
+          });
+
+      if (addedReplica != null) {
+        createdReplicas.add(addedReplica);
+        if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
+          String shardName = sourceReplica.getStr(SHARD_ID_PROP);
+          String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+          String collectionName = sourceReplica.getStr(COLLECTION_PROP);
+          String key = collectionName + "_" + replicaName;
+          CollectionStateWatcher watcher;
+          if (waitForFinalState) {
+            watcher = new ActiveReplicaWatcher(collectionName, null,
+                Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), replicasToRecover);
+          } else {
+            watcher = new LeaderRecoveryWatcher(collectionName, shardName, replicaName,
+                addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover);
+          }
+          watchers.put(key, watcher);
+          watchedCollections.put(key, collectionName);
+          log.debug("--- adding {}, {}", key, watcher);
+          zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+        } else {
+          log.debug("--- not waiting for {}", addedReplica);
+        }
+      }
+    }
+
+    log.debug("Waiting for replicas to be added");
+    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+      log.info("Timed out waiting for replicas to be added");
+      anyOneFailed.set(true);
+    } else {
+      log.debug("Finished waiting for replicas to be added");
+    }
+
+    // now wait for leader replicas to recover
+    log.debug("Waiting for {} leader replicas to recover", numLeaders);
+    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
+      log.info("Timed out waiting for {} leader replicas to recover", replicasToRecover.getCount());
+      anyOneFailed.set(true);
+    } else {
+      log.debug("Finished waiting for leader replicas to recover");
+    }
+    // remove the watchers, we're done either way; they must be removed from the collection they were
+    // registered on, not from the collectionName_coreNodeName map key
+    for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) {
+      zkStateReader.removeCollectionStateWatcher(watchedCollections.get(e.getKey()), e.getValue());
+    }
+    if (anyOneFailed.get()) {
+      log.info("Failed to create some replicas. Cleaning up all replicas on target node");
+      SolrCloseableLatch cleanupLatch = new SolrCloseableLatch(createdReplicas.size(), ocmh);
+      for (ZkNodeProps createdReplica : createdReplicas) {
+        NamedList deleteResult = new NamedList();
+        try {
+          ocmh.deleteReplica(zkStateReader.getClusterState(), createdReplica.plus("parallel", "true"), deleteResult, () -> {
+            try {
+              if (deleteResult.get("failure") != null) {
+                synchronized (results) {
+                  results.add("failure", "Could not cleanup, because of : " + deleteResult.get("failure"));
+                }
+              }
+            } finally {
+              // count down after recording the failure so it is in results once the wait below ends
+              cleanupLatch.countDown();
+            }
+          });
+        } catch (KeeperException e) {
+          cleanupLatch.countDown();
+          log.warn("Error deleting replica ", e);
+        } catch (Exception e) {
+          log.warn("Error deleting replica ", e);
+          cleanupLatch.countDown();
+          throw e;
+        }
+      }
+      if (!cleanupLatch.await(5, TimeUnit.MINUTES)) {
+        log.warn("Timed out waiting for cleanup of replicas created on target node {}", target);
+      }
+      return;
+    }
+
+    // we have reached this far means all replicas could be recreated
+    // now cleanup the replicas in the source node
+    DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ocmh, source, async);
+    results.add("success", "REPLACENODE action completed successfully from : " + source + " to : " + target);
+  }
+
+  /**
+   * Lists the replicas hosted on node {@code source}, each described by its collection, shard, core name,
+   * replica name, replica type, whether it is its slice's leader, and the node name.
+   */
+  static List<ZkNodeProps> getReplicasOfNode(String source, ClusterState state) {
+    List<ZkNodeProps> sourceReplicas = new ArrayList<>();
+    for (Map.Entry<String, DocCollection> e : state.getCollectionsMap().entrySet()) {
+      for (Slice slice : e.getValue().getSlices()) {
+        for (Replica replica : slice.getReplicas()) {
+          if (source.equals(replica.getNodeName())) {
+            ZkNodeProps props = new ZkNodeProps(
+                COLLECTION_PROP, e.getKey(),
+                SHARD_ID_PROP, slice.getName(),
+                ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
+                ZkStateReader.REPLICA_PROP, replica.getName(),
+                ZkStateReader.REPLICA_TYPE, replica.getType().name(),
+                ZkStateReader.LEADER_PROP, String.valueOf(replica.equals(slice.getLeader())),
+                CoreAdminParams.NODE, source);
+            sourceReplicas.add(props);
+          }
+        }
+      }
+    }
+    return sourceReplicas;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
new file mode 100644
index 0000000..09ceb55
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.backup.BackupManager;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_TYPE;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public RestoreCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+ @Override
+ public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+ // TODO maybe we can inherit createCollection's options/code
+
+ String restoreCollectionName = message.getStr(COLLECTION_PROP);
+ String backupName = message.getStr(NAME); // of backup
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ String asyncId = message.getStr(ASYNC);
+ String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
+ Map<String, String> requestMap = new HashMap<>();
+
+ CoreContainer cc = ocmh.overseer.getCoreContainer();
+ BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
+
+ URI location = repository.createURI(message.getStr(CoreAdminParams.BACKUP_LOCATION));
+ URI backupPath = repository.resolve(location, backupName);
+ ZkStateReader zkStateReader = ocmh.zkStateReader;
+ BackupManager backupMgr = new BackupManager(repository, zkStateReader);
+
+ Properties properties = backupMgr.readBackupProperties(location, backupName);
+ String backupCollection = properties.getProperty(BackupManager.COLLECTION_NAME_PROP);
+ DocCollection backupCollectionState = backupMgr.readCollectionState(location, backupName, backupCollection);
+
+ // Get the Solr nodes to restore a collection.
+ final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(
+ zkStateReader.getClusterState().getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM);
+
+ int numShards = backupCollectionState.getActiveSlices().size();
+
+ int numNrtReplicas = getInt(message, NRT_REPLICAS, backupCollectionState.getNumNrtReplicas(), 0);
+ if (numNrtReplicas == 0) {
+ numNrtReplicas = getInt(message, REPLICATION_FACTOR, backupCollectionState.getReplicationFactor(), 0);
+ }
+ int numTlogReplicas = getInt(message, TLOG_REPLICAS, backupCollectionState.getNumTlogReplicas(), 0);
+ int numPullReplicas = getInt(message, PULL_REPLICAS, backupCollectionState.getNumPullReplicas(), 0);
+ int totalReplicasPerShard = numNrtReplicas + numTlogReplicas + numPullReplicas;
+
+ int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, backupCollectionState.getMaxShardsPerNode());
+ int availableNodeCount = nodeList.size();
+ if ((numShards * totalReplicasPerShard) > (availableNodeCount * maxShardsPerNode)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ String.format(Locale.ROOT, "Solr cloud with available number of nodes:%d is insufficient for"
+ + " restoring a collection with %d shards, total replicas per shard %d and maxShardsPerNode %d."
+ + " Consider increasing maxShardsPerNode value OR number of available nodes.",
+ availableNodeCount, numShards, totalReplicasPerShard, maxShardsPerNode));
+ }
+
+ //Upload the configs
+ String configName = (String) properties.get(OverseerCollectionMessageHandler.COLL_CONF);
+ String restoreConfigName = message.getStr(OverseerCollectionMessageHandler.COLL_CONF, configName);
+ if (zkStateReader.getConfigManager().configExists(restoreConfigName)) {
+ log.info("Using existing config {}", restoreConfigName);
+ //TODO add overwrite option?
+ } else {
+ log.info("Uploading config {}", restoreConfigName);
+ backupMgr.uploadConfigDir(location, backupName, configName, restoreConfigName);
+ }
+
+ log.info("Starting restore into collection={} with backup_name={} at location={}", restoreCollectionName, backupName,
+ location);
+
+ //Create core-less collection
+ {
+ Map<String, Object> propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, CREATE.toString());
+ propMap.put("fromApi", "true"); // mostly true. Prevents autoCreated=true in the collection state.
+ if (properties.get(STATE_FORMAT) == null) {
+ propMap.put(STATE_FORMAT, "2");
+ }
+
+ // inherit settings from input API, defaulting to the backup's setting. Ex: replicationFactor
+ for (String collProp : OverseerCollectionMessageHandler.COLL_PROPS.keySet()) {
+ Object val = message.getProperties().getOrDefault(collProp, backupCollectionState.get(collProp));
+ if (val != null) {
+ propMap.put(collProp, val);
+ }
+ }
+
+ propMap.put(NAME, restoreCollectionName);
+ propMap.put(OverseerCollectionMessageHandler.CREATE_NODE_SET, OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY); //no cores
+ propMap.put(OverseerCollectionMessageHandler.COLL_CONF, restoreConfigName);
+
+ // router.*
+ @SuppressWarnings("unchecked")
+ Map<String, Object> routerProps = (Map<String, Object>) backupCollectionState.getProperties().get(DocCollection.DOC_ROUTER);
+ for (Map.Entry<String, Object> pair : routerProps.entrySet()) {
+ propMap.put(DocCollection.DOC_ROUTER + "." + pair.getKey(), pair.getValue());
+ }
+
+ Set<String> sliceNames = backupCollectionState.getActiveSlicesMap().keySet();
+ if (backupCollectionState.getRouter() instanceof ImplicitDocRouter) {
+ propMap.put(OverseerCollectionMessageHandler.SHARDS_PROP, StrUtils.join(sliceNames, ','));
+ } else {
+ propMap.put(OverseerCollectionMessageHandler.NUM_SLICES, sliceNames.size());
+ // ClusterStateMutator.createCollection detects that "slices" is in fact a slice structure instead of a
+ // list of names, and if so uses this instead of building it. We clear the replica list.
+ Collection<Slice> backupSlices = backupCollectionState.getActiveSlices();
+ Map<String, Slice> newSlices = new LinkedHashMap<>(backupSlices.size());
+ for (Slice backupSlice : backupSlices) {
+ newSlices.put(backupSlice.getName(),
+ new Slice(backupSlice.getName(), Collections.emptyMap(), backupSlice.getProperties()));
+ }
+ propMap.put(OverseerCollectionMessageHandler.SHARDS_PROP, newSlices);
+ }
+
+ ocmh.commandMap.get(CREATE).call(zkStateReader.getClusterState(), new ZkNodeProps(propMap), new NamedList());
+ // note: when createCollection() returns, the collection exists (no race)
+ }
+
+ DocCollection restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
+
+ DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
+
+ //Mark all shards in CONSTRUCTION STATE while we restore the data
+ {
+ //TODO might instead createCollection accept an initial state? Is there a race?
+ Map<String, Object> propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+ for (Slice shard : restoreCollection.getSlices()) {
+ propMap.put(shard.getName(), Slice.State.CONSTRUCTION.toString());
+ }
+ propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
+ inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
+ }
+
+ // TODO how do we leverage the RULE / SNITCH logic in createCollection?
+
+ ClusterState clusterState = zkStateReader.getClusterState();
+
+ List<String> sliceNames = new ArrayList<>();
+ restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
+ PolicyHelper.SessionWrapper sessionWrapper = null;
+
+ try {
+ List<ReplicaPosition> replicaPositions = Assign.identifyNodes(
+ ocmh.cloudManager, clusterState,
+ nodeList, restoreCollectionName,
+ message, sliceNames,
+ numNrtReplicas, numTlogReplicas, numPullReplicas);
+ sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
+ //Create one replica per shard and copy backed up data to it
+ for (Slice slice : restoreCollection.getSlices()) {
+ log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
+ HashMap<String, Object> propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
+ propMap.put(COLLECTION_PROP, restoreCollectionName);
+ propMap.put(SHARD_ID_PROP, slice.getName());
+
+ if (numNrtReplicas >= 1) {
+ propMap.put(REPLICA_TYPE, Replica.Type.NRT.name());
+ } else if (numTlogReplicas >= 1) {
+ propMap.put(REPLICA_TYPE, Replica.Type.TLOG.name());
+ } else {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Unexpected number of replicas, replicationFactor, " +
+ Replica.Type.NRT + " or " + Replica.Type.TLOG + " must be greater than 0");
+ }
+
+ // Get the first node matching the shard to restore in
+ String node;
+ for (ReplicaPosition replicaPosition : replicaPositions) {
+ if (Objects.equals(replicaPosition.shard, slice.getName())) {
+ node = replicaPosition.node;
+ propMap.put(CoreAdminParams.NODE, node);
+ replicaPositions.remove(replicaPosition);
+ break;
+ }
+ }
+
+ // add async param
+ if (asyncId != null) {
+ propMap.put(ASYNC, asyncId);
+ }
+ ocmh.addPropertyParams(message, propMap);
+
+ ocmh.addReplica(clusterState, new ZkNodeProps(propMap), new NamedList(), null);
+ }
+
+ //refresh the location copy of collection state
+ restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
+
+ //Copy data from backed up index to each replica
+ for (Slice slice : restoreCollection.getSlices()) {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.RESTORECORE.toString());
+ params.set(NAME, "snapshot." + slice.getName());
+ params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.toASCIIString());
+ params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
+
+ ocmh.sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
+ }
+ ocmh.processResponses(new NamedList(), shardHandler, true, "Could not restore core", asyncId, requestMap);
+
+ //Mark all shards in ACTIVE STATE
+ {
+ HashMap<String, Object> propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+ propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
+ for (Slice shard : restoreCollection.getSlices()) {
+ propMap.put(shard.getName(), Slice.State.ACTIVE.toString());
+ }
+ inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
+ }
+
+ //refresh the location copy of collection state
+ restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
+
+ if (totalReplicasPerShard > 1) {
+ log.info("Adding replicas to restored collection={}", restoreCollection);
+ for (Slice slice : restoreCollection.getSlices()) {
+
+ //Add the remaining replicas for each shard, considering it's type
+ int createdNrtReplicas = 0, createdTlogReplicas = 0, createdPullReplicas = 0;
+
+ // We already created either a NRT or an TLOG replica as leader
+ if (numNrtReplicas > 0) {
+ createdNrtReplicas++;
+ } else if (createdTlogReplicas > 0) {
+ createdTlogReplicas++;
+ }
+
+ for (int i = 1; i < totalReplicasPerShard; i++) {
+ Replica.Type typeToCreate;
+ if (createdNrtReplicas < numNrtReplicas) {
+ createdNrtReplicas++;
+ typeToCreate = Replica.Type.NRT;
+ } else if (createdTlogReplicas < numTlogReplicas) {
+ createdTlogReplicas++;
+ typeToCreate = Replica.Type.TLOG;
+ } else {
+ createdPullReplicas++;
+ typeToCreate = Replica.Type.PULL;
+ assert createdPullReplicas <= numPullReplicas: "Unexpected number of replicas";
+ }
+
+ log.debug("Adding replica for shard={} collection={} of type {} ", slice.getName(), restoreCollection, typeToCreate);
+ HashMap<String, Object> propMap = new HashMap<>();
+ propMap.put(COLLECTION_PROP, restoreCollectionName);
+ propMap.put(SHARD_ID_PROP, slice.getName());
+ propMap.put(REPLICA_TYPE, typeToCreate.name());
+
+ // Get the first node matching the shard to restore in
+ String node;
+ for (ReplicaPosition replicaPosition : replicaPositions) {
+ if (Objects.equals(replicaPosition.shard, slice.getName())) {
+ node = replicaPosition.node;
+ propMap.put(CoreAdminParams.NODE, node);
+ replicaPositions.remove(replicaPosition);
+ break;
+ }
+ }
+
+ // add async param
+ if (asyncId != null) {
+ propMap.put(ASYNC, asyncId);
+ }
+ ocmh.addPropertyParams(message, propMap);
+
+ ocmh.addReplica(zkStateReader.getClusterState(), new ZkNodeProps(propMap), results, null);
+ }
+ }
+ }
+
+ log.info("Completed restoring collection={} backupName={}", restoreCollection, backupName);
+ } finally {
+ if (sessionWrapper != null) sessionWrapper.release();
+ }
+ }
+
+ private int getInt(ZkNodeProps message, String propertyName, Integer count, int defaultValue) {
+ Integer value = message.getInt(propertyName, count);
+ return value!=null ? value:defaultValue;
+ }
+}