You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by va...@apache.org on 2018/01/16 20:39:50 UTC
[09/15] lucene-solr:branch_7x: SOLR-11817: Move Collections API
classes to its own package
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
new file mode 100644
index 0000000..eefe903
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+
+/**
+ * Overseer command that deletes a single named replica, or a number of replicas chosen by
+ * count (from one shard, or from every shard of the collection when no shard is given).
+ */
+public class DeleteReplicaCmd implements Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public DeleteReplicaCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    deleteReplica(clusterState, message, results, null);
+  }
+
+  /**
+   * Delete the replica named in the message or, when {@code count} is present, delete that many
+   * replicas chosen by {@link #deleteReplicaBasedOnCount}.
+   *
+   * @param clusterState cluster state used to resolve the collection and shard
+   * @param message      request properties (collection, shard, replica or count, "parallel", async id, ...)
+   * @param results      response list that deletion results/failures are added to
+   * @param onComplete   optional callback (may be null) run from the finally block of each replica's
+   *                     deletion task; it is not run when validation fails before that task is created
+   */
+  @SuppressWarnings("unchecked")
+  void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
+      throws KeeperException, InterruptedException {
+    log.debug("deleteReplica() : {}", Utils.toJSONString(message));
+    boolean parallel = message.getBool("parallel", false);
+
+    // If a count is specified, replicas are picked automatically instead of by name.
+    if (message.getStr(COUNT_PROP) != null) {
+      deleteReplicaBasedOnCount(clusterState, message, results, onComplete, parallel);
+      return;
+    }
+
+    ocmh.checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP);
+    String collectionName = message.getStr(COLLECTION_PROP);
+    String shard = message.getStr(SHARD_ID_PROP);
+    String replicaName = message.getStr(REPLICA_PROP);
+
+    DocCollection coll = clusterState.getCollection(collectionName);
+    Slice slice = coll.getSlice(shard);
+    if (slice == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Invalid shard name : " + shard + " in collection : " + collectionName);
+    }
+
+    deleteCore(slice, collectionName, replicaName, message, shard, results, onComplete, parallel);
+  }
+
+  /**
+   * Delete replicas based on count for a given collection. If a shard is passed, uses that
+   * else deletes the given number of replicas from every shard of the collection.
+   * All shards are validated and replicas picked before any deletion starts. For every shard
+   * processed a {@code shard_id} / {@code replicas_deleted} pair is added to {@code results}.
+   *
+   * @throws SolrException BAD_REQUEST if the count is not a positive integer, the given shard does
+   *                       not exist, or a shard has too few replicas to honour the count
+   */
+  void deleteReplicaBasedOnCount(ClusterState clusterState,
+                                 ZkNodeProps message,
+                                 NamedList results,
+                                 Runnable onComplete,
+                                 boolean parallel)
+      throws KeeperException, InterruptedException {
+    ocmh.checkRequired(message, COLLECTION_PROP, COUNT_PROP);
+    int count = parseCount(message.getStr(COUNT_PROP));
+    String collectionName = message.getStr(COLLECTION_PROP);
+    String shard = message.getStr(SHARD_ID_PROP);
+    DocCollection coll = clusterState.getCollection(collectionName);
+    Slice slice = null;
+    // Validate the shard if one was passed.
+    if (shard != null) {
+      slice = coll.getSlice(shard);
+      if (slice == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            "Invalid shard name : " + shard + " in collection : " + collectionName);
+      }
+    }
+
+    Map<Slice, Set<String>> shardToReplicasMapping = new HashMap<>();
+    if (slice != null) {
+      shardToReplicasMapping.put(slice, pickReplicasTobeDeleted(slice, shard, collectionName, count));
+    } else {
+      // No shard given: remove 'count' replicas from every shard of the collection.
+      for (Slice individualSlice : coll.getSlices()) {
+        shardToReplicasMapping.put(individualSlice,
+            pickReplicasTobeDeleted(individualSlice, individualSlice.getName(), collectionName, count));
+      }
+    }
+
+    for (Map.Entry<Slice, Set<String>> entry : shardToReplicasMapping.entrySet()) {
+      Slice shardSlice = entry.getKey();
+      String shardId = shardSlice.getName();
+      Set<String> replicas = entry.getValue();
+      for (String replica : replicas) {
+        log.debug("Deleting replica {} for shard {} based on count {}", replica, shardId, count);
+        // Pass this slice's own name: 'shard' is null when deleting across all shards, and
+        // deleteCore uses the shard name when waiting for the replica to disappear and in errors.
+        deleteCore(shardSlice, collectionName, replica, message, shardId, results, onComplete, parallel);
+      }
+      results.add("shard_id", shardId);
+      results.add("replicas_deleted", replicas);
+    }
+  }
+
+  /**
+   * Parse the {@code count} request parameter.
+   *
+   * @throws SolrException BAD_REQUEST if the value is not an integer or is less than 1
+   */
+  private static int parseCount(String countStr) {
+    int count;
+    try {
+      count = Integer.parseInt(countStr);
+    } catch (NumberFormatException e) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Invalid " + COUNT_PROP + " : " + countStr + ". It must be a positive integer.", e);
+    }
+    if (count < 1) {
+      // A negative count would never reach zero in pickReplicasTobeDeleted and would therefore
+      // select every non-leader replica of the shard.
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Invalid " + COUNT_PROP + " : " + countStr + ". It must be a positive integer.");
+    }
+    return count;
+  }
+
+  /**
+   * Pick up to {@code count} replica names of {@code slice} to delete, skipping the current leader.
+   */
+  private Set<String> pickReplicasTobeDeleted(Slice slice, String shard, String collectionName, int count) {
+    validateReplicaAvailability(slice, shard, collectionName, count);
+    Set<String> replicasToBeRemoved = new HashSet<>();
+    Replica leader = slice.getLeader();
+    // The shard may have no elected leader at the moment; then there is nothing to skip.
+    String leaderCore = leader == null ? null : leader.getCoreName();
+    int remaining = count;
+    for (Replica replica : slice.getReplicas()) {
+      if (remaining == 0) {
+        break;
+      }
+      // Try avoiding to pick up the leader to minimize activity on the cluster.
+      if (leaderCore != null && leaderCore.equals(replica.getCoreName())) {
+        continue;
+      }
+      replicasToBeRemoved.add(replica.getName());
+      remaining--;
+    }
+    return replicasToBeRemoved;
+  }
+
+  /**
+   * Validate that the shard has replicas, has more than one, and has strictly more replicas than
+   * requested for deletion (so at least one always remains).
+   *
+   * @throws SolrException BAD_REQUEST if any of these checks fails
+   */
+  private void validateReplicaAvailability(Slice slice, String shard, String collectionName, int count) {
+    if (slice != null) {
+      Collection<Replica> allReplicasForShard = slice.getReplicas();
+      if (allReplicasForShard == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No replicas found in shard/collection: " +
+            shard + "/" + collectionName);
+      }
+
+      if (allReplicasForShard.size() == 1) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There is only one replica available in shard/collection: " +
+            shard + "/" + collectionName + ". Cannot delete that.");
+      }
+
+      if (allReplicasForShard.size() <= count) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            "There are not enough replicas to delete the requested number while keeping at least one in shard/collection : " +
+            shard + "/" + collectionName + " Requested: " + count + " Available: " + allReplicasForShard.size() + ".");
+      }
+    }
+  }
+
+  /**
+   * Delete a single replica: send a core UNLOAD to its node (only if that node is live), then make
+   * sure the replica's entry is removed from the cluster state, waiting for it to disappear.
+   *
+   * <p>When {@code parallel} is true the work is submitted to {@code ocmh.tpe} and this method returns
+   * immediately; otherwise it runs inline and throws if the replica could not be removed.
+   * {@code onComplete}, if non-null, runs in the task's finally block.
+   *
+   * @throws SolrException BAD_REQUEST if the replica does not exist in the slice, or if
+   *                       {@code onlyIfDown=true} was requested and the replica is not DOWN
+   */
+  void deleteCore(Slice slice, String collectionName, String replicaName, ZkNodeProps message, String shard,
+                  NamedList results, Runnable onComplete, boolean parallel) throws KeeperException, InterruptedException {
+
+    Replica replica = slice.getReplica(replicaName);
+    if (replica == null) {
+      ArrayList<String> l = new ArrayList<>();
+      for (Replica r : slice.getReplicas()) {
+        l.add(r.getName());
+      }
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : " +
+          shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ','));
+    }
+
+    // If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true
+    // on the command.
+    if (Boolean.parseBoolean(message.getStr(OverseerCollectionMessageHandler.ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName +
+          " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
+    }
+
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+    String asyncId = message.getStr(ASYNC);
+    // Only populated for async requests; passed through to sendShardRequest/processResponses.
+    AtomicReference<Map<String, String>> requestMap = new AtomicReference<>(null);
+    if (asyncId != null) {
+      requestMap.set(new HashMap<>(1, 1.0f));
+    }
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.add(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
+    params.add(CoreAdminParams.CORE, core);
+
+    params.set(CoreAdminParams.DELETE_INDEX, message.getBool(CoreAdminParams.DELETE_INDEX, true));
+    params.set(CoreAdminParams.DELETE_INSTANCE_DIR, message.getBool(CoreAdminParams.DELETE_INSTANCE_DIR, true));
+    params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true));
+
+    boolean isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName());
+    if (isLive) {
+      ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap.get());
+    }
+
+    Callable<Boolean> callable = () -> {
+      try {
+        if (isLive) {
+          ocmh.processResponses(results, shardHandler, false, null, asyncId, requestMap.get());
+
+          // check if the core unload removed the corenode zk entry
+          if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return Boolean.TRUE;
+        }
+
+        // try and ensure core info is removed from cluster state
+        ocmh.deleteCoreNode(collectionName, replicaName, replica, core);
+        if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return Boolean.TRUE;
+        return Boolean.FALSE;
+      } catch (Exception e) {
+        results.add("failure", "Could not complete delete " + e.getMessage());
+        throw e;
+      } finally {
+        if (onComplete != null) onComplete.run();
+      }
+    };
+
+    if (!parallel) {
+      try {
+        if (!callable.call()) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+              "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
+        }
+      } catch (InterruptedException | KeeperException e) {
+        throw e;
+      } catch (Exception ex) {
+        throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Error waiting for corenode gone", ex);
+      }
+    } else {
+      ocmh.tpe.submit(callable);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
new file mode 100644
index 0000000..2ef2955
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -0,0 +1,178 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+/**
+ * Overseer command that deletes a shard (slice) of a collection: it deletes every replica of the
+ * slice in parallel, then enqueues a DELETESHARD state update and waits for the slice to vanish.
+ */
+public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+  private final TimeSource timeSource;
+
+  public DeleteShardCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+    this.timeSource = ocmh.cloudManager.getTimeSource();
+  }
+
+  /**
+   * @throws SolrException BAD_REQUEST if the shard does not exist or is not in a deletable state;
+   *                       SERVER_ERROR if the shard is still present after the wait, or on any
+   *                       other failure
+   */
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
+    String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
+
+    log.info("Delete shard invoked");
+    Slice slice = clusterState.getCollection(collectionName).getSlice(sliceId);
+    if (slice == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "No shard with name " + sliceId + " exists for collection " + collectionName);
+    }
+
+    // For now, only allow for deletions of Inactive slices or custom hashes (range==null).
+    // TODO: Add check for range gaps on Slice deletion
+    // NOTE(review): RECOVERY_FAILED is tested outside the negated group below, so a slice in that
+    // state is always rejected, even when custom-hashed. Confirm whether it was meant to sit inside
+    // the group (i.e. be deletable like the other non-active states).
+    final Slice.State state = slice.getState();
+    if (!(slice.getRange() == null || state == Slice.State.INACTIVE || state == Slice.State.RECOVERY
+        || state == Slice.State.CONSTRUCTION) || state == Slice.State.RECOVERY_FAILED) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The slice: " + slice.getName() + " is currently " + state
+          + ". Only non-active (or custom-hashed) slices can be deleted.");
+    }
+
+    if (state == Slice.State.RECOVERY) {
+      // mark the slice as 'construction' and only then try to delete the cores
+      // see SOLR-9455
+      DistributedQueue inQueue = Overseer.getStateUpdateQueue(ocmh.zkStateReader.getZkClient());
+      Map<String, Object> propMap = new HashMap<>();
+      propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+      propMap.put(sliceId, Slice.State.CONSTRUCTION.toString());
+      propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
+      ZkNodeProps m = new ZkNodeProps(propMap);
+      inQueue.offer(Utils.toJSON(m));
+    }
+
+    String asyncId = message.getStr(ASYNC);
+
+    try {
+      List<ZkNodeProps> replicas = getReplicasForSlice(collectionName, slice);
+      // Counted down once per replica: by the delete callback, or in the catch blocks below.
+      CountDownLatch cleanupLatch = new CountDownLatch(replicas.size());
+      for (ZkNodeProps r : replicas) {
+        final ZkNodeProps replica = r.plus(message.getProperties()).plus("parallel", "true").plus(ASYNC, asyncId);
+        // getReplicasForSlice stores the node under CoreAdminParams.NODE (not NODE_NAME_PROP).
+        final String replicaNode = replica.getStr(CoreAdminParams.NODE);
+        log.info("Deleting replica for collection={} shard={} on node={}", replica.getStr(COLLECTION_PROP), replica.getStr(SHARD_ID_PROP), replicaNode);
+        NamedList deleteResult = new NamedList();
+        try {
+          ((DeleteReplicaCmd) ocmh.commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, replica, deleteResult, () -> {
+            cleanupLatch.countDown();
+            if (deleteResult.get("failure") != null) {
+              synchronized (results) {
+                results.add("failure", String.format(Locale.ROOT, "Failed to delete replica for collection=%s shard=%s" +
+                    " on node=%s", replica.getStr(COLLECTION_PROP), replica.getStr(SHARD_ID_PROP), replicaNode));
+              }
+            }
+            SimpleOrderedMap success = (SimpleOrderedMap) deleteResult.get("success");
+            if (success != null) {
+              synchronized (results) {
+                results.add("success", success);
+              }
+            }
+          });
+        } catch (KeeperException e) {
+          // Best effort: log and continue with the remaining replicas.
+          log.warn("Error deleting replica: {}", r, e);
+          cleanupLatch.countDown();
+        } catch (Exception e) {
+          log.warn("Error deleting replica: {}", r, e);
+          cleanupLatch.countDown();
+          throw e;
+        }
+      }
+      log.debug("Waiting for delete shard action to complete");
+      if (!cleanupLatch.await(5, TimeUnit.MINUTES)) {
+        // Previously ignored; keep going (as before) but make the timeout visible.
+        log.warn("Timed out waiting for replicas of collection={} shard={} to be deleted; removing the shard anyway",
+            collectionName, sliceId);
+      }
+
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
+          collectionName, ZkStateReader.SHARD_ID_PROP, sliceId);
+      ZkStateReader zkStateReader = ocmh.zkStateReader;
+      Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+
+      // wait for a while until we don't see the shard
+      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+      boolean removed = false;
+      while (!timeout.hasTimedOut()) {
+        timeout.sleep(100);
+        DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
+        removed = collection.getSlice(sliceId) == null;
+        if (removed) {
+          timeout.sleep(100); // just a bit of time so it's more likely other readers see on return
+          break;
+        }
+      }
+      if (!removed) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Could not fully remove collection: " + collectionName + " shard: " + sliceId);
+      }
+
+      log.info("Successfully deleted collection: {}, shard: {}", collectionName, sliceId);
+    } catch (SolrException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Error executing delete operation for collection: " + collectionName + " shard: " + sliceId, e);
+    }
+  }
+
+  /**
+   * Build one request-props object per replica of {@code slice}, carrying the collection, shard,
+   * core name, replica name and node (under {@link CoreAdminParams#NODE}).
+   */
+  private List<ZkNodeProps> getReplicasForSlice(String collectionName, Slice slice) {
+    List<ZkNodeProps> sourceReplicas = new ArrayList<>();
+    for (Replica replica : slice.getReplicas()) {
+      ZkNodeProps props = new ZkNodeProps(
+          COLLECTION_PROP, collectionName,
+          SHARD_ID_PROP, slice.getName(),
+          ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
+          ZkStateReader.REPLICA_PROP, replica.getName(),
+          CoreAdminParams.NODE, replica.getNodeName());
+      sourceReplicas.add(props);
+    }
+    return sourceReplicas;
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
new file mode 100644
index 0000000..cf0a234
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.CoreSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.SnapshotStatus;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the functionality of deleting a collection level snapshot.
+ */
+public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public DeleteSnapshotCmd (OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+ @Override
+ public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+ String collectionName = message.getStr(COLLECTION_PROP);
+ String commitName = message.getStr(CoreAdminParams.COMMIT_NAME);
+ String asyncId = message.getStr(ASYNC);
+ Map<String, String> requestMap = new HashMap<>();
+ NamedList shardRequestResults = new NamedList();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
+
+ Optional<CollectionSnapshotMetaData> meta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName);
+ if (!meta.isPresent()) { // Snapshot not found. Nothing to do.
+ return;
+ }
+
+ log.info("Deleting a snapshot for collection={} with commitName={}", collectionName, commitName);
+
+ Set<String> existingCores = new HashSet<>();
+ for (Slice s : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
+ for (Replica r : s.getReplicas()) {
+ existingCores.add(r.getCoreName());
+ }
+ }
+
+ Set<String> coresWithSnapshot = new HashSet<>();
+ for (CoreSnapshotMetaData m : meta.get().getReplicaSnapshots()) {
+ if (existingCores.contains(m.getCoreName())) {
+ coresWithSnapshot.add(m.getCoreName());
+ }
+ }
+
+ log.info("Existing cores with snapshot for collection={} are {}", collectionName, existingCores);
+ for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
+ for (Replica replica : slice.getReplicas()) {
+ if (replica.getState() == State.DOWN) {
+ continue; // Since replica is down - no point sending a request.
+ }
+
+ // Note - when a snapshot is found in_progress state - it is the result of overseer
+ // failure while handling the snapshot creation. Since we don't know the exact set of
+ // replicas to contact at this point, we try on all replicas.
+ if (meta.get().getStatus() == SnapshotStatus.InProgress || coresWithSnapshot.contains(replica.getCoreName())) {
+ String coreName = replica.getStr(CORE_NAME_PROP);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.DELETESNAPSHOT.toString());
+ params.set(NAME, slice.getName());
+ params.set(CORE_NAME_PROP, coreName);
+ params.set(CoreAdminParams.COMMIT_NAME, commitName);
+
+ log.info("Sending deletesnapshot request to core={} with commitName={}", coreName, commitName);
+ ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
+ }
+ }
+ }
+
+ ocmh.processResponses(shardRequestResults, shardHandler, false, null, asyncId, requestMap);
+ NamedList success = (NamedList) shardRequestResults.get("success");
+ List<CoreSnapshotMetaData> replicas = new ArrayList<>();
+ if (success != null) {
+ for ( int i = 0 ; i < success.size() ; i++) {
+ NamedList resp = (NamedList)success.getVal(i);
+ // Unfortunately async processing logic doesn't provide the "core" name automatically.
+ String coreName = (String)resp.get("core");
+ coresWithSnapshot.remove(coreName);
+ }
+ }
+
+ if (!coresWithSnapshot.isEmpty()) { // One or more failures.
+ log.warn("Failed to delete a snapshot for collection {} with commitName = {}. Snapshot could not be deleted for following cores {}",
+ collectionName, commitName, coresWithSnapshot);
+
+ List<CoreSnapshotMetaData> replicasWithSnapshot = new ArrayList<>();
+ for (CoreSnapshotMetaData m : meta.get().getReplicaSnapshots()) {
+ if (coresWithSnapshot.contains(m.getCoreName())) {
+ replicasWithSnapshot.add(m);
+ }
+ }
+
+ // Update the ZK meta-data to include only cores with the snapshot. This will enable users to figure out
+ // which cores still contain the named snapshot.
+ CollectionSnapshotMetaData newResult = new CollectionSnapshotMetaData(meta.get().getName(), SnapshotStatus.Failed,
+ meta.get().getCreationDate(), replicasWithSnapshot);
+ SolrSnapshotManager.updateCollectionLevelSnapshot(zkClient, collectionName, newResult);
+ log.info("Saved snapshot information for collection={} with commitName={} in Zookeeper as follows", collectionName, commitName,
+ Utils.toJSON(newResult));
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to delete snapshot on cores " + coresWithSnapshot);
+
+ } else {
+ // Delete the ZK path so that we eliminate the references of this snapshot from collection level meta-data.
+ SolrSnapshotManager.deleteCollectionLevelSnapshot(zkClient, collectionName, commitName);
+ log.info("Deleted Zookeeper snapshot metdata for collection={} with commitName={}", collectionName, commitName);
+ log.info("Successfully deleted snapshot for collection={} with commitName={}", collectionName, commitName);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/LeaderRecoveryWatcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/LeaderRecoveryWatcher.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/LeaderRecoveryWatcher.java
new file mode 100644
index 0000000..a80fdc0
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/LeaderRecoveryWatcher.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.util.Set;
+
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+
+/**
+ * We use this watcher to wait for any eligible replica in a shard to become active so that it can become a leader.
+ */
+public class LeaderRecoveryWatcher implements CollectionStateWatcher {
+ String collectionId;
+ String shardId;
+ String replicaId;
+ String targetCore;
+ SolrCloseableLatch latch;
+
+ /**
+ * Watch for recovery of a replica
+ *
+ * @param collectionId collection name
+ * @param shardId shard id
+ * @param replicaId source replica name (coreNodeName)
+ * @param targetCore specific target core name - if null then any active replica will do
+ * @param latch countdown when recovered
+ */
+ LeaderRecoveryWatcher(String collectionId, String shardId, String replicaId, String targetCore, SolrCloseableLatch latch) {
+ this.collectionId = collectionId;
+ this.shardId = shardId;
+ this.replicaId = replicaId;
+ this.targetCore = targetCore;
+ this.latch = latch;
+ }
+
+ @Override
+ public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+ if (collectionState == null) { // collection has been deleted - don't wait
+ latch.countDown();
+ return true;
+ }
+ Slice slice = collectionState.getSlice(shardId);
+ if (slice == null) { // shard has been removed - don't wait
+ latch.countDown();
+ return true;
+ }
+ for (Replica replica : slice.getReplicas()) {
+ // check if another replica exists - doesn't have to be the one we're moving
+ // as long as it's active and can become a leader, in which case we don't have to wait
+ // for recovery of specifically the one that we've just added
+ if (!replica.getName().equals(replicaId)) {
+ if (replica.getType().equals(Replica.Type.PULL)) { // not eligible for leader election
+ continue;
+ }
+ // check its state
+ String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+ if (targetCore != null && !targetCore.equals(coreName)) {
+ continue;
+ }
+ if (replica.isActive(liveNodes)) { // recovered - stop waiting
+ latch.countDown();
+ return true;
+ }
+ }
+ }
+ // set the watch again to wait for the new replica to recover
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
new file mode 100644
index 0000000..4edc363
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CompositeIdRouter;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.RoutingRule;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.util.TimeOut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.Utils.makeMap;
+
+/**
+ * Overseer command for the Collections API MIGRATE action: moves the documents that belong to a
+ * {@code split.key} from a source collection to a target collection. Both collections must use the
+ * compositeId router.
+ */
+public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+  private final TimeSource timeSource;
+
+  public MigrateCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+    this.timeSource = ocmh.cloudManager.getTimeSource();
+  }
+
+  /**
+   * Validates the request and migrates {@code split.key} for every pair of source and target shards
+   * that the key routes to.
+   *
+   * @param clusterState cluster state used to resolve both collections
+   * @param message request properties: {@code collection}, {@code split.key}, {@code target.collection},
+   *                the optional {@code forward.timeout} (seconds, default 600) and the optional async id
+   * @param results receives the results of the sub-requests
+   * @throws SolrException BAD_REQUEST if a collection is unknown, does not use the compositeId router,
+   *                       or has no active slice for the split key
+   */
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    String sourceCollectionName = message.getStr("collection");
+    String splitKey = message.getStr("split.key");
+    String targetCollectionName = message.getStr("target.collection");
+    // forward.timeout is supplied in seconds and converted to milliseconds here
+    int timeout = message.getInt("forward.timeout", 10 * 60) * 1000;
+
+    DocCollection sourceCollection = clusterState.getCollection(sourceCollectionName);
+    if (sourceCollection == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown source collection: " + sourceCollectionName);
+    }
+    DocCollection targetCollection = clusterState.getCollection(targetCollectionName);
+    if (targetCollection == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown target collection: " + targetCollectionName);
+    }
+    if (!(sourceCollection.getRouter() instanceof CompositeIdRouter)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source collection must use a compositeId router");
+    }
+    if (!(targetCollection.getRouter() instanceof CompositeIdRouter)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target collection must use a compositeId router");
+    }
+    CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
+    CompositeIdRouter targetRouter = (CompositeIdRouter) targetCollection.getRouter();
+    Collection<Slice> sourceSlices = sourceRouter.getSearchSlicesSingle(splitKey, null, sourceCollection);
+    if (sourceSlices.isEmpty()) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "No active slices available in source collection: " + sourceCollectionName + " for given split.key: " + splitKey);
+    }
+    Collection<Slice> targetSlices = targetRouter.getSearchSlicesSingle(splitKey, null, targetCollection);
+    if (targetSlices.isEmpty()) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "No active slices available in target collection: " + targetCollectionName + " for given split.key: " + splitKey);
+    }
+
+    // null when the request is synchronous
+    String asyncId = message.getStr(ASYNC);
+
+    for (Slice sourceSlice : sourceSlices) {
+      for (Slice targetSlice : targetSlices) {
+        log.info("Migrating source shard: {} to target shard: {} for split.key = {}", sourceSlice, targetSlice, splitKey);
+        migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey,
+            timeout, results, asyncId, message);
+      }
+    }
+  }
+
+  /**
+   * Migrates the documents of {@code splitKey} from one source shard to one target shard. The steps are:
+   * <ol>
+   *   <li>ask the target leader to buffer updates;</li>
+   *   <li>add a routing rule on the source shard;</li>
+   *   <li>create a temporary single-shard collection on the source leader's node;</li>
+   *   <li>split the common hash range into it;</li>
+   *   <li>add a replica of the temporary collection on the target leader's node;</li>
+   *   <li>merge that replica into the target leader and apply the buffered updates;</li>
+   *   <li>delete the temporary collection.</li>
+   * </ol>
+   * Returns without doing anything if the two shards share no hashes for the key.
+   *
+   * @param timeout value (in milliseconds) used to compute the routing rule's {@code expireAt}
+   * @param asyncId async request id, or null for a synchronous request
+   * @param message original request; {@code property.*} params are copied to the temporary replica
+   */
+  private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice,
+                          DocCollection targetCollection, Slice targetSlice,
+                          String splitKey, int timeout,
+                          NamedList results, String asyncId, ZkNodeProps message) throws Exception {
+    String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    if (clusterState.hasCollection(tempSourceCollectionName)) {
+      // presumably left over from an earlier attempt - best-effort cleanup before starting over
+      try {
+        deleteTempCollection(tempSourceCollectionName, results);
+        clusterState = zkStateReader.getClusterState();
+      } catch (Exception e) {
+        log.warn("Unable to clean up existing temporary collection: " + tempSourceCollectionName, e);
+      }
+    }
+
+    CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
+    DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);
+
+    ShardHandlerFactory shardHandlerFactory = ocmh.shardHandlerFactory;
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+
+    log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
+    // intersect source range, keyHashRange and target range
+    // this is the range that has to be split from source and transferred to target
+    DocRouter.Range splitRange = ocmh.intersect(targetSlice.getRange(), ocmh.intersect(sourceSlice.getRange(), keyHashRange));
+    if (splitRange == null) {
+      log.info("No common hashes between source shard: {} and target shard: {}", sourceSlice.getName(), targetSlice.getName());
+      return;
+    }
+    log.info("Common hash range between source shard: {} and target shard: {} = {}",
+        sourceSlice.getName(), targetSlice.getName(), splitRange);
+
+    Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
+    // For tracking async calls.
+    Map<String, String> requestMap = new HashMap<>();
+
+    log.info("Asking target leader node: {} core: {} to buffer updates",
+        targetLeader.getNodeName(), targetLeader.getStr("core"));
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTBUFFERUPDATES.toString());
+    params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+
+    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates", asyncId, requestMap);
+
+    String routeKey = SolrIndexSplitter.getRouteKey(splitKey) + "!";
+    ZkNodeProps m = new ZkNodeProps(
+        Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
+        COLLECTION_PROP, sourceCollection.getName(),
+        SHARD_ID_PROP, sourceSlice.getName(),
+        "routeKey", routeKey,
+        "range", splitRange.toString(),
+        "targetCollection", targetCollection.getName(),
+        "expireAt", RoutingRule.makeExpiryAt(timeout));
+    log.info("Adding routing rule: {}", m);
+    Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+
+    // wait for a while until we see the new rule
+    log.info("Waiting to see routing rule updated in clusterstate");
+    if (!waitForRoutingRule(sourceCollection.getName(), sourceSlice.getName(), routeKey, splitRange)) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not add routing rule: " + m);
+    }
+
+    log.info("Routing rule added successfully");
+
+    // Create temp core on source shard
+    Replica sourceLeader = zkStateReader.getLeaderRetry(sourceCollection.getName(), sourceSlice.getName(), 10000);
+
+    // create a temporary collection with just one node on the shard leader
+    String configName = zkStateReader.readConfigName(sourceCollection.getName());
+    Map<String, Object> props = makeMap(
+        Overseer.QUEUE_OPERATION, CREATE.toLower(),
+        NAME, tempSourceCollectionName,
+        NRT_REPLICAS, 1,
+        OverseerCollectionMessageHandler.NUM_SLICES, 1,
+        OverseerCollectionMessageHandler.COLL_CONF, configName,
+        OverseerCollectionMessageHandler.CREATE_NODE_SET, sourceLeader.getNodeName());
+    if (asyncId != null) {
+      String internalAsyncId = asyncId + Math.abs(System.nanoTime());
+      props.put(ASYNC, internalAsyncId);
+    }
+
+    log.info("Creating temporary collection: {}", props);
+    ocmh.commandMap.get(CREATE).call(clusterState, new ZkNodeProps(props), results);
+    // refresh cluster state
+    clusterState = zkStateReader.getClusterState();
+    Slice tempSourceSlice = clusterState.getCollection(tempSourceCollectionName).getSlices().iterator().next();
+    Replica tempSourceLeader = zkStateReader.getLeaderRetry(tempSourceCollectionName, tempSourceSlice.getName(), 120000);
+
+    String tempCollectionReplica1 = tempSourceLeader.getCoreName();
+    String coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
+        sourceLeader.getNodeName(), tempCollectionReplica1);
+    // wait for the replicas to be seen as active on temp source leader
+    log.info("Asking source leader to wait for: {} to be alive on: {}", tempCollectionReplica1, sourceLeader.getNodeName());
+    CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
+    cmd.setCoreName(tempCollectionReplica1);
+    cmd.setNodeName(sourceLeader.getNodeName());
+    cmd.setCoreNodeName(coreNodeName);
+    cmd.setState(Replica.State.ACTIVE);
+    cmd.setCheckLive(true);
+    cmd.setOnlyIfLeader(true);
+    // we don't want this to happen asynchronously
+    ocmh.sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null);
+
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection leader" +
+        " or timed out waiting for it to come up", asyncId, requestMap);
+
+    log.info("Asking source leader to split index");
+    params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
+    params.set(CoreAdminParams.CORE, sourceLeader.getStr("core"));
+    params.add(CoreAdminParams.TARGET_CORE, tempSourceLeader.getStr("core"));
+    params.set(CoreAdminParams.RANGES, splitRange.toString());
+    params.set("split.key", splitKey);
+
+    String tempNodeName = sourceLeader.getNodeName();
+
+    ocmh.sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap);
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command", asyncId, requestMap);
+
+    log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
+        tempSourceCollectionName, targetLeader.getNodeName());
+    String tempCollectionReplica2 = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(),
+        zkStateReader.getClusterState().getCollection(tempSourceCollectionName), tempSourceSlice.getName(), Replica.Type.NRT);
+    props = new HashMap<>();
+    props.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
+    props.put(COLLECTION_PROP, tempSourceCollectionName);
+    props.put(SHARD_ID_PROP, tempSourceSlice.getName());
+    props.put("node", targetLeader.getNodeName());
+    props.put(CoreAdminParams.NAME, tempCollectionReplica2);
+    // copy over property params:
+    for (String key : message.keySet()) {
+      if (key.startsWith(OverseerCollectionMessageHandler.COLL_PROP_PREFIX)) {
+        props.put(key, message.getStr(key));
+      }
+    }
+    // add async param
+    if (asyncId != null) {
+      props.put(ASYNC, asyncId);
+    }
+    ((AddReplicaCmd) ocmh.commandMap.get(ADDREPLICA)).addReplica(clusterState, new ZkNodeProps(props), results, null);
+
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
+        "temporary collection in target leader node.", asyncId, requestMap);
+
+    coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
+        targetLeader.getNodeName(), tempCollectionReplica2);
+    // wait for the replicas to be seen as active on temp source leader
+    log.info("Asking temp source leader to wait for: {} to be alive on: {}", tempCollectionReplica2, targetLeader.getNodeName());
+    cmd = new CoreAdminRequest.WaitForState();
+    cmd.setCoreName(tempSourceLeader.getStr("core"));
+    cmd.setNodeName(targetLeader.getNodeName());
+    cmd.setCoreNodeName(coreNodeName);
+    cmd.setState(Replica.State.ACTIVE);
+    cmd.setCheckLive(true);
+    cmd.setOnlyIfLeader(true);
+    params = new ModifiableSolrParams(cmd.getParams());
+
+    ocmh.sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" +
+        " replica or timed out waiting for them to come up", asyncId, requestMap);
+
+    log.info("Successfully created replica of temp source collection on target leader node");
+
+    log.info("Requesting merge of temp source collection replica to target leader");
+    params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.MERGEINDEXES.toString());
+    params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
+    params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
+
+    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+    String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to "
+        + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName();
+    ocmh.processResponses(results, shardHandler, true, msg, asyncId, requestMap);
+
+    log.info("Asking target leader to apply buffered updates");
+    params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
+    params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+
+    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates",
+        asyncId, requestMap);
+
+    try {
+      deleteTempCollection(tempSourceCollectionName, results);
+    } catch (Exception e) {
+      log.error("Unable to delete temporary collection: " + tempSourceCollectionName
+          + ". Please remove it manually", e);
+    }
+  }
+
+  /**
+   * Polls the cluster state every 100ms for up to 60 seconds. It looks for a routing rule for
+   * {@code routeKey}, on the given shard, whose route ranges contain {@code splitRange}. A missing
+   * collection or shard is treated as "not seen yet" instead of failing with a NullPointerException.
+   *
+   * @return true if the rule was observed before the timeout, false otherwise
+   */
+  private boolean waitForRoutingRule(String collectionName, String shardName, String routeKey,
+                                     DocRouter.Range splitRange) throws InterruptedException {
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    TimeOut waitUntil = new TimeOut(60, TimeUnit.SECONDS, timeSource);
+    while (!waitUntil.hasTimedOut()) {
+      waitUntil.sleep(100);
+      DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
+      if (collection == null) {
+        continue;
+      }
+      Slice slice = collection.getSlice(shardName);
+      if (slice == null) {
+        continue;
+      }
+      Map<String, RoutingRule> rules = slice.getRoutingRules();
+      if (rules != null) {
+        RoutingRule rule = rules.get(routeKey);
+        if (rule != null && rule.getRouteRanges().contains(splitRange)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Deletes the named temporary collection through the DELETE collection command, using the
+   * current cluster state. Exceptions are propagated so each caller can decide how to report them.
+   */
+  private void deleteTempCollection(String collectionName, NamedList results) throws Exception {
+    log.info("Deleting temporary collection: {}", collectionName);
+    Map<String, Object> props = makeMap(
+        Overseer.QUEUE_OPERATION, DELETE.toLower(),
+        NAME, collectionName);
+    ocmh.commandMap.get(DELETE).call(ocmh.zkStateReader.getClusterState(), new ZkNodeProps(props), results);
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
new file mode 100644
index 0000000..f9392b5
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.cloud.ActiveReplicaWatcher;
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.util.TimeOut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE;
+import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
+import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
+
+public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final OverseerCollectionMessageHandler ocmh;
+ private final TimeSource timeSource;
+
+ public MoveReplicaCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ this.timeSource = ocmh.cloudManager.getTimeSource();
+ }
+
+ @Override
+ public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+ moveReplica(ocmh.zkStateReader.getClusterState(), message, results);
+ }
+
+ private void moveReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+ log.debug("moveReplica() : {}", Utils.toJSONString(message));
+ ocmh.checkRequired(message, COLLECTION_PROP, CollectionParams.TARGET_NODE);
+ String collection = message.getStr(COLLECTION_PROP);
+ String targetNode = message.getStr(CollectionParams.TARGET_NODE);
+ boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
+ boolean inPlaceMove = message.getBool(IN_PLACE_MOVE, true);
+ int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes
+
+ String async = message.getStr(ASYNC);
+
+ DocCollection coll = clusterState.getCollection(collection);
+ if (coll == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
+ }
+ if (!clusterState.getLiveNodes().contains(targetNode)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target node: " + targetNode + " not in live nodes: " + clusterState.getLiveNodes());
+ }
+ Replica replica = null;
+ if (message.containsKey(REPLICA_PROP)) {
+ String replicaName = message.getStr(REPLICA_PROP);
+ replica = coll.getReplica(replicaName);
+ if (replica == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Collection: " + collection + " replica: " + replicaName + " does not exist");
+ }
+ } else {
+ String sourceNode = message.getStr(CollectionParams.SOURCE_NODE, message.getStr(CollectionParams.FROM_NODE));
+ if (sourceNode == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + CollectionParams.SOURCE_NODE +
+ " or '" + CollectionParams.FROM_NODE + "' is a required param");
+ }
+ String shardId = message.getStr(SHARD_ID_PROP);
+ if (shardId == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + SHARD_ID_PROP + "' is a required param");
+ }
+ Slice slice = clusterState.getCollection(collection).getSlice(shardId);
+ List<Replica> sliceReplicas = new ArrayList<>(slice.getReplicas());
+ Collections.shuffle(sliceReplicas, OverseerCollectionMessageHandler.RANDOM);
+ // this picks up a single random replica from the sourceNode
+ for (Replica r : slice.getReplicas()) {
+ if (r.getNodeName().equals(sourceNode)) {
+ replica = r;
+ }
+ }
+ if (replica == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Collection: " + collection + " node: " + sourceNode + " does not have any replica belonging to shard: " + shardId);
+ }
+ }
+
+ log.info("Replica will be moved to node {}: {}", targetNode, replica);
+ Slice slice = null;
+ for (Slice s : coll.getSlices()) {
+ if (s.getReplicas().contains(replica)) {
+ slice = s;
+ }
+ }
+ assert slice != null;
+ Object dataDir = replica.get("dataDir");
+ boolean isSharedFS = replica.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && dataDir != null;
+
+ if (isSharedFS && inPlaceMove) {
+ log.debug("-- moveHdfsReplica");
+ moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout, waitForFinalState);
+ } else {
+ log.debug("-- moveNormalReplica (inPlaceMove=" + inPlaceMove + ", isSharedFS=" + isSharedFS);
+ moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice, timeout, waitForFinalState);
+ }
+ }
+
+ private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async,
+ DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception {
+ String skipCreateReplicaInClusterState = "true";
+ if (clusterState.getLiveNodes().contains(replica.getNodeName())) {
+ skipCreateReplicaInClusterState = "false";
+ ZkNodeProps removeReplicasProps = new ZkNodeProps(
+ COLLECTION_PROP, coll.getName(),
+ SHARD_ID_PROP, slice.getName(),
+ REPLICA_PROP, replica.getName()
+ );
+ removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_DATA_DIR, false);
+ removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_INDEX, false);
+ if(async!=null) removeReplicasProps.getProperties().put(ASYNC, async);
+ NamedList deleteResult = new NamedList();
+ ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
+ if (deleteResult.get("failure") != null) {
+ String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s, failure=%s",
+ coll.getName(), slice.getName(), replica.getName(), deleteResult.get("failure"));
+ log.warn(errorString);
+ results.add("failure", errorString);
+ return;
+ }
+
+ TimeOut timeOut = new TimeOut(20L, TimeUnit.SECONDS, timeSource);
+ while (!timeOut.hasTimedOut()) {
+ coll = ocmh.zkStateReader.getClusterState().getCollection(coll.getName());
+ if (coll.getReplica(replica.getName()) != null) {
+ timeOut.sleep(100);
+ } else {
+ break;
+ }
+ }
+ if (timeOut.hasTimedOut()) {
+ results.add("failure", "Still see deleted replica in clusterstate!");
+ return;
+ }
+
+ }
+
+ String ulogDir = replica.getStr(CoreAdminParams.ULOG_DIR);
+ ZkNodeProps addReplicasProps = new ZkNodeProps(
+ COLLECTION_PROP, coll.getName(),
+ SHARD_ID_PROP, slice.getName(),
+ CoreAdminParams.NODE, targetNode,
+ CoreAdminParams.CORE_NODE_NAME, replica.getName(),
+ CoreAdminParams.NAME, replica.getCoreName(),
+ WAIT_FOR_FINAL_STATE, String.valueOf(waitForFinalState),
+ SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, skipCreateReplicaInClusterState,
+ CoreAdminParams.ULOG_DIR, ulogDir.substring(0, ulogDir.lastIndexOf(UpdateLog.TLOG_NAME)),
+ CoreAdminParams.DATA_DIR, dataDir);
+ if(async!=null) addReplicasProps.getProperties().put(ASYNC, async);
+ NamedList addResult = new NamedList();
+ try {
+ ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, addResult, null);
+ } catch (Exception e) {
+ // fatal error - try rolling back
+ String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
+ " on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
+ results.add("failure", errorString);
+ log.warn("Error adding replica " + addReplicasProps + " - trying to roll back...", e);
+ addReplicasProps = addReplicasProps.plus(CoreAdminParams.NODE, replica.getNodeName());
+ NamedList rollback = new NamedList();
+ ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, rollback, null);
+ if (rollback.get("failure") != null) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
+ + ", collection may be inconsistent: " + rollback.get("failure"));
+ }
+ return;
+ }
+ if (addResult.get("failure") != null) {
+ String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
+ " on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
+ log.warn(errorString);
+ results.add("failure", errorString);
+ log.debug("--- trying to roll back...");
+ // try to roll back
+ addReplicasProps = addReplicasProps.plus(CoreAdminParams.NODE, replica.getNodeName());
+ NamedList rollback = new NamedList();
+ try {
+ ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, rollback, null);
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
+ + ", collection may be inconsistent!", e);
+ }
+ if (rollback.get("failure") != null) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
+ + ", collection may be inconsistent! Failure: " + rollback.get("failure"));
+ }
+ return;
+ } else {
+ String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
+ "to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), replica.getCoreName(), targetNode);
+ results.add("success", successString);
+ }
+ }
+
+ private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async,
+ DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception {
+ String newCoreName = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), coll, slice.getName(), replica.getType());
+ ZkNodeProps addReplicasProps = new ZkNodeProps(
+ COLLECTION_PROP, coll.getName(),
+ SHARD_ID_PROP, slice.getName(),
+ CoreAdminParams.NODE, targetNode,
+ CoreAdminParams.NAME, newCoreName);
+ if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
+ NamedList addResult = new NamedList();
+ SolrCloseableLatch countDownLatch = new SolrCloseableLatch(1, ocmh);
+ ActiveReplicaWatcher watcher = null;
+ ZkNodeProps props = ocmh.addReplica(clusterState, addReplicasProps, addResult, null);
+ log.debug("props " + props);
+ if (replica.equals(slice.getLeader()) || waitForFinalState) {
+ watcher = new ActiveReplicaWatcher(coll.getName(), null, Collections.singletonList(newCoreName), countDownLatch);
+ log.debug("-- registered watcher " + watcher);
+ ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher);
+ }
+ if (addResult.get("failure") != null) {
+ String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
+ " on node=%s, failure=", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
+ log.warn(errorString);
+ results.add("failure", errorString);
+ if (watcher != null) { // unregister
+ ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher);
+ }
+ return;
+ }
+ // wait for the other replica to be active if the source replica was a leader
+ if (watcher != null) {
+ try {
+ log.debug("Waiting for leader's replica to recover.");
+ if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+ String errorString = String.format(Locale.ROOT, "Timed out waiting for leader's replica to recover, collection=%s shard=%s" +
+ " on node=%s", coll.getName(), slice.getName(), targetNode);
+ log.warn(errorString);
+ results.add("failure", errorString);
+ return;
+ } else {
+ log.debug("Replica " + watcher.getActiveReplicas() + " is active - deleting the source...");
+ }
+ } finally {
+ ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher);
+ }
+ }
+
+ ZkNodeProps removeReplicasProps = new ZkNodeProps(
+ COLLECTION_PROP, coll.getName(),
+ SHARD_ID_PROP, slice.getName(),
+ REPLICA_PROP, replica.getName());
+ if (async != null) removeReplicasProps.getProperties().put(ASYNC, async);
+ NamedList deleteResult = new NamedList();
+ ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
+ if (deleteResult.get("failure") != null) {
+ String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s, failure=%s",
+ coll.getName(), slice.getName(), replica.getName(), deleteResult.get("failure"));
+ log.warn(errorString);
+ results.add("failure", errorString);
+ } else {
+ String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
+ "to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), newCoreName, targetNode);
+ results.add("success", successString);
+ }
+ }
+}