You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2016/08/19 05:44:47 UTC
[3/3] lucene-solr:master: SOLR-9421: Refactored out
OverseerCollectionMessageHandler to smaller classes
SOLR-9421: Refactored out OverseerCollectionMessageHandler to smaller classes
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/bbd1efe5
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/bbd1efe5
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/bbd1efe5
Branch: refs/heads/master
Commit: bbd1efe5d8f547e0503d2a39abbfd3849019c77f
Parents: 9e1a25e
Author: Noble Paul <no...@apache.org>
Authored: Fri Aug 19 11:12:29 2016 +0530
Committer: Noble Paul <no...@apache.org>
Committed: Fri Aug 19 11:14:25 2016 +0530
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../org/apache/solr/cloud/AddReplicaCmd.java | 192 ++
.../java/org/apache/solr/cloud/BackupCmd.java | 132 ++
.../org/apache/solr/cloud/CreateAliasCmd.java | 101 +
.../apache/solr/cloud/CreateCollectionCmd.java | 291 +++
.../org/apache/solr/cloud/CreateShardCmd.java | 120 ++
.../org/apache/solr/cloud/DeleteAliasCmd.java | 95 +
.../apache/solr/cloud/DeleteCollectionCmd.java | 121 ++
.../org/apache/solr/cloud/DeleteNodeCmd.java | 3 +-
.../org/apache/solr/cloud/DeleteReplicaCmd.java | 155 ++
.../org/apache/solr/cloud/DeleteShardCmd.java | 126 ++
.../java/org/apache/solr/cloud/MigrateCmd.java | 333 +++
.../cloud/OverseerCollectionMessageHandler.java | 2012 +-----------------
.../org/apache/solr/cloud/OverseerRoleCmd.java | 102 +
.../apache/solr/cloud/OverseerStatusCmd.java | 122 ++
.../java/org/apache/solr/cloud/RestoreCmd.java | 243 +++
.../org/apache/solr/cloud/SplitShardCmd.java | 458 ++++
17 files changed, 2695 insertions(+), 1913 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7458f46..fccfa43 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -282,6 +282,8 @@ Other Changes
* SOLR-9404: Refactor move/renames in JSON FacetProcessor and FacetFieldProcessor. (David Smiley)
+* SOLR-9421: Refactored out OverseerCollectionMessageHandler to smaller classes (noble)
+
================== 6.1.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
new file mode 100644
index 0000000..6bb3350
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.Assign.getNodesForNewReplicas;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public AddReplicaCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+ @Override
+ public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+ addReplica(ocmh.zkStateReader.getClusterState(), message, results, null);
+ }
+
+ ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
+ throws KeeperException, InterruptedException {
+ log.info("addReplica() : {}", Utils.toJSONString(message));
+ String collection = message.getStr(COLLECTION_PROP);
+ String node = message.getStr(CoreAdminParams.NODE);
+ String shard = message.getStr(SHARD_ID_PROP);
+ String coreName = message.getStr(CoreAdminParams.NAME);
+ boolean parallel = message.getBool("parallel", false);
+ if (StringUtils.isBlank(coreName)) {
+ coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
+ }
+
+ final String asyncId = message.getStr(ASYNC);
+
+ DocCollection coll = clusterState.getCollection(collection);
+ if (coll == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
+ }
+ if (coll.getSlice(shard) == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Collection: " + collection + " shard: " + shard + " does not exist");
+ }
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
+
+ // Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
+ if (!skipCreateReplicaInClusterState) {
+ node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
+ ocmh.overseer.getZkController().getCoreContainer()).get(0).nodeName;
+ }
+ log.info("Node Identified {} for creating new replica", node);
+
+ if (!clusterState.liveNodesContain(node)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
+ }
+ if (coreName == null) {
+ coreName = Assign.buildCoreName(coll, shard);
+ } else if (!skipCreateReplicaInClusterState) {
+ //Validate that the core name is unique in that collection
+ for (Slice slice : coll.getSlices()) {
+ for (Replica replica : slice.getReplicas()) {
+ String replicaCoreName = replica.getStr(CORE_NAME_PROP);
+ if (coreName.equals(replicaCoreName)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Another replica with the same core name already exists" +
+ " for this collection");
+ }
+ }
+ }
+ }
+ ModifiableSolrParams params = new ModifiableSolrParams();
+
+ ZkStateReader zkStateReader = ocmh.zkStateReader;
+ if (!Overseer.isLegacy(zkStateReader)) {
+ if (!skipCreateReplicaInClusterState) {
+ ZkNodeProps props = new ZkNodeProps(
+ Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
+ ZkStateReader.COLLECTION_PROP, collection,
+ ZkStateReader.SHARD_ID_PROP, shard,
+ ZkStateReader.CORE_NAME_PROP, coreName,
+ ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+ ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node),
+ ZkStateReader.NODE_NAME_PROP, node);
+ Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+ }
+ params.set(CoreAdminParams.CORE_NODE_NAME,
+ ocmh.waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
+ }
+
+ String configName = zkStateReader.readConfigName(collection);
+ String routeKey = message.getStr(ShardParams._ROUTE_);
+ String dataDir = message.getStr(CoreAdminParams.DATA_DIR);
+ String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR);
+
+ params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
+ params.set(CoreAdminParams.NAME, coreName);
+ params.set(COLL_CONF, configName);
+ params.set(CoreAdminParams.COLLECTION, collection);
+ if (shard != null) {
+ params.set(CoreAdminParams.SHARD, shard);
+ } else if (routeKey != null) {
+ Collection<Slice> slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll);
+ if (slices.isEmpty()) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No active shard serving _route_=" + routeKey + " found");
+ } else {
+ params.set(CoreAdminParams.SHARD, slices.iterator().next().getName());
+ }
+ } else {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Specify either 'shard' or _route_ param");
+ }
+ if (dataDir != null) {
+ params.set(CoreAdminParams.DATA_DIR, dataDir);
+ }
+ if (instanceDir != null) {
+ params.set(CoreAdminParams.INSTANCE_DIR, instanceDir);
+ }
+ ocmh.addPropertyParams(message, params);
+
+ // For tracking async calls.
+ Map<String,String> requestMap = new HashMap<>();
+ ocmh.sendShardRequest(node, params, shardHandler, asyncId, requestMap);
+
+ final String fnode = node;
+ final String fcoreName = coreName;
+
+ Runnable runnable = () -> {
+ ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
+ ocmh.waitForCoreNodeName(collection, fnode, fcoreName);
+ if (onComplete != null) onComplete.run();
+ };
+
+ if (!parallel) {
+ runnable.run();
+ } else {
+ ocmh.tpe.submit(runnable);
+ }
+
+
+ return new ZkNodeProps(
+ ZkStateReader.COLLECTION_PROP, collection,
+ ZkStateReader.SHARD_ID_PROP, shard,
+ ZkStateReader.CORE_NAME_PROP, coreName,
+ ZkStateReader.NODE_NAME_PROP, node
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/BackupCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/BackupCmd.java
new file mode 100644
index 0000000..679cb07
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/BackupCmd.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.backup.BackupManager;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public BackupCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+ @Override
+ public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+ String collectionName = message.getStr(COLLECTION_PROP);
+ String backupName = message.getStr(NAME);
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ String asyncId = message.getStr(ASYNC);
+ String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
+ String location = message.getStr(CoreAdminParams.BACKUP_LOCATION);
+
+ Map<String, String> requestMap = new HashMap<>();
+ Instant startTime = Instant.now();
+
+ CoreContainer cc = ocmh.overseer.getZkController().getCoreContainer();
+ BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
+ BackupManager backupMgr = new BackupManager(repository, ocmh.zkStateReader, collectionName);
+
+ // Backup location
+ URI backupPath = repository.createURI(location, backupName);
+
+ //Validating if the directory already exists.
+ if (repository.exists(backupPath)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The backup directory already exists: " + backupPath);
+ }
+
+ // Create a directory to store backup details.
+ repository.createDirectory(backupPath);
+
+ log.info("Starting backup of collection={} with backupName={} at location={}", collectionName, backupName,
+ backupPath);
+
+ for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) {
+ Replica replica = slice.getLeader();
+
+ String coreName = replica.getStr(CORE_NAME_PROP);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.BACKUPCORE.toString());
+ params.set(NAME, slice.getName());
+ params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
+ params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.getPath()); // note: index dir will be here then the "snapshot." + slice name
+ params.set(CORE_NAME_PROP, coreName);
+
+ ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
+ log.debug("Sent backup request to core={} for backupName={}", coreName, backupName);
+ }
+ log.debug("Sent backup requests to all shard leaders for backupName={}", backupName);
+
+ ocmh.processResponses(results, shardHandler, true, "Could not backup all replicas", asyncId, requestMap);
+
+ log.info("Starting to backup ZK data for backupName={}", backupName);
+
+ //Download the configs
+ String configName = ocmh.zkStateReader.readConfigName(collectionName);
+ backupMgr.downloadConfigDir(location, backupName, configName);
+
+ //Save the collection's state. Can be part of the monolithic clusterstate.json or a individual state.json
+ //Since we don't want to distinguish we extract the state and back it up as a separate json
+ DocCollection collectionState = ocmh.zkStateReader.getClusterState().getCollection(collectionName);
+ backupMgr.writeCollectionState(location, backupName, collectionName, collectionState);
+
+ Properties properties = new Properties();
+
+ properties.put(BackupManager.BACKUP_NAME_PROP, backupName);
+ properties.put(BackupManager.COLLECTION_NAME_PROP, collectionName);
+ properties.put(COLL_CONF, configName);
+ properties.put(BackupManager.START_TIME_PROP, startTime.toString());
+ //TODO: Add MD5 of the configset. If during restore the same name configset exists then we can compare checksums to see if they are the same.
+ //if they are not the same then we can throw an error or have an 'overwriteConfig' flag
+ //TODO save numDocs for the shardLeader. We can use it to sanity check the restore.
+
+ backupMgr.writeBackupProperties(location, backupName, properties);
+
+ log.info("Completed backing up ZK data for backupName={}", backupName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/CreateAliasCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateAliasCmd.java
new file mode 100644
index 0000000..b966ebd
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateAliasCmd.java
@@ -0,0 +1,101 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+
+public class CreateAliasCmd implements Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public CreateAliasCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+ @Override
+ public void call(ClusterState state, ZkNodeProps message, NamedList results)
+ throws Exception {
+ String aliasName = message.getStr(NAME);
+ String collections = message.getStr("collections");
+
+ Map<String, Map<String, String>> newAliasesMap = new HashMap<>();
+ Map<String, String> newCollectionAliasesMap = new HashMap<>();
+ ZkStateReader zkStateReader = ocmh.zkStateReader;
+ Map<String, String> prevColAliases = zkStateReader.getAliases().getCollectionAliasMap();
+ if (prevColAliases != null) {
+ newCollectionAliasesMap.putAll(prevColAliases);
+ }
+ newCollectionAliasesMap.put(aliasName, collections);
+ newAliasesMap.put("collection", newCollectionAliasesMap);
+ Aliases newAliases = new Aliases(newAliasesMap);
+ byte[] jsonBytes = null;
+ if (newAliases.collectionAliasSize() > 0) { // only sub map right now
+ jsonBytes = Utils.toJSON(newAliases.getAliasMap());
+ }
+ try {
+ zkStateReader.getZkClient().setData(ZkStateReader.ALIASES, jsonBytes, true);
+
+ checkForAlias(aliasName, collections);
+ // some fudge for other nodes
+ Thread.sleep(100);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ } catch (InterruptedException e) {
+ log.warn("", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+ private void checkForAlias(String name, String value) {
+
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+ boolean success = false;
+ Aliases aliases;
+ while (!timeout.hasTimedOut()) {
+ aliases = ocmh.zkStateReader.getAliases();
+ String collections = aliases.getCollectionAlias(name);
+ if (collections != null && collections.equals(value)) {
+ success = true;
+ break;
+ }
+ }
+ if (!success) {
+ log.warn("Timeout waiting to be notified of Alias change...");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
new file mode 100644
index 0000000..7f28600
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.cloud.overseer.ClusterStateMutator;
+import org.apache.solr.cloud.rule.ReplicaAssigner;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.RANDOM;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.StrUtils.formatString;
+
+public class CreateCollectionCmd implements Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public CreateCollectionCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+ @Override
+ public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+ final String collectionName = message.getStr(NAME);
+ log.info("Create collection {}", collectionName);
+ if (clusterState.hasCollection(collectionName)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
+ }
+
+ String configName = getConfigName(collectionName, message);
+ if (configName == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No config set found to associate with the collection.");
+ }
+
+ ocmh.validateConfigOrThrowSolrException(configName);
+
+
+ try {
+ // look at the replication factor and see if it matches reality
+ // if it does not, find best nodes to create more cores
+
+ int repFactor = message.getInt(REPLICATION_FACTOR, 1);
+
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ final String async = message.getStr(ASYNC);
+
+ Integer numSlices = message.getInt(NUM_SLICES, null);
+ String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
+ List<String> shardNames = new ArrayList<>();
+ if(ImplicitDocRouter.NAME.equals(router)){
+ ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
+ numSlices = shardNames.size();
+ } else {
+ if (numSlices == null ) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NUM_SLICES + " is a required param (when using CompositeId router).");
+ }
+ ClusterStateMutator.getShardNames(numSlices, shardNames);
+ }
+
+ int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
+
+ if (repFactor <= 0) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, REPLICATION_FACTOR + " must be greater than 0");
+ }
+
+ if (numSlices <= 0) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0");
+ }
+
+ // we need to look at every node and see how many cores it serves
+ // add our new cores to existing nodes serving the least number of cores
+ // but (for now) require that each core goes on a distinct node.
+
+ final List<String> nodeList = OverseerCollectionMessageHandler.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM);
+ Map<ReplicaAssigner.Position, String> positionVsNodes;
+ if (nodeList.isEmpty()) {
+ log.warn("It is unusual to create a collection ("+collectionName+") without cores.");
+
+ positionVsNodes = new HashMap<>();
+ } else {
+ if (repFactor > nodeList.size()) {
+ log.warn("Specified "
+ + REPLICATION_FACTOR
+ + " of "
+ + repFactor
+ + " on collection "
+ + collectionName
+ + " is higher than or equal to the number of Solr instances currently live or live and part of your " + CREATE_NODE_SET + "("
+ + nodeList.size()
+ + "). It's unusual to run two replica of the same slice on the same Solr-instance.");
+ }
+
+ int maxShardsAllowedToCreate = maxShardsPerNode * nodeList.size();
+ int requestedShardsToCreate = numSlices * repFactor;
+ if (maxShardsAllowedToCreate < requestedShardsToCreate) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
+ + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
+ + ", and the number of nodes currently live or live and part of your "+CREATE_NODE_SET+" is " + nodeList.size()
+ + ". This allows a maximum of " + maxShardsAllowedToCreate
+ + " to be created. Value of " + NUM_SLICES + " is " + numSlices
+ + " and value of " + REPLICATION_FACTOR + " is " + repFactor
+ + ". This requires " + requestedShardsToCreate
+ + " shards to be created (higher than the allowed number)");
+ }
+
+ positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, message, shardNames, repFactor);
+ }
+
+ ZkStateReader zkStateReader = ocmh.zkStateReader;
+ boolean isLegacyCloud = Overseer.isLegacy(zkStateReader);
+
+ ocmh.createConfNode(configName, collectionName, isLegacyCloud);
+
+ Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+
+ // wait for a while until we don't see the collection
+ TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS);
+ boolean created = false;
+ while (! waitUntil.hasTimedOut()) {
+ Thread.sleep(100);
+ created = zkStateReader.getClusterState().hasCollection(collectionName);
+ if(created) break;
+ }
+ if (!created)
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
+
+ if (nodeList.isEmpty()) {
+ log.info("Finished create command for collection: {}", collectionName);
+ return;
+ }
+
+ // For tracking async calls.
+ Map<String, String> requestMap = new HashMap<>();
+
+
+ log.info(formatString("Creating SolrCores for new collection {0}, shardNames {1} , replicationFactor : {2}",
+ collectionName, shardNames, repFactor));
+ Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
+ for (Map.Entry<ReplicaAssigner.Position, String> e : positionVsNodes.entrySet()) {
+ ReplicaAssigner.Position position = e.getKey();
+ String nodeName = e.getValue();
+ String coreName = collectionName + "_" + position.shard + "_replica" + (position.index + 1);
+ log.info(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
+ , coreName, position.shard, collectionName, nodeName));
+
+
+ String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
+ //in the new mode, create the replica in clusterstate prior to creating the core.
+ // Otherwise the core creation fails
+ if (!isLegacyCloud) {
+ ZkNodeProps props = new ZkNodeProps(
+ Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
+ ZkStateReader.COLLECTION_PROP, collectionName,
+ ZkStateReader.SHARD_ID_PROP, position.shard,
+ ZkStateReader.CORE_NAME_PROP, coreName,
+ ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+ ZkStateReader.BASE_URL_PROP, baseUrl);
+ Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+ }
+
+ // Need to create new params for each request
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
+
+ params.set(CoreAdminParams.NAME, coreName);
+ params.set(COLL_CONF, configName);
+ params.set(CoreAdminParams.COLLECTION, collectionName);
+ params.set(CoreAdminParams.SHARD, position.shard);
+ params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+
+ if (async != null) {
+ String coreAdminAsyncId = async + Math.abs(System.nanoTime());
+ params.add(ASYNC, coreAdminAsyncId);
+ requestMap.put(nodeName, coreAdminAsyncId);
+ }
+ ocmh.addPropertyParams(message, params);
+
+ ShardRequest sreq = new ShardRequest();
+ sreq.nodeName = nodeName;
+ params.set("qt", ocmh.adminPath);
+ sreq.purpose = 1;
+ sreq.shards = new String[]{baseUrl};
+ sreq.actualShards = sreq.shards;
+ sreq.params = params;
+
+ if (isLegacyCloud) {
+ shardHandler.submit(sreq, sreq.shards[0], sreq.params);
+ } else {
+ coresToCreate.put(coreName, sreq);
+ }
+ }
+
+ if(!isLegacyCloud) {
+ // wait for all replica entries to be created
+ Map<String, Replica> replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
+ for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
+ ShardRequest sreq = e.getValue();
+ sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
+ shardHandler.submit(sreq, sreq.shards[0], sreq.params);
+ }
+ }
+
+ ocmh.processResponses(results, shardHandler, false, null, async, requestMap, Collections.emptySet());
+ if(results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0) {
+ // Let's cleanup as we hit an exception
+ // We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
+ // element, which may be interpreted by the user as a positive ack
+ ocmh.cleanupCollection(collectionName, new NamedList());
+ log.info("Cleaned up artifacts for failed create collection for [" + collectionName + "]");
+ } else {
+ log.debug("Finished create command on all shards for collection: "
+ + collectionName);
+ }
+ } catch (SolrException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, ex);
+ }
+ }
+ String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException {
+ String configName = message.getStr(COLL_CONF);
+
+ if (configName == null) {
+ // if there is only one conf, use that
+ List<String> configNames = null;
+ try {
+ configNames = ocmh.zkStateReader.getZkClient().getChildren(ZkConfigManager.CONFIGS_ZKNODE, null, true);
+ if (configNames != null && configNames.size() == 1) {
+ configName = configNames.get(0);
+ // no config set named, but there is only 1 - use it
+ log.info("Only one config set found in zk - using it:" + configName);
+ } else if (configNames.contains(coll)) {
+ configName = coll;
+ }
+ } catch (KeeperException.NoNodeException e) {
+
+ }
+ }
+ return configName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
new file mode 100644
index 0000000..3d5aa41
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.util.TimeOut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.Assign.getNodesForNewReplicas;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+public class CreateShardCmd implements Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public CreateShardCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+ @Override
+ public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+ String collectionName = message.getStr(COLLECTION_PROP);
+ String sliceName = message.getStr(SHARD_ID_PROP);
+
+ log.info("Create shard invoked: {}", message);
+ if (collectionName == null || sliceName == null)
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
+ int numSlices = 1;
+
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ DocCollection collection = clusterState.getCollection(collectionName);
+ int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
+ String createNodeSetStr = message.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET);
+ List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, repFactor,
+ createNodeSetStr, ocmh.overseer.getZkController().getCoreContainer());
+
+ ZkStateReader zkStateReader = ocmh.zkStateReader;
+ Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+ // wait for a while until we see the shard
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+ boolean created = false;
+ while (!timeout.hasTimedOut()) {
+ Thread.sleep(100);
+ created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(sliceName) != null;
+ if (created) break;
+ }
+ if (!created)
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr(NAME));
+
+ String configName = message.getStr(COLL_CONF);
+
+ String async = message.getStr(ASYNC);
+ Map<String, String> requestMap = null;
+ if (async != null) {
+ requestMap = new HashMap<>(repFactor, 1.0f);
+ }
+
+ for (int j = 1; j <= repFactor; j++) {
+ String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName;
+ String shardName = collectionName + "_" + sliceName + "_replica" + j;
+ log.info("Creating shard " + shardName + " as part of slice " + sliceName + " of collection " + collectionName
+ + " on " + nodeName);
+
+ // Need to create new params for each request
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
+ params.set(CoreAdminParams.NAME, shardName);
+ params.set(COLL_CONF, configName);
+ params.set(CoreAdminParams.COLLECTION, collectionName);
+ params.set(CoreAdminParams.SHARD, sliceName);
+ params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+ ocmh.addPropertyParams(message, params);
+
+ ocmh.sendShardRequest(nodeName, params, shardHandler, async, requestMap);
+ }
+
+ ocmh.processResponses(results, shardHandler, true, "Failed to create shard", async, requestMap, Collections.emptySet());
+
+ log.info("Finished create command on all shards for collection: " + collectionName);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/DeleteAliasCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteAliasCmd.java
new file mode 100644
index 0000000..7b1993c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteAliasCmd.java
@@ -0,0 +1,95 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+public class DeleteAliasCmd implements OverseerCollectionMessageHandler.Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public DeleteAliasCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+ @Override
+ public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+ String aliasName = message.getStr(NAME);
+
+ Map<String,Map<String,String>> newAliasesMap = new HashMap<>();
+ Map<String,String> newCollectionAliasesMap = new HashMap<>();
+ ZkStateReader zkStateReader = ocmh.zkStateReader;
+ newCollectionAliasesMap.putAll(zkStateReader.getAliases().getCollectionAliasMap());
+ newCollectionAliasesMap.remove(aliasName);
+ newAliasesMap.put("collection", newCollectionAliasesMap);
+ Aliases newAliases = new Aliases(newAliasesMap);
+ byte[] jsonBytes = null;
+ if (newAliases.collectionAliasSize() > 0) { // only sub map right now
+ jsonBytes = Utils.toJSON(newAliases.getAliasMap());
+ }
+ try {
+ zkStateReader.getZkClient().setData(ZkStateReader.ALIASES,
+ jsonBytes, true);
+ checkForAliasAbsence(aliasName);
+ // some fudge for other nodes
+ Thread.sleep(100);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ } catch (InterruptedException e) {
+ log.warn("", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+
+ }
+ private void checkForAliasAbsence(String name) {
+
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+ boolean success = false;
+ Aliases aliases = null;
+ while (! timeout.hasTimedOut()) {
+ aliases = ocmh.zkStateReader.getAliases();
+ String collections = aliases.getCollectionAlias(name);
+ if (collections == null) {
+ success = true;
+ break;
+ }
+ }
+ if (!success) {
+ log.warn("Timeout waiting to be notified of Alias change...");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
new file mode 100644
index 0000000..4c5ae00
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
@@ -0,0 +1,121 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.common.NonExistentCoreException;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public DeleteCollectionCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+ @Override
+ public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+ ZkStateReader zkStateReader = ocmh.zkStateReader;
+ final String collection = message.getStr(NAME);
+ try {
+ if (zkStateReader.getClusterState().getCollectionOrNull(collection) == null) {
+ if (zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
+ // if the collection is not in the clusterstate, but is listed in zk, do nothing, it will just
+ // be removed in the finally - we cannot continue, because the below code will error if the collection
+ // is not in the clusterstate
+ return;
+ }
+ }
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
+ params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
+ params.set(CoreAdminParams.DELETE_DATA_DIR, true);
+
+ String asyncId = message.getStr(ASYNC);
+ Map<String, String> requestMap = null;
+ if (asyncId != null) {
+ requestMap = new HashMap<>();
+ }
+
+ Set<String> okayExceptions = new HashSet<>(1);
+ okayExceptions.add(NonExistentCoreException.class.getName());
+
+ ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions);
+
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
+ Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+
+ // wait for a while until we don't see the collection
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+ boolean removed = false;
+ while (! timeout.hasTimedOut()) {
+ Thread.sleep(100);
+ removed = !zkStateReader.getClusterState().hasCollection(collection);
+ if (removed) {
+ Thread.sleep(500); // just a bit of time so it's more likely other
+ // readers see on return
+ break;
+ }
+ }
+ if (!removed) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Could not fully remove collection: " + collection);
+ }
+
+ } finally {
+
+ try {
+ if (zkStateReader.getZkClient().exists(
+ ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
+ zkStateReader.getZkClient().clean(
+ ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
+ }
+ } catch (InterruptedException e) {
+ SolrException.log(log, "Cleaning up collection in zk was interrupted:"
+ + collection, e);
+ Thread.currentThread().interrupt();
+ } catch (KeeperException e) {
+ SolrException.log(log, "Problem cleaning up collection in zk:"
+ + collection, e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/DeleteNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteNodeCmd.java
index 3e60090..b3c5055 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DeleteNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteNodeCmd.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -64,7 +65,7 @@ public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
log.info("Deleting replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), node);
NamedList deleteResult = new NamedList();
try {
- ocmh.deleteReplica(clusterState, sourceReplica.plus("parallel", "true"), deleteResult, () -> {
+ ((DeleteReplicaCmd)ocmh.commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, sourceReplica.plus("parallel", "true"), deleteResult, () -> {
cleanupLatch.countDown();
if (deleteResult.get("failure") != null) {
synchronized (results) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java
new file mode 100644
index 0000000..6f5fc62
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_IF_DOWN;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+
+public class DeleteReplicaCmd implements Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public DeleteReplicaCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+
+ public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+ deleteReplica(clusterState, message, results,null);
+ }
+
+ @SuppressWarnings("unchecked")
+ void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
+ throws KeeperException, InterruptedException {
+ ocmh.checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP);
+ String collectionName = message.getStr(COLLECTION_PROP);
+ String shard = message.getStr(SHARD_ID_PROP);
+ String replicaName = message.getStr(REPLICA_PROP);
+ boolean parallel = message.getBool("parallel", false);
+
+ DocCollection coll = clusterState.getCollection(collectionName);
+ Slice slice = coll.getSlice(shard);
+ if (slice == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Invalid shard name : " + shard + " in collection : " + collectionName);
+ }
+ Replica replica = slice.getReplica(replicaName);
+ if (replica == null) {
+ ArrayList<String> l = new ArrayList<>();
+ for (Replica r : slice.getReplicas())
+ l.add(r.getName());
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : "
+ + shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ','));
+ }
+
+ // If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true
+ // on the command.
+ if (Boolean.parseBoolean(message.getStr(ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName
+ + " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
+ }
+
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+ String asyncId = message.getStr(ASYNC);
+ AtomicReference<Map<String, String>> requestMap = new AtomicReference<>(null);
+ if (asyncId != null) {
+ requestMap.set(new HashMap<>(1, 1.0f));
+ }
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.add(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
+ params.add(CoreAdminParams.CORE, core);
+
+ params.set(CoreAdminParams.DELETE_INDEX, message.getBool(CoreAdminParams.DELETE_INDEX, true));
+ params.set(CoreAdminParams.DELETE_INSTANCE_DIR, message.getBool(CoreAdminParams.DELETE_INSTANCE_DIR, true));
+ params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true));
+
+ boolean isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName());
+ if (isLive) {
+ ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap.get());
+ }
+
+ Callable<Boolean> callable = () -> {
+ try {
+ if (isLive) {
+ ocmh.processResponses(results, shardHandler, false, null, asyncId, requestMap.get());
+
+ //check if the core unload removed the corenode zk entry
+ if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return Boolean.TRUE;
+ }
+
+ // try and ensure core info is removed from cluster state
+ ocmh.deleteCoreNode(collectionName, replicaName, replica, core);
+ if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return Boolean.TRUE;
+ return Boolean.FALSE;
+ } catch (Exception e) {
+ results.add("failure", "Could not complete delete " + e.getMessage());
+ throw e;
+ } finally {
+ if (onComplete != null) onComplete.run();
+ }
+ };
+
+ if (!parallel) {
+ try {
+ if (!callable.call())
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
+ } catch (InterruptedException | KeeperException e) {
+ throw e;
+ } catch (Exception ex) {
+ throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Error waiting for corenode gone", ex);
+ }
+
+ } else {
+ ocmh.tpe.submit(callable);
+ }
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java
new file mode 100644
index 0000000..f2ae5ca
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java
@@ -0,0 +1,126 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.util.TimeOut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+public class DeleteShardCmd implements Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public DeleteShardCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+ @Override
+ public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+ String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
+ String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
+
+ log.info("Delete shard invoked");
+ Slice slice = clusterState.getSlice(collectionName, sliceId);
+
+ if (slice == null) {
+ if (clusterState.hasCollection(collectionName)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "No shard with name " + sliceId + " exists for collection " + collectionName);
+ } else {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collectionName);
+ }
+ }
+ // For now, only allow for deletions of Inactive slices or custom hashes (range==null).
+ // TODO: Add check for range gaps on Slice deletion
+ final Slice.State state = slice.getState();
+ if (!(slice.getRange() == null || state == Slice.State.INACTIVE || state == Slice.State.RECOVERY
+ || state == Slice.State.CONSTRUCTION)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The slice: " + slice.getName() + " is currently " + state
+ + ". Only non-active (or custom-hashed) slices can be deleted.");
+ }
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+
+ String asyncId = message.getStr(ASYNC);
+ Map<String, String> requestMap = null;
+ if (asyncId != null) {
+ requestMap = new HashMap<>(slice.getReplicas().size(), 1.0f);
+ }
+
+ try {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
+ params.set(CoreAdminParams.DELETE_INDEX, message.getBool(CoreAdminParams.DELETE_INDEX, true));
+ params.set(CoreAdminParams.DELETE_INSTANCE_DIR, message.getBool(CoreAdminParams.DELETE_INSTANCE_DIR, true));
+ params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true));
+
+ ocmh.sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
+
+ ocmh.processResponses(results, shardHandler, true, "Failed to delete shard", asyncId, requestMap, Collections.emptySet());
+
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
+ collectionName, ZkStateReader.SHARD_ID_PROP, sliceId);
+ ZkStateReader zkStateReader = ocmh.zkStateReader;
+ Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+
+ // wait for a while until we don't see the shard
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+ boolean removed = false;
+ while (! timeout.hasTimedOut()) {
+ Thread.sleep(100);
+ DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
+ removed = collection.getSlice(sliceId) == null;
+ if (removed) {
+ Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
+ break;
+ }
+ }
+ if (!removed) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Could not fully remove collection: " + collectionName + " shard: " + sliceId);
+ }
+
+ log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId);
+
+ } catch (SolrException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Error executing delete operation for collection: " + collectionName + " shard: " + sliceId, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
new file mode 100644
index 0000000..7b1ad2c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CompositeIdRouter;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.RoutingRule;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.util.TimeOut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.Utils.makeMap;
+
+public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public MigrateCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+
+ @Override
+ public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+ String sourceCollectionName = message.getStr("collection");
+ String splitKey = message.getStr("split.key");
+ String targetCollectionName = message.getStr("target.collection");
+ int timeout = message.getInt("forward.timeout", 10 * 60) * 1000;
+
+ DocCollection sourceCollection = clusterState.getCollection(sourceCollectionName);
+ if (sourceCollection == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown source collection: " + sourceCollectionName);
+ }
+ DocCollection targetCollection = clusterState.getCollection(targetCollectionName);
+ if (targetCollection == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown target collection: " + sourceCollectionName);
+ }
+ if (!(sourceCollection.getRouter() instanceof CompositeIdRouter)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source collection must use a compositeId router");
+ }
+ if (!(targetCollection.getRouter() instanceof CompositeIdRouter)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target collection must use a compositeId router");
+ }
+ CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
+ CompositeIdRouter targetRouter = (CompositeIdRouter) targetCollection.getRouter();
+ Collection<Slice> sourceSlices = sourceRouter.getSearchSlicesSingle(splitKey, null, sourceCollection);
+ if (sourceSlices.isEmpty()) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "No active slices available in source collection: " + sourceCollection + "for given split.key: " + splitKey);
+ }
+ Collection<Slice> targetSlices = targetRouter.getSearchSlicesSingle(splitKey, null, targetCollection);
+ if (targetSlices.isEmpty()) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "No active slices available in target collection: " + targetCollection + "for given split.key: " + splitKey);
+ }
+
+ String asyncId = null;
+ if (message.containsKey(ASYNC) && message.get(ASYNC) != null)
+ asyncId = message.getStr(ASYNC);
+
+ for (Slice sourceSlice : sourceSlices) {
+ for (Slice targetSlice : targetSlices) {
+ log.info("Migrating source shard: {} to target shard: {} for split.key = " + splitKey, sourceSlice, targetSlice);
+ migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey,
+ timeout, results, asyncId, message);
+ }
+ }
+ }
+
+ private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice,
+ DocCollection targetCollection, Slice targetSlice,
+ String splitKey, int timeout,
+ NamedList results, String asyncId, ZkNodeProps message) throws Exception {
+ String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
+ ZkStateReader zkStateReader = ocmh.zkStateReader;
+ if (clusterState.hasCollection(tempSourceCollectionName)) {
+ log.info("Deleting temporary collection: " + tempSourceCollectionName);
+ Map<String, Object> props = makeMap(
+ Overseer.QUEUE_OPERATION, DELETE.toLower(),
+ NAME, tempSourceCollectionName);
+
+ try {
+ ocmh.commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
+ clusterState = zkStateReader.getClusterState();
+ } catch (Exception e) {
+ log.warn("Unable to clean up existing temporary collection: " + tempSourceCollectionName, e);
+ }
+ }
+
+ CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
+ DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);
+
+ ShardHandlerFactory shardHandlerFactory = ocmh.shardHandlerFactory;
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+
+ log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
+ // intersect source range, keyHashRange and target range
+ // this is the range that has to be split from source and transferred to target
+ DocRouter.Range splitRange = ocmh.intersect(targetSlice.getRange(), ocmh.intersect(sourceSlice.getRange(), keyHashRange));
+ if (splitRange == null) {
+ log.info("No common hashes between source shard: {} and target shard: {}", sourceSlice.getName(), targetSlice.getName());
+ return;
+ }
+ log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName());
+
+ Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
+ // For tracking async calls.
+ Map<String, String> requestMap = new HashMap<>();
+
+ log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
+ + targetLeader.getStr("core") + " to buffer updates");
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTBUFFERUPDATES.toString());
+ params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+
+ ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+
+ ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates", asyncId, requestMap);
+
+ ZkNodeProps m = new ZkNodeProps(
+ Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
+ COLLECTION_PROP, sourceCollection.getName(),
+ SHARD_ID_PROP, sourceSlice.getName(),
+ "routeKey", SolrIndexSplitter.getRouteKey(splitKey) + "!",
+ "range", splitRange.toString(),
+ "targetCollection", targetCollection.getName(),
+ "expireAt", RoutingRule.makeExpiryAt(timeout));
+ log.info("Adding routing rule: " + m);
+ Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+
+ // wait for a while until we see the new rule
+ log.info("Waiting to see routing rule updated in clusterstate");
+ TimeOut waitUntil = new TimeOut(60, TimeUnit.SECONDS);
+ boolean added = false;
+ while (!waitUntil.hasTimedOut()) {
+ Thread.sleep(100);
+ sourceCollection = zkStateReader.getClusterState().getCollection(sourceCollection.getName());
+ sourceSlice = sourceCollection.getSlice(sourceSlice.getName());
+ Map<String, RoutingRule> rules = sourceSlice.getRoutingRules();
+ if (rules != null) {
+ RoutingRule rule = rules.get(SolrIndexSplitter.getRouteKey(splitKey) + "!");
+ if (rule != null && rule.getRouteRanges().contains(splitRange)) {
+ added = true;
+ break;
+ }
+ }
+ }
+ if (!added) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not add routing rule: " + m);
+ }
+
+ log.info("Routing rule added successfully");
+
+ // Create temp core on source shard
+ Replica sourceLeader = zkStateReader.getLeaderRetry(sourceCollection.getName(), sourceSlice.getName(), 10000);
+
+ // create a temporary collection with just one node on the shard leader
+ String configName = zkStateReader.readConfigName(sourceCollection.getName());
+ Map<String, Object> props = makeMap(
+ Overseer.QUEUE_OPERATION, CREATE.toLower(),
+ NAME, tempSourceCollectionName,
+ REPLICATION_FACTOR, 1,
+ NUM_SLICES, 1,
+ COLL_CONF, configName,
+ CREATE_NODE_SET, sourceLeader.getNodeName());
+ if (asyncId != null) {
+ String internalAsyncId = asyncId + Math.abs(System.nanoTime());
+ props.put(ASYNC, internalAsyncId);
+ }
+
+ log.info("Creating temporary collection: " + props);
+ ocmh.commandMap.get(CREATE).call(clusterState, new ZkNodeProps(props), results);
+ // refresh cluster state
+ clusterState = zkStateReader.getClusterState();
+ Slice tempSourceSlice = clusterState.getCollection(tempSourceCollectionName).getSlices().iterator().next();
+ Replica tempSourceLeader = zkStateReader.getLeaderRetry(tempSourceCollectionName, tempSourceSlice.getName(), 120000);
+
+ String tempCollectionReplica1 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica1";
+ String coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
+ sourceLeader.getNodeName(), tempCollectionReplica1);
+ // wait for the replicas to be seen as active on temp source leader
+ log.info("Asking source leader to wait for: " + tempCollectionReplica1 + " to be alive on: " + sourceLeader.getNodeName());
+ CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
+ cmd.setCoreName(tempCollectionReplica1);
+ cmd.setNodeName(sourceLeader.getNodeName());
+ cmd.setCoreNodeName(coreNodeName);
+ cmd.setState(Replica.State.ACTIVE);
+ cmd.setCheckLive(true);
+ cmd.setOnlyIfLeader(true);
+ // we don't want this to happen asynchronously
+ ocmh.sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null);
+
+ ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection leader" +
+ " or timed out waiting for it to come up", asyncId, requestMap);
+
+ log.info("Asking source leader to split index");
+ params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
+ params.set(CoreAdminParams.CORE, sourceLeader.getStr("core"));
+ params.add(CoreAdminParams.TARGET_CORE, tempSourceLeader.getStr("core"));
+ params.set(CoreAdminParams.RANGES, splitRange.toString());
+ params.set("split.key", splitKey);
+
+ String tempNodeName = sourceLeader.getNodeName();
+
+ ocmh.sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap);
+ ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command", asyncId, requestMap);
+
+ log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
+ tempSourceCollectionName, targetLeader.getNodeName());
+ String tempCollectionReplica2 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica2";
+ props = new HashMap<>();
+ props.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
+ props.put(COLLECTION_PROP, tempSourceCollectionName);
+ props.put(SHARD_ID_PROP, tempSourceSlice.getName());
+ props.put("node", targetLeader.getNodeName());
+ props.put(CoreAdminParams.NAME, tempCollectionReplica2);
+ // copy over property params:
+ for (String key : message.keySet()) {
+ if (key.startsWith(COLL_PROP_PREFIX)) {
+ props.put(key, message.getStr(key));
+ }
+ }
+ // add async param
+ if (asyncId != null) {
+ props.put(ASYNC, asyncId);
+ }
+ ((AddReplicaCmd)ocmh.commandMap.get(ADDREPLICA)).addReplica(clusterState, new ZkNodeProps(props), results, null);
+
+ ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
+ "temporary collection in target leader node.", asyncId, requestMap);
+
+ coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
+ targetLeader.getNodeName(), tempCollectionReplica2);
+ // wait for the replicas to be seen as active on temp source leader
+ log.info("Asking temp source leader to wait for: " + tempCollectionReplica2 + " to be alive on: " + targetLeader.getNodeName());
+ cmd = new CoreAdminRequest.WaitForState();
+ cmd.setCoreName(tempSourceLeader.getStr("core"));
+ cmd.setNodeName(targetLeader.getNodeName());
+ cmd.setCoreNodeName(coreNodeName);
+ cmd.setState(Replica.State.ACTIVE);
+ cmd.setCheckLive(true);
+ cmd.setOnlyIfLeader(true);
+ params = new ModifiableSolrParams(cmd.getParams());
+
+ ocmh.sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+
+ ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" +
+ " replica or timed out waiting for them to come up", asyncId, requestMap);
+
+ log.info("Successfully created replica of temp source collection on target leader node");
+
+ log.info("Requesting merge of temp source collection replica to target leader");
+ params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.MERGEINDEXES.toString());
+ params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
+ params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
+
+ ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+ String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to "
+ + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName();
+ ocmh.processResponses(results, shardHandler, true, msg, asyncId, requestMap);
+
+ log.info("Asking target leader to apply buffered updates");
+ params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
+ params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+
+ ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+ ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates",
+ asyncId, requestMap);
+
+ try {
+ log.info("Deleting temporary collection: " + tempSourceCollectionName);
+ props = makeMap(
+ Overseer.QUEUE_OPERATION, DELETE.toLower(),
+ NAME, tempSourceCollectionName);
+ ocmh.commandMap.get(DELETE). call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
+ } catch (Exception e) {
+ log.error("Unable to delete temporary collection: " + tempSourceCollectionName
+ + ". Please remove it manually", e);
+ }
+ }
+}