You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/23 10:30:49 UTC
[19/41] lucene-solr:jira/solr-11702: SOLR-11817: Move Collections API
classes to their own package
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
new file mode 100644
index 0000000..c411fbc
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.lucene.util.Version;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.backup.BackupManager;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.CoreSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.SnapshotStatus;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+/**
+ * Overseer command that creates a backup of a collection.
+ * <p>
+ * Depending on the requested index backup strategy the index files of every active shard are backed up first;
+ * afterwards the collection's config directory, its state and a backup properties file are written through the
+ * {@link BackupManager}.
+ */
+public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public BackupCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  /**
+   * Runs the backup described by {@code message}.
+   *
+   * @param state   cluster state handed in by the caller (not read here; the state is re-read from the ZK state reader)
+   * @param message request properties: collection name, backup name, and optionally the repository name, the backup
+   *                location, the index backup strategy and a commit (snapshot) name
+   * @param results receives the per-shard responses when index files are copied
+   * @throws SolrException with {@code BAD_REQUEST} if the index backup strategy is not recognised or the backup
+   *                       directory already exists
+   */
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName = message.getStr(COLLECTION_PROP);
+    String backupName = message.getStr(NAME);
+    String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
+
+    Instant startTime = Instant.now();
+
+    CoreContainer cc = ocmh.overseer.getCoreContainer();
+    BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
+    BackupManager backupMgr = new BackupManager(repository, ocmh.zkStateReader);
+
+    // Backup location
+    URI location = repository.createURI(message.getStr(CoreAdminParams.BACKUP_LOCATION));
+    URI backupPath = repository.resolve(location, backupName);
+
+    // Reject an unrecognised strategy before anything is created. Without this check an unknown value would
+    // silently produce a backup that contains the ZK data but no index files.
+    String strategy = message.getStr(CollectionAdminParams.INDEX_BACKUP_STRATEGY, CollectionAdminParams.COPY_FILES_STRATEGY);
+    if (!CollectionAdminParams.COPY_FILES_STRATEGY.equals(strategy)
+        && !CollectionAdminParams.NO_INDEX_BACKUP_STRATEGY.equals(strategy)) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown index backup strategy " + strategy);
+    }
+
+    //Validating if the directory already exists.
+    if (repository.exists(backupPath)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The backup directory already exists: " + backupPath);
+    }
+
+    // Create a directory to store backup details.
+    repository.createDirectory(backupPath);
+
+    // NO_INDEX_BACKUP_STRATEGY intentionally skips the index files and only backs up the ZK data below.
+    if (CollectionAdminParams.COPY_FILES_STRATEGY.equals(strategy)) {
+      copyIndexFiles(backupPath, message, results);
+    }
+
+    log.info("Starting to backup ZK data for backupName={}", backupName);
+
+    //Download the configs
+    String configName = ocmh.zkStateReader.readConfigName(collectionName);
+    backupMgr.downloadConfigDir(location, backupName, configName);
+
+    //Save the collection's state. Can be part of the monolithic clusterstate.json or a individual state.json
+    //Since we don't want to distinguish we extract the state and back it up as a separate json
+    DocCollection collectionState = ocmh.zkStateReader.getClusterState().getCollection(collectionName);
+    backupMgr.writeCollectionState(location, backupName, collectionName, collectionState);
+
+    Properties properties = new Properties();
+
+    properties.put(BackupManager.BACKUP_NAME_PROP, backupName);
+    properties.put(BackupManager.COLLECTION_NAME_PROP, collectionName);
+    properties.put(OverseerCollectionMessageHandler.COLL_CONF, configName);
+    properties.put(BackupManager.START_TIME_PROP, startTime.toString());
+    properties.put(BackupManager.INDEX_VERSION_PROP, Version.LATEST.toString());
+    //TODO: Add MD5 of the configset. If during restore the same name configset exists then we can compare checksums to see if they are the same.
+    //if they are not the same then we can throw an error or have an 'overwriteConfig' flag
+    //TODO save numDocs for the shardLeader. We can use it to sanity check the restore.
+
+    backupMgr.writeBackupProperties(location, backupName, properties);
+
+    log.info("Completed backing up ZK data for backupName={}", backupName);
+  }
+
+  /**
+   * Picks the replica of {@code slice} whose snapshot should be backed up.
+   *
+   * @param snapshotMeta metadata of the collection-level snapshot being backed up
+   * @param slice        the shard to pick a replica for
+   * @return the replica that was leader when the snapshot was taken if it still exists and is not DOWN, otherwise
+   *         the first replica that is not DOWN and for which the snapshot exists
+   * @throws SolrException with {@code SERVER_ERROR} if no such replica is found
+   */
+  private Replica selectReplicaWithSnapshot(CollectionSnapshotMetaData snapshotMeta, Slice slice) {
+    // The goal here is to choose the snapshot of the replica which was the leader at the time snapshot was created.
+    // If that is not possible, we choose any other replica for the given shard.
+    Collection<CoreSnapshotMetaData> snapshots = snapshotMeta.getReplicaSnapshotsForShard(slice.getName());
+
+    Optional<CoreSnapshotMetaData> leaderCore = snapshots.stream().filter(x -> x.isLeader()).findFirst();
+    if (leaderCore.isPresent()) {
+      log.info("Replica {} was the leader when snapshot {} was created.", leaderCore.get().getCoreName(), snapshotMeta.getName());
+      Replica r = slice.getReplica(leaderCore.get().getCoreName());
+      if ((r != null) && !r.getState().equals(State.DOWN)) {
+        return r;
+      }
+    }
+
+    // Fall back to any replica that is not DOWN and has the snapshot.
+    Optional<Replica> r = slice.getReplicas().stream()
+        .filter(x -> x.getState() != State.DOWN && snapshotMeta.isSnapshotExists(slice.getName(), x))
+        .findFirst();
+
+    if (!r.isPresent()) {
+      throw new SolrException(ErrorCode.SERVER_ERROR,
+          "Unable to find any live replica with a snapshot named " + snapshotMeta.getName() + " for shard " + slice.getName());
+    }
+
+    return r.get();
+  }
+
+  /**
+   * Sends one BACKUPCORE core-admin request per active shard and then hands the outstanding requests to
+   * {@code ocmh.processResponses}.
+   * <p>
+   * Without a commit name the shard leader is backed up. With a commit name the named collection-level snapshot is
+   * used, and shards that were not part of that snapshot are skipped with a warning.
+   *
+   * @param backupPath directory the per-shard backups are written under
+   * @param request    the original backup request
+   * @param results    receives the responses of the shard requests
+   * @throws SolrException with {@code BAD_REQUEST} if the named snapshot does not exist or did not complete
+   *                       successfully, or with {@code SERVER_ERROR} if a shard has no usable replica
+   */
+  private void copyIndexFiles(URI backupPath, ZkNodeProps request, NamedList results) throws Exception {
+    String collectionName = request.getStr(COLLECTION_PROP);
+    String backupName = request.getStr(NAME);
+    String asyncId = request.getStr(ASYNC);
+    String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY);
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    Map<String, String> requestMap = new HashMap<>();
+
+    String commitName = request.getStr(CoreAdminParams.COMMIT_NAME);
+    Optional<CollectionSnapshotMetaData> snapshotMeta = Optional.empty();
+    if (commitName != null) {
+      SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
+      snapshotMeta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName);
+      if (!snapshotMeta.isPresent()) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Snapshot with name " + commitName
+            + " does not exist for collection " + collectionName);
+      }
+      if (snapshotMeta.get().getStatus() != SnapshotStatus.Successful) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Snapshot with name " + commitName + " for collection " + collectionName
+            + " has not completed successfully. The status is " + snapshotMeta.get().getStatus());
+      }
+    }
+
+    log.info("Starting backup of collection={} with backupName={} at location={}", collectionName, backupName,
+        backupPath);
+
+    Collection<String> shardsToConsider = Collections.emptySet();
+    if (snapshotMeta.isPresent()) {
+      shardsToConsider = snapshotMeta.get().getShards();
+    }
+
+    for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) {
+      Replica replica = null;
+
+      if (snapshotMeta.isPresent()) {
+        if (!shardsToConsider.contains(slice.getName())) {
+          log.warn("Skipping the backup for shard {} since it wasn't part of the collection {} when snapshot {} was created.",
+              slice.getName(), collectionName, snapshotMeta.get().getName());
+          continue;
+        }
+        replica = selectReplicaWithSnapshot(snapshotMeta.get(), slice);
+      } else {
+        // Note - Actually this can return a null value when there is no leader for this shard.
+        replica = slice.getLeader();
+        if (replica == null) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "No 'leader' replica available for shard " + slice.getName() + " of collection " + collectionName);
+        }
+      }
+
+      String coreName = replica.getStr(CORE_NAME_PROP);
+
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.BACKUPCORE.toString());
+      params.set(NAME, slice.getName());
+      params.set(CoreAdminParams.BACKUP_REPOSITORY, repoName);
+      params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.toASCIIString()); // note: index dir will be here then the "snapshot." + slice name
+      params.set(CORE_NAME_PROP, coreName);
+      if (snapshotMeta.isPresent()) {
+        params.set(CoreAdminParams.COMMIT_NAME, snapshotMeta.get().getName());
+      }
+
+      ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
+      log.debug("Sent backup request to core={} for backupName={}", coreName, backupName);
+    }
+    log.debug("Sent backup requests to all shard leaders for backupName={}", backupName);
+
+    ocmh.processResponses(results, shardHandler, true, "Could not backup all replicas", asyncId, requestMap);
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
new file mode 100644
index 0000000..c54d792
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
@@ -0,0 +1,100 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+
+/**
+ * Overseer command that points an alias name at a list of existing collections and/or aliases.
+ */
+public class CreateAliasCmd implements OverseerCollectionMessageHandler.Cmd {
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public CreateAliasCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  /**
+   * Validates the requested target list and stores the alias through the ZK state reader's aliases holder.
+   *
+   * @param state   cluster state handed in by the caller (not read here)
+   * @param message request carrying the alias name and the {@code collections} parameter
+   * @param results not written by this command
+   */
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results)
+      throws Exception {
+    final String alias = message.getStr(NAME);
+    final List<String> targets = parseCollectionsParameter(message.get("collections"));
+    final String targetsCsv = StrUtils.join(targets, ',');
+
+    final ZkStateReader reader = ocmh.zkStateReader;
+    validateAllCollectionsExistAndNoDups(targets, reader);
+
+    reader.aliasesHolder.applyModificationAndExportToZk(
+        current -> current.cloneWithCollectionAlias(alias, targetsCsv));
+
+    // Sleep a bit to allow ZooKeeper state propagation.
+    //
+    // THIS IS A KLUDGE.
+    //
+    // Solr's view of the cluster is eventually consistent. *Eventually* all nodes and CloudSolrClients will be aware
+    // of alias changes, but not immediately. A newly created alias should work right away, because Solr tries to
+    // fetch the latest aliases whenever it cannot resolve a name. Modifications to an existing alias, however, take
+    // some time to become visible.
+    //
+    // We could push this requirement onto the client, but clients would probably just add an obligatory sleep of
+    // their own, which is kicking the can down the road. Perhaps ideally we could wait here until all Solr nodes in
+    // the cluster have the latest aliases?
+    Thread.sleep(100);
+  }
+
+  /**
+   * Rejects a target list that contains duplicates or names that are neither a collection nor an alias.
+   *
+   * @throws SolrException with {@code BAD_REQUEST} on the first violation found
+   */
+  private void validateAllCollectionsExistAndNoDups(List<String> collectionList, ZkStateReader zkStateReader) {
+    final String joined = StrUtils.join(collectionList, ',');
+
+    // A set drops repeated names, so a smaller size means the list had duplicates.
+    final Set<String> distinct = new HashSet<>(collectionList);
+    if (distinct.size() != collectionList.size()) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          String.format(Locale.ROOT, "Can't create collection alias for collections='%s', since it contains duplicates", joined));
+    }
+
+    final ClusterState clusterState = zkStateReader.getClusterState();
+    final Set<String> knownAliases = zkStateReader.getAliases().getCollectionAliasListMap().keySet();
+    for (String target : collectionList) {
+      final boolean isCollection = clusterState.getCollectionOrNull(target) != null;
+      if (isCollection || knownAliases.contains(target)) {
+        continue;
+      }
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          String.format(Locale.ROOT, "Can't create collection alias for collections='%s', '%s' is not an existing collection or alias", joined, target));
+    }
+  }
+
+  /**
+   * Normalises the 'collections' parameter to a list of names. The v2 API supplies a JSON array
+   * (e.g. ["a", "b"]), which is returned as-is; the legacy form is a comma-separated string (e.g. a,b), which is
+   * split and trimmed.
+   *
+   * @throws SolrException with {@code BAD_REQUEST} if the parameter is missing
+   */
+  @SuppressWarnings("unchecked")
+  private List<String> parseCollectionsParameter(Object colls) {
+    if (colls == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing collections param");
+    }
+    if (colls instanceof List) {
+      return (List<String>) colls;
+    }
+    final List<String> rawNames = StrUtils.splitSmart(colls.toString(), ",", true);
+    return rawNames.stream()
+        .map(String::trim)
+        .collect(Collectors.toList());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
new file mode 100644
index 0000000..4d9c971
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
+import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.overseer.ClusterStateMutator;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.admin.ConfigSetsHandlerApi;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.StrUtils.formatString;
+
+public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ // Handler this command belongs to; gives access to the ZK state reader, cloud manager and shard handler factory.
+ private final OverseerCollectionMessageHandler ocmh;
+ // Time source used for the "wait until the collection is visible" timeout.
+ private final TimeSource timeSource;
+ // State manager used when creating the collection's configuration and ZK nodes.
+ private final DistribStateManager stateManager;
+
+ /**
+  * Creates the command and caches the state manager and time source of the handler's cloud manager.
+  *
+  * @param ocmh the owning collection message handler
+  */
+ public CreateCollectionCmd(OverseerCollectionMessageHandler ocmh) {
+   final SolrCloudManager cloudManager = ocmh.cloudManager;
+   this.ocmh = ocmh;
+   this.stateManager = cloudManager.getDistribStateManager();
+   this.timeSource = cloudManager.getTimeSource();
+ }
+
+ /**
+  * Creates a new collection and, unless the node list is empty, one core per computed replica position.
+  * <p>
+  * Steps, in order: resolve and validate the config name, compute the replica positions, create the config and
+  * collection ZK nodes, enqueue the create message on the state update queue, wait up to 30 seconds for the
+  * collection to become visible, then send one core CREATE request per replica position and process the responses.
+  * If any core creation fails the collection is cleaned up again.
+  *
+  * @param clusterState cluster state used for the "already exists" check and for replica placement
+  * @param message      the create request
+  * @param results      receives the core responses and, when the default configset is used, a warning entry
+  * @throws SolrException with {@code BAD_REQUEST} if the collection already exists or no config set can be
+  *                       determined, or with {@code SERVER_ERROR} if the collection does not become visible in
+  *                       time; any other exception raised inside the try block is wrapped in a
+  *                       {@code SERVER_ERROR} SolrException
+  */
+ @Override
+ public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+   final String collectionName = message.getStr(NAME);
+   final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
+   log.info("Create collection {}", collectionName);
+   if (clusterState.hasCollection(collectionName)) {
+     throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
+   }
+
+   String configName = getConfigName(collectionName, message);
+   if (configName == null) {
+     throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No config set found to associate with the collection.");
+   }
+
+   ocmh.validateConfigOrThrowSolrException(configName);
+   // Filled in by buildReplicaPositions; released in the finally block below.
+   AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
+
+   try {
+
+     final String async = message.getStr(ASYNC);
+
+     List<String> nodeList = new ArrayList<>();
+     List<String> shardNames = new ArrayList<>();
+     List<ReplicaPosition> replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, message,
+         nodeList, shardNames, sessionWrapper);
+     ZkStateReader zkStateReader = ocmh.zkStateReader;
+     boolean isLegacyCloud = Overseer.isLegacy(zkStateReader);
+
+     ocmh.createConfNode(stateManager, configName, collectionName, isLegacyCloud);
+
+     // Collect the request properties that carry the collection-parameter prefix, with the prefix stripped.
+     Map<String,String> collectionParams = new HashMap<>();
+     Map<String,Object> collectionProps = message.getProperties();
+     for (Map.Entry<String,Object> prop : collectionProps.entrySet()) {
+       String propName = prop.getKey();
+       if (propName.startsWith(ZkController.COLLECTION_PARAM_PREFIX)) {
+         collectionParams.put(propName.substring(ZkController.COLLECTION_PARAM_PREFIX.length()), (String) prop.getValue());
+       }
+     }
+
+     createCollectionZkNode(stateManager, collectionName, collectionParams);
+
+     Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+
+     // wait (up to 30 seconds) until the new collection shows up in the cluster state
+     TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+     boolean created = false;
+     while (! waitUntil.hasTimedOut()) {
+       waitUntil.sleep(100);
+       created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
+       if(created) break;
+     }
+     if (!created)
+       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
+
+     if (nodeList.isEmpty()) {
+       log.debug("Finished create command for collection: {}", collectionName);
+       return;
+     }
+
+     // For tracking async calls.
+     Map<String, String> requestMap = new HashMap<>();
+
+
+     // formatString builds the message eagerly, so only pay for it when debug logging is on.
+     if (log.isDebugEnabled()) {
+       log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , message : {2}",
+           collectionName, shardNames, message));
+     }
+     // Core requests that must wait until their replica entry exists (non-legacy mode only).
+     Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
+     ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+     for (ReplicaPosition replicaPosition : replicaPositions) {
+       String nodeName = replicaPosition.node;
+       String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(),
+           ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName),
+           replicaPosition.shard, replicaPosition.type, true);
+       if (log.isDebugEnabled()) {
+         log.debug(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
+             , coreName, replicaPosition.shard, collectionName, nodeName));
+       }
+
+
+       String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
+       //in the new mode, create the replica in clusterstate prior to creating the core.
+       // Otherwise the core creation fails
+       if (!isLegacyCloud) {
+         ZkNodeProps props = new ZkNodeProps(
+             Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
+             ZkStateReader.COLLECTION_PROP, collectionName,
+             ZkStateReader.SHARD_ID_PROP, replicaPosition.shard,
+             ZkStateReader.CORE_NAME_PROP, coreName,
+             ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+             ZkStateReader.BASE_URL_PROP, baseUrl,
+             ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
+             CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
+         Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+       }
+
+       // Need to create new params for each request
+       ModifiableSolrParams params = new ModifiableSolrParams();
+       params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
+
+       params.set(CoreAdminParams.NAME, coreName);
+       params.set(COLL_CONF, configName);
+       params.set(CoreAdminParams.COLLECTION, collectionName);
+       params.set(CoreAdminParams.SHARD, replicaPosition.shard);
+       params.set(ZkStateReader.NUM_SHARDS_PROP, shardNames.size());
+       params.set(CoreAdminParams.NEW_COLLECTION, "true");
+       params.set(CoreAdminParams.REPLICA_TYPE, replicaPosition.type.name());
+
+       if (async != null) {
+         String coreAdminAsyncId = async + Math.abs(System.nanoTime());
+         params.add(ASYNC, coreAdminAsyncId);
+         requestMap.put(nodeName, coreAdminAsyncId);
+       }
+       ocmh.addPropertyParams(message, params);
+
+       ShardRequest sreq = new ShardRequest();
+       sreq.nodeName = nodeName;
+       params.set("qt", ocmh.adminPath);
+       sreq.purpose = 1;
+       sreq.shards = new String[]{baseUrl};
+       sreq.actualShards = sreq.shards;
+       sreq.params = params;
+
+       if (isLegacyCloud) {
+         shardHandler.submit(sreq, sreq.shards[0], sreq.params);
+       } else {
+         coresToCreate.put(coreName, sreq);
+       }
+     }
+
+     if(!isLegacyCloud) {
+       // wait for all replica entries to be created
+       Map<String, Replica> replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
+       for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
+         ShardRequest sreq = e.getValue();
+         sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
+         shardHandler.submit(sreq, sreq.shards[0], sreq.params);
+       }
+     }
+
+     ocmh.processResponses(results, shardHandler, false, null, async, requestMap, Collections.emptySet());
+     if(results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0) {
+       // Let's cleanup as we hit an exception
+       // We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
+       // element, which may be interpreted by the user as a positive ack
+       ocmh.cleanupCollection(collectionName, new NamedList());
+       log.info("Cleaned up artifacts for failed create collection for [{}]", collectionName);
+     } else {
+       log.debug("Finished create command on all shards for collection: {}", collectionName);
+
+       // Emit a warning about production use of data driven functionality
+       boolean defaultConfigSetUsed = message.getStr(COLL_CONF) == null ||
+           message.getStr(COLL_CONF).equals(ConfigSetsHandlerApi.DEFAULT_CONFIGSET_NAME);
+       if (defaultConfigSetUsed) {
+         results.add("warning", "Using _default configset. Data driven schema functionality"
+             + " is enabled by default, which is NOT RECOMMENDED for production use. To turn it off:"
+             + " curl http://{host:port}/solr/" + collectionName + "/config -d '{\"set-user-property\": {\"update.autoCreateFields\":\"false\"}}'");
+       }
+     }
+   } catch (SolrException ex) {
+     throw ex;
+   } catch (Exception ex) {
+     throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, ex);
+   } finally {
+     if (sessionWrapper.get() != null) sessionWrapper.get().release();
+   }
+ }
+
+ /**
+  * Validates the sizing parameters of a create-collection request and computes where its replicas should go.
+  *
+  * @param cloudManager   source of the autoscaling config; also passed on to the node assignment
+  * @param clusterState   cluster state whose live nodes are the candidates for placement
+  * @param message        the create request (replica counts, router, shard count or names, maxShardsPerNode, policy)
+  * @param nodeList       output: receives the candidate nodes; left empty when there are none
+  * @param shardNames     output: receives the shard names of the new collection
+  * @param sessionWrapper output: set to the last policy session wrapper after nodes have been identified
+  * @return the replica positions; an empty list when {@code nodeList} ends up empty
+  * @throws SolrException with {@code BAD_REQUEST} if a sizing parameter is missing or invalid, or if the request
+  *                       needs more cores than {@code maxShardsPerNode} allows on the candidate nodes
+  */
+ public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
+                                                           ZkNodeProps message,
+                                                           List<String> nodeList, List<String> shardNames,
+                                                           AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
+   final String collectionName = message.getStr(NAME);
+   // look at the replication factor and see if it matches reality
+   // if it does not, find best nodes to create more cores
+   int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0);
+   // NRT count falls back to replicationFactor, and that in turn to 1 (or 0 when TLOG replicas were requested).
+   int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, numTlogReplicas>0?0:1));
+   int numPullReplicas = message.getInt(PULL_REPLICAS, 0);
+   AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+   String policy = message.getStr(Policy.POLICY);
+   boolean usePolicyFramework = !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || policy != null;
+
+   Integer numSlices = message.getInt(OverseerCollectionMessageHandler.NUM_SLICES, null);
+   String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
+   if(ImplicitDocRouter.NAME.equals(router)){
+     // The implicit router takes its shards from the 'shards' parameter.
+     ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
+     numSlices = shardNames.size();
+   } else {
+     if (numSlices == null ) {
+       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " is a required param (when using CompositeId router).");
+     }
+     if (numSlices <= 0) {
+       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " must be > 0");
+     }
+     ClusterStateMutator.getShardNames(numSlices, shardNames);
+   }
+
+   int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
+   if (usePolicyFramework && message.getStr(MAX_SHARDS_PER_NODE) != null && maxShardsPerNode > 0) {
+     throw new SolrException(ErrorCode.BAD_REQUEST, "'maxShardsPerNode>0' is not supported when autoScaling policies are used");
+   }
+   // Integer.MAX_VALUE is the "no limit" marker from here on.
+   if (maxShardsPerNode == -1 || usePolicyFramework) maxShardsPerNode = Integer.MAX_VALUE;
+   if (numNrtReplicas + numTlogReplicas <= 0) {
+     throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
+   }
+
+   // we need to look at every node and see how many cores it serves
+   // add our new cores to existing nodes serving the least number of cores
+   // but (for now) require that each core goes on a distinct node.
+
+   List<ReplicaPosition> replicaPositions;
+   nodeList.addAll(Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM));
+   if (nodeList.isEmpty()) {
+     log.warn("It is unusual to create a collection ("+collectionName+") without cores.");
+
+     replicaPositions = new ArrayList<>();
+   } else {
+     int totalNumReplicas = numNrtReplicas + numTlogReplicas + numPullReplicas;
+     if (totalNumReplicas > nodeList.size()) {
+       log.warn("Specified number of replicas of "
+           + totalNumReplicas
+           + " on collection "
+           + collectionName
+           + " is higher than the number of Solr instances currently live or live and part of your " + OverseerCollectionMessageHandler.CREATE_NODE_SET + "("
+           + nodeList.size()
+           + "). It's unusual to run two replica of the same slice on the same Solr-instance.");
+     }
+
+     // Both products are computed in long: as int they can overflow for large (but valid) parameter values and
+     // make the capacity check below wrongly pass or wrongly fail.
+     long maxShardsAllowedToCreate = maxShardsPerNode == Integer.MAX_VALUE ?
+         Long.MAX_VALUE :
+         (long) maxShardsPerNode * nodeList.size();
+     long requestedShardsToCreate = numSlices.longValue() * totalNumReplicas;
+     if (maxShardsAllowedToCreate < requestedShardsToCreate) {
+       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
+           + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
+           + ", and the number of nodes currently live or live and part of your "+OverseerCollectionMessageHandler.CREATE_NODE_SET+" is " + nodeList.size()
+           + ". This allows a maximum of " + maxShardsAllowedToCreate
+           + " to be created. Value of " + OverseerCollectionMessageHandler.NUM_SLICES + " is " + numSlices
+           + ", value of " + NRT_REPLICAS + " is " + numNrtReplicas
+           + ", value of " + TLOG_REPLICAS + " is " + numTlogReplicas
+           + " and value of " + PULL_REPLICAS + " is " + numPullReplicas
+           + ". This requires " + requestedShardsToCreate
+           + " shards to be created (higher than the allowed number)");
+     }
+     replicaPositions = Assign.identifyNodes(cloudManager
+         , clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
+     sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
+   }
+   return replicaPositions;
+ }
+
+ /**
+  * Resolves the config set name to use for a collection that is being created.
+  * <p>
+  * An explicit {@code COLL_CONF} value in {@code message} wins. Otherwise the children of the config
+  * root in ZooKeeper are inspected:
+  * <ul>
+  *   <li>if the _default config set is present, the collection's own name is returned; unless the
+  *       collection is the system collection, _default is first copied to a config set of that name;</li>
+  *   <li>else, if exactly one config set exists, that one is used.</li>
+  * </ul>
+  *
+  * @param coll    name of the collection being created
+  * @param message the create request
+  * @return the resolved config set name, or {@code null} when none could be determined (an empty
+  *         name is also reported as {@code null})
+  */
+ String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException {
+   String configName = message.getStr(COLL_CONF);
+
+   if (configName == null) {
+     try {
+       List<String> configNames = ocmh.zkStateReader.getZkClient().getChildren(ZkConfigManager.CONFIGS_ZKNODE, null, true);
+       // Guard once, before the first dereference (the original checked for null only after calling contains()).
+       if (configNames != null) {
+         if (configNames.contains(ConfigSetsHandlerApi.DEFAULT_CONFIGSET_NAME)) {
+           if (!CollectionAdminParams.SYSTEM_COLL.equals(coll)) {
+             copyDefaultConfigSetTo(configNames, coll);
+           }
+           return coll;
+         }
+         if (configNames.size() == 1) {
+           // no config set named, but there is only 1 - use it
+           configName = configNames.get(0);
+           log.info("Only one config set found in zk - using it:{}", configName);
+         }
+       }
+     } catch (KeeperException.NoNodeException ignored) {
+       // Deliberately ignored: the config root does not exist, so there is nothing to pick from
+       // and configName stays null.
+     }
+   }
+   return "".equals(configName) ? null : configName;
+ }
+
+ /**
+ * Copies the _default configset to the specified configset name (overwrites if pre-existing)
+ */
+ private void copyDefaultConfigSetTo(List<String> configNames, String targetConfig) {
+ ZkConfigManager configManager = new ZkConfigManager(ocmh.zkStateReader.getZkClient());
+
+ // if a configset named coll exists, delete the configset so that _default can be copied over
+ if (configNames.contains(targetConfig)) {
+ log.info("There exists a configset by the same name as the collection we're trying to create: " + targetConfig +
+ ", deleting it so that we can copy the _default configs over and create the collection.");
+ try {
+ configManager.deleteConfigDir(targetConfig);
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.INVALID_STATE, "Error while deleting configset: " + targetConfig, e);
+ }
+ } else {
+ log.info("Only _default config set found, using it.");
+ }
+ // Copy _default into targetConfig
+ try {
+ configManager.copyConfigDir(ConfigSetsHandlerApi.DEFAULT_CONFIGSET_NAME, targetConfig, new HashSet<>());
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.INVALID_STATE, "Error while copying _default to " + targetConfig, e);
+ }
+ }
+
+ /**
+  * Creates the ZooKeeper node of {@code collection} (below {@code ZkStateReader.COLLECTIONS_ZKNODE})
+  * when {@code stateManager} reports no data for it; an existing node is left untouched.
+  * <p>
+  * The properties written to the node are chosen as follows:
+  * <ul>
+  *   <li>non-empty {@code params}: all params are copied, and {@code getConfName} is consulted when
+  *       they carry no config name;</li>
+  *   <li>else, if the {@code bootstrap_confdir} system property is set: every system property with the
+  *       collection-param prefix is copied (prefix stripped), and the config name falls back to the
+  *       prefixed config-name system property or, failing that, the collection name;</li>
+  *   <li>else, if the {@code bootstrap_conf} system property is true: the collection name is the config name;</li>
+  *   <li>else: {@code getConfName} is consulted.</li>
+  * </ul>
+  * The numShards property is never stored on the node. A node that appears concurrently is not an error.
+  *
+  * @param stateManager access to the distributed state store
+  * @param collection   name of the collection
+  * @param params       properties requested for the collection; may be empty
+  * @throws SolrException (SERVER_ERROR) on ZooKeeper or I/O failures, or when the thread is interrupted
+  */
+ public static void createCollectionZkNode(DistribStateManager stateManager, String collection, Map<String,String> params) {
+   log.debug("Check for collection zkNode:{}", collection);
+   String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
+
+   try {
+     if (!stateManager.hasData(collectionPath)) {
+       log.debug("Creating collection in ZooKeeper:{}", collection);
+
+       try {
+         Map<String,Object> collectionProps = new HashMap<>();
+
+         // TODO: if collection.configName isn't set, and there isn't already a conf in zk, just use that?
+         String defaultConfigName = System.getProperty(ZkController.COLLECTION_PARAM_PREFIX + ZkController.CONFIGNAME_PROP, collection);
+
+         if (params.size() > 0) {
+           collectionProps.putAll(params);
+           // if the config name wasn't passed in, use the default
+           if (!collectionProps.containsKey(ZkController.CONFIGNAME_PROP)) {
+             // users can create the collection node and conf link ahead of time, or this may return another option
+             getConfName(stateManager, collection, collectionPath, collectionProps);
+           }
+
+         } else if (System.getProperty("bootstrap_confdir") != null) {
+           // if we are bootstrapping a collection, default the config for
+           // a new collection to the collection we are bootstrapping
+           log.info("Setting config for collection:{} to {}", collection, defaultConfigName);
+
+           Properties sysProps = System.getProperties();
+           for (String sprop : sysProps.stringPropertyNames()) {
+             if (sprop.startsWith(ZkController.COLLECTION_PARAM_PREFIX)) {
+               collectionProps.put(sprop.substring(ZkController.COLLECTION_PARAM_PREFIX.length()), sysProps.getProperty(sprop));
+             }
+           }
+
+           // if the config name wasn't passed in, use the default
+           if (!collectionProps.containsKey(ZkController.CONFIGNAME_PROP)) {
+             collectionProps.put(ZkController.CONFIGNAME_PROP, defaultConfigName);
+           }
+
+         } else if (Boolean.getBoolean("bootstrap_conf")) {
+           // the conf name should be the collection name of this core
+           collectionProps.put(ZkController.CONFIGNAME_PROP, collection);
+         } else {
+           getConfName(stateManager, collection, collectionPath, collectionProps);
+         }
+
+         collectionProps.remove(ZkStateReader.NUM_SHARDS_PROP); // we don't put numShards in the collections properties
+
+         ZkNodeProps zkProps = new ZkNodeProps(collectionProps);
+         stateManager.makePath(collectionPath, Utils.toJSON(zkProps), CreateMode.PERSISTENT, false);
+
+       } catch (KeeperException e) {
+         // it's okay if the node already exists; anything else is handled by the outer catch
+         if (e.code() != KeeperException.Code.NODEEXISTS) {
+           throw e;
+         }
+       } catch (AlreadyExistsException ignored) {
+         // it's okay if the node already exists
+       }
+     } else {
+       log.debug("Collection zkNode exists");
+     }
+
+   } catch (KeeperException e) {
+     // it's okay if another beats us creating the node
+     if (e.code() == KeeperException.Code.NODEEXISTS) {
+       return;
+     }
+     throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
+   } catch (IOException e) {
+     throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag for callers; Thread.interrupted() would only clear it again.
+     Thread.currentThread().interrupt();
+     throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
+   }
+
+ }
+
+ /**
+  * Determines the config name for {@code collection} and stores it in {@code collectionProps},
+  * retrying up to five times with a 3 second pause between attempts.
+  * <p>
+  * Nothing is done when {@code collectionProps} already holds a "configName". Each attempt stops
+  * without touching {@code collectionProps} when the collection node already carries a config name;
+  * otherwise the first match among the known config sets is stored, in this order: a set named like
+  * the collection, the _default set, the only set available.
+  *
+  * @param stateManager    access to the distributed state store
+  * @param collection      name of the collection
+  * @param collectionPath  path of the collection node
+  * @param collectionProps properties of the collection; receives the config name when one is found
+  * @throws ZooKeeperException (SERVER_ERROR) when no config name was found after all attempts
+  */
+ private static void getConfName(DistribStateManager stateManager, String collection, String collectionPath, Map<String,Object> collectionProps) throws IOException,
+     KeeperException, InterruptedException {
+   // check for configName
+   log.debug("Looking for collection configName");
+   if (collectionProps.containsKey("configName")) {
+     log.info("configName was passed as a param {}", collectionProps.get("configName"));
+     return;
+   }
+
+   List<String> configNames = null;
+   int retry = 1;
+   int retryLimit = 6;
+   for (; retry < retryLimit; retry++) {
+     if (stateManager.hasData(collectionPath)) {
+       VersionedData data = stateManager.getData(collectionPath);
+       ZkNodeProps cProps = ZkNodeProps.load(data.getData());
+       if (cProps.containsKey(ZkController.CONFIGNAME_PROP)) {
+         break;
+       }
+     }
+
+     try {
+       configNames = stateManager.listData(ZkConfigManager.CONFIGS_ZKNODE);
+     } catch (NoSuchElementException | NoNodeException e) {
+       // just keep trying (a listing obtained by an earlier attempt stays in effect)
+     }
+
+     if (configNames != null) {
+       // check if there's a config set with the same name as the collection
+       if (configNames.contains(collection)) {
+         log.info(
+             "Could not find explicit collection configName, but found config name matching collection name - using that set.");
+         collectionProps.put(ZkController.CONFIGNAME_PROP, collection);
+         break;
+       }
+       // if _default exists, use that
+       if (configNames.contains(ConfigSetsHandlerApi.DEFAULT_CONFIGSET_NAME)) {
+         log.info(
+             "Could not find explicit collection configName, but found _default config set - using that set.");
+         collectionProps.put(ZkController.CONFIGNAME_PROP, ConfigSetsHandlerApi.DEFAULT_CONFIGSET_NAME);
+         break;
+       }
+       // if there is only one conf, use that
+       if (configNames.size() == 1) {
+         // no config set named, but there is only 1 - use it
+         log.info("Only one config set found in zk - using it:{}", configNames.get(0));
+         collectionProps.put(ZkController.CONFIGNAME_PROP, configNames.get(0));
+         break;
+       }
+     }
+
+     // Pause only when another attempt follows; sleeping after the last one merely delays the failure.
+     if (retry + 1 < retryLimit) {
+       log.info("Could not find collection configName - pausing for 3 seconds and trying again - try: {}", retry);
+       Thread.sleep(3000);
+     }
+   }
+   // the counter reaches the limit only when every attempt fell through without a break
+   if (retry == retryLimit) {
+     log.error("Could not find configName for collection {}", collection);
+     throw new ZooKeeperException(
+         SolrException.ErrorCode.SERVER_ERROR,
+         "Could not find configName for collection " + collection + " found:" + configNames);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
new file mode 100644
index 0000000..311d9ef
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
+import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.cloud.CloudUtil;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+/**
+ * Overseer command that adds a shard (slice) to an existing collection and creates the replicas of
+ * that shard.
+ */
+public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public CreateShardCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  /**
+   * Offers the create-shard message to the state update queue, waits for the shard to show up and
+   * then requests one replica per computed position. Per-replica outcomes are merged into the
+   * "success" / "failure" entries of {@code results}. The method waits up to five minutes for all
+   * replica requests to report completion.
+   *
+   * @param clusterState cluster state the command starts from
+   * @param message      request; must carry the collection and the shard name
+   * @param results      receives the merged replica results
+   * @throws SolrException (BAD_REQUEST) when the collection or shard name is missing
+   */
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName = message.getStr(COLLECTION_PROP);
+    String sliceName = message.getStr(SHARD_ID_PROP);
+    boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
+
+    log.info("Create shard invoked: {}", message);
+    if (collectionName == null || sliceName == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
+    }
+
+    DocCollection collection = clusterState.getCollection(collectionName);
+
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
+    SolrCloseableLatch countDownLatch;
+    try {
+      List<ReplicaPosition> positions = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, sessionWrapper);
+      Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+      // wait for a while until we see the shard
+      ocmh.waitForNewShard(collectionName, sliceName);
+
+      String async = message.getStr(ASYNC);
+      countDownLatch = new SolrCloseableLatch(positions.size(), ocmh);
+      for (ReplicaPosition position : positions) {
+        String nodeName = position.node;
+        String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(), collection, sliceName, position.type);
+        log.info("Creating replica {} as part of slice {} of collection {} on {}", coreName, sliceName, collectionName, nodeName);
+
+        // Need to create new params for each request
+        ZkNodeProps addReplicasProps = new ZkNodeProps(
+            COLLECTION_PROP, collectionName,
+            SHARD_ID_PROP, sliceName,
+            ZkStateReader.REPLICA_TYPE, position.type.name(),
+            CoreAdminParams.NODE, nodeName,
+            CoreAdminParams.NAME, coreName,
+            CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
+        Map<String, Object> propertyParams = new HashMap<>();
+        ocmh.addPropertyParams(message, propertyParams);
+        addReplicasProps = addReplicasProps.plus(propertyParams);
+        if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
+        final NamedList addResult = new NamedList();
+        ocmh.addReplica(zkStateReader.getClusterState(), addReplicasProps, addResult, () -> {
+          // Merge first and count down last: releasing the latch before the merge would let the
+          // waiting caller continue while 'results' is still being written.
+          // NOTE(review): if callbacks of different replicas can run concurrently, 'results'
+          // also needs synchronization - confirm against addReplica.
+          try {
+            Object addResultFailure = addResult.get("failure");
+            if (addResultFailure != null) {
+              mergeResult(results, "failure", (NamedList) addResultFailure);
+            } else {
+              mergeResult(results, "success", (NamedList) addResult.get("success"));
+            }
+          } finally {
+            countDownLatch.countDown();
+          }
+        });
+      }
+    } finally {
+      PolicyHelper.SessionWrapper session = sessionWrapper.get();
+      if (session != null) session.release();
+    }
+
+    log.debug("Waiting for create shard action to complete");
+    countDownLatch.await(5, TimeUnit.MINUTES);
+    log.debug("Finished waiting for create shard action to complete");
+
+    log.info("Finished create command on all shards for collection: {}", collectionName);
+
+  }
+
+  /**
+   * Appends {@code part} to the map stored under {@code key} in {@code results}, creating that map
+   * on first use. A {@code null} part adds nothing (previously it caused a NullPointerException).
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private static void mergeResult(NamedList results, String key, NamedList part) {
+    SimpleOrderedMap bucket = (SimpleOrderedMap) results.get(key);
+    if (bucket == null) {
+      bucket = new SimpleOrderedMap();
+      results.add(key, bucket);
+    }
+    if (part != null) {
+      bucket.addAll(part);
+    }
+  }
+
+  /**
+   * Computes where the replicas of the new shard are placed.
+   * <p>
+   * Replica counts come from {@code message}, falling back to the collection's settings (NRT:
+   * nrtReplicas, then replicationFactor, then 1; PULL and TLOG: 0). With the policy framework the
+   * positions come from {@code Assign.identifyNodes} and the session wrapper in use is handed back
+   * through {@code sessionWrapper}; otherwise replicas are assigned round-robin over the node list
+   * returned by {@code Assign.getNodesForNewReplicas}, NRT first, then TLOG, then PULL.
+   *
+   * @param cloudManager   cloud manager used for placement decisions
+   * @param clusterState   current cluster state
+   * @param collectionName collection the shard is added to
+   * @param message        request carrying the shard name and optional replica counts / node set
+   * @param sessionWrapper receives the policy session wrapper when the policy framework is used
+   * @return one position per replica to create
+   * @throws SolrException (BAD_REQUEST) when NRT + TLOG replicas is not greater than 0
+   */
+  public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
+      String collectionName, ZkNodeProps message, AtomicReference< PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
+    String sliceName = message.getStr(SHARD_ID_PROP);
+    DocCollection collection = clusterState.getCollection(collectionName);
+
+    int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
+    int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0));
+    int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0));
+    int totalReplicas = numNrtReplicas + numPullReplicas + numTlogReplicas;
+
+    if (numNrtReplicas + numTlogReplicas <= 0) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
+    }
+
+    Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
+
+    boolean usePolicyFramework = CloudUtil.usePolicyFramework(collection, cloudManager);
+    List<ReplicaPosition> positions;
+    if (usePolicyFramework) {
+      if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName());
+      positions = Assign.identifyNodes(cloudManager,
+          clusterState,
+          Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM),
+          collection.getName(),
+          message,
+          Collections.singletonList(sliceName),
+          numNrtReplicas,
+          numTlogReplicas,
+          numPullReplicas);
+      sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
+    } else {
+      List<Assign.ReplicaCount> sortedNodeList = Assign.getNodesForNewReplicas(clusterState, collection.getName(), sliceName, totalReplicas,
+          createNodeSetStr, cloudManager);
+      int i = 0; // running index over all replica types, wrapped around the node list
+      positions = new ArrayList<>();
+      for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
+          Replica.Type.TLOG, numTlogReplicas,
+          Replica.Type.PULL, numPullReplicas
+      ).entrySet()) {
+        for (int j = 0; j < e.getValue(); j++) {
+          positions.add(new ReplicaPosition(sliceName, j + 1, e.getKey(), sortedNodeList.get(i % sortedNodeList.size()).nodeName));
+          i++;
+        }
+      }
+    }
+    return positions;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
new file mode 100644
index 0000000..32715d6
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.CoreSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.SnapshotStatus;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the functionality of creating a collection level snapshot.
+ */
+public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public CreateSnapshotCmd (OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  /**
+   * Creates a named snapshot of a collection: records the snapshot in ZooKeeper, sends a
+   * createsnapshot request to every active replica, and finally stores the per-core results
+   * together with an overall status.
+   * <p>
+   * The snapshot is successful only when every shard that reported a problem still has at least
+   * one replica holding the snapshot. Replicas that were skipped because they are not active
+   * count as replicas without a snapshot, exactly like replicas whose request failed.
+   *
+   * @param state   cluster state handed in by the overseer (the live state is re-read from the reader)
+   * @param message request carrying the collection name, the commit (snapshot) name and optionally an async id
+   * @param results unused by this command
+   * @throws SolrException BAD_REQUEST when the snapshot already exists; SERVER_ERROR when at least
+   *                       one shard ends up without any replica holding the snapshot
+   */
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName = message.getStr(COLLECTION_PROP);
+    String commitName = message.getStr(CoreAdminParams.COMMIT_NAME);
+    String asyncId = message.getStr(ASYNC);
+    SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
+    Date creationDate = new Date();
+
+    if(SolrSnapshotManager.snapshotExists(zkClient, collectionName, commitName)) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Snapshot with name " + commitName
+          + " already exists for collection " + collectionName);
+    }
+
+    log.info("Creating a snapshot for collection={} with commitName={}", collectionName, commitName);
+
+    // Create a node in ZK to store the collection level snapshot meta-data.
+    SolrSnapshotManager.createCollectionLevelSnapshot(zkClient, collectionName, new CollectionSnapshotMetaData(commitName));
+    log.info("Created a ZK path to store snapshot information for collection={} with commitName={}", collectionName, commitName);
+
+    Map<String, String> requestMap = new HashMap<>();
+    NamedList shardRequestResults = new NamedList();
+    // Cores a request was sent to and that have not (yet) reported success, with their shard.
+    Map<String, Slice> shardByCoreName = new HashMap<>();
+    // Number of replicas per shard that do not hold the snapshot (skipped or failed).
+    Map<String, Integer> failuresByShardId = new HashMap<>();
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+
+    for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
+      for (Replica replica : slice.getReplicas()) {
+        if (replica.getState() != State.ACTIVE) {
+          log.info("Replica {} is not active. Hence not sending the createsnapshot request", replica.getCoreName());
+          // No point sending a request, but the replica still ends up without a snapshot. Without
+          // counting it, a shard whose replicas are all inactive would never be reported as failed.
+          failuresByShardId.merge(slice.getName(), 1, Integer::sum);
+          continue;
+        }
+
+        String coreName = replica.getStr(CORE_NAME_PROP);
+
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATESNAPSHOT.toString());
+        params.set(NAME, slice.getName());
+        params.set(CORE_NAME_PROP, coreName);
+        params.set(CoreAdminParams.COMMIT_NAME, commitName);
+
+        ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
+        log.debug("Sent createsnapshot request to core={} with commitName={}", coreName, commitName);
+
+        shardByCoreName.put(coreName, slice);
+      }
+    }
+
+    // At this point we want to make sure that at-least one replica for every shard
+    // is able to create the snapshot. If that is not the case, then we fail the request.
+    // This is to take care of the situation where e.g. entire shard is unavailable.
+    Set<String> failedShards = new HashSet<>();
+
+    ocmh.processResponses(shardRequestResults, shardHandler, false, null, asyncId, requestMap);
+    NamedList success = (NamedList) shardRequestResults.get("success");
+    List<CoreSnapshotMetaData> replicas = new ArrayList<>();
+    if (success != null) {
+      for ( int i = 0 ; i < success.size() ; i++) {
+        NamedList resp = (NamedList)success.getVal(i);
+
+        // Check if this core is the leader for the shard. The idea here is that during the backup
+        // operation we preferably use the snapshot of the "leader" replica since it is most likely
+        // to have latest state.
+        String coreName = (String)resp.get(CoreAdminParams.CORE);
+        Slice slice = shardByCoreName.remove(coreName);
+        if (slice == null) {
+          // A response without a core name, or for a core no request was sent to, cannot be
+          // attributed to a shard; skip it instead of failing with a NullPointerException.
+          log.warn("Ignoring createsnapshot response for unexpected core {}", coreName);
+          continue;
+        }
+        boolean leader = (slice.getLeader() != null && slice.getLeader().getCoreName().equals(coreName));
+        resp.add(SolrSnapshotManager.SHARD_ID, slice.getName());
+        resp.add(SolrSnapshotManager.LEADER, leader);
+
+        CoreSnapshotMetaData c = new CoreSnapshotMetaData(resp);
+        replicas.add(c);
+        log.info("Snapshot with commitName {} is created successfully for core {}", commitName, c.getCoreName());
+      }
+    }
+
+    if (!shardByCoreName.isEmpty()) { // One or more failures.
+      log.warn("Unable to create a snapshot with name {} for following cores {}", commitName, shardByCoreName.keySet());
+
+      // Count number of failures per shard.
+      for (Slice failedSlice : shardByCoreName.values()) {
+        failuresByShardId.merge(failedSlice.getName(), 1, Integer::sum);
+      }
+    }
+
+    if (!failuresByShardId.isEmpty()) {
+      // Now that we know number of failures per shard, we can figure out
+      // if at-least one replica per shard was able to create a snapshot or not.
+      DocCollection collectionStatus = ocmh.zkStateReader.getClusterState().getCollection(collectionName);
+      for (Map.Entry<String,Integer> entry : failuresByShardId.entrySet()) {
+        int replicaCount = collectionStatus.getSlice(entry.getKey()).getReplicas().size();
+        if (replicaCount <= entry.getValue()) {
+          failedShards.add(entry.getKey());
+        }
+      }
+    }
+
+    boolean allShardsCovered = failedShards.isEmpty();
+    if (!allShardsCovered) {
+      log.warn("Failed to create a snapshot for collection {} with commitName = {}. Snapshot could not be captured for following shards {}",
+          collectionName, commitName, failedShards);
+    }
+
+    // The meta-data always lists only the cores that hold the snapshot. On failure this enables
+    // users to figure out which cores have the named snapshot.
+    CollectionSnapshotMetaData meta = new CollectionSnapshotMetaData(commitName,
+        allShardsCovered ? SnapshotStatus.Successful : SnapshotStatus.Failed, creationDate, replicas);
+    SolrSnapshotManager.updateCollectionLevelSnapshot(zkClient, collectionName, meta);
+    log.info("Saved following snapshot information for collection={} with commitName={} in Zookeeper : {}", collectionName,
+        commitName, meta.toNamedList());
+
+    if (!allShardsCovered) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to create snapshot on shards " + failedShards);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteAliasCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteAliasCmd.java
new file mode 100644
index 0000000..e199d7d
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteAliasCmd.java
@@ -0,0 +1,43 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+/**
+ * Overseer command for deleting a collection alias: the alias named in the request is re-mapped
+ * to {@code null} and the modified aliases are exported to ZooKeeper.
+ */
+public class DeleteAliasCmd implements OverseerCollectionMessageHandler.Cmd {
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public DeleteAliasCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  /**
+   * @param state   unused
+   * @param message request whose NAME property holds the alias
+   * @param results unused
+   */
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    final String alias = message.getStr(NAME);
+
+    final ZkStateReader reader = ocmh.zkStateReader;
+    reader.aliasesHolder.applyModificationAndExportToZk(
+        aliases -> aliases.cloneWithCollectionAlias(alias, null));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
new file mode 100644
index 0000000..bdae8b9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -0,0 +1,142 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.NonExistentCoreException;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final OverseerCollectionMessageHandler ocmh;
+ private final TimeSource timeSource;
+
+ public DeleteCollectionCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ this.timeSource = ocmh.cloudManager.getTimeSource();
+ }
+
+ @Override
+ public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+ ZkStateReader zkStateReader = ocmh.zkStateReader;
+ Aliases aliases = zkStateReader.getAliases();
+ final String collection = message.getStr(NAME);
+ for (Map.Entry<String, List<String>> ent : aliases.getCollectionAliasListMap().entrySet()) {
+ if (ent.getValue().contains(collection)) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Collection : " + collection + " is part of alias " + ent.getKey() + " remove or modify the alias before removing this collection.");
+ }
+ }
+
+ try {
+ // Remove the snapshots meta-data for this collection in ZK. Deleting actual index files
+ // should be taken care of as part of collection delete operation.
+ SolrZkClient zkClient = zkStateReader.getZkClient();
+ SolrSnapshotManager.cleanupCollectionLevelSnapshots(zkClient, collection);
+
+ if (zkStateReader.getClusterState().getCollectionOrNull(collection) == null) {
+ if (zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
+ // if the collection is not in the clusterstate, but is listed in zk, do nothing, it will just
+ // be removed in the finally - we cannot continue, because the below code will error if the collection
+ // is not in the clusterstate
+ return;
+ }
+ }
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
+ params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
+ params.set(CoreAdminParams.DELETE_DATA_DIR, true);
+
+ String asyncId = message.getStr(ASYNC);
+ Map<String, String> requestMap = null;
+ if (asyncId != null) {
+ requestMap = new HashMap<>();
+ }
+
+ Set<String> okayExceptions = new HashSet<>(1);
+ okayExceptions.add(NonExistentCoreException.class.getName());
+
+ ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions);
+
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
+ Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+
+ // wait for a while until we don't see the collection
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+ boolean removed = false;
+ while (! timeout.hasTimedOut()) {
+ timeout.sleep(100);
+ removed = !zkStateReader.getClusterState().hasCollection(collection);
+ if (removed) {
+ timeout.sleep(500); // just a bit of time so it's more likely other
+ // readers see on return
+ break;
+ }
+ }
+ if (!removed) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Could not fully remove collection: " + collection);
+ }
+
+ } finally {
+
+ try {
+ if (zkStateReader.getZkClient().exists(
+ ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
+ zkStateReader.getZkClient().clean(
+ ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
+ }
+ } catch (InterruptedException e) {
+ SolrException.log(log, "Cleaning up collection in zk was interrupted:"
+ + collection, e);
+ Thread.currentThread().interrupt();
+ } catch (KeeperException e) {
+ SolrException.log(log, "Problem cleaning up collection in zk:"
+ + collection, e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
new file mode 100644
index 0000000..ab4dc0c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public DeleteNodeCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+ @Override
+ public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+ ocmh.checkRequired(message, "node");
+ String node = message.getStr("node");
+ if (!state.liveNodesContain(node)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + node + " is not live");
+ }
+ List<ZkNodeProps> sourceReplicas = ReplaceNodeCmd.getReplicasOfNode(node, state);
+ List<String> singleReplicas = verifyReplicaAvailability(sourceReplicas, state);
+ if (!singleReplicas.isEmpty()) {
+ results.add("failure", "Can't delete the only existing non-PULL replica(s) on node " + node + ": " + singleReplicas.toString());
+ } else {
+ cleanupReplicas(results, state, sourceReplicas, ocmh, node, message.getStr(ASYNC));
+ }
+ }
+
+ // collect names of replicas that cannot be deleted
+ static List<String> verifyReplicaAvailability(List<ZkNodeProps> sourceReplicas, ClusterState state) {
+ List<String> res = new ArrayList<>();
+ for (ZkNodeProps sourceReplica : sourceReplicas) {
+ String coll = sourceReplica.getStr(COLLECTION_PROP);
+ String shard = sourceReplica.getStr(SHARD_ID_PROP);
+ String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+ DocCollection collection = state.getCollection(coll);
+ Slice slice = collection.getSlice(shard);
+ if (slice.getReplicas().size() < 2) {
+ // can't delete the only replica in existence
+ res.add(coll + "/" + shard + "/" + replicaName + ", type=" + sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
+ } else { // check replica types
+ int otherNonPullReplicas = 0;
+ for (Replica r : slice.getReplicas()) {
+ if (!r.getName().equals(replicaName) && !r.getType().equals(Replica.Type.PULL)) {
+ otherNonPullReplicas++;
+ }
+ }
+ // can't delete - there are no other non-pull replicas
+ if (otherNonPullReplicas == 0) {
+ res.add(coll + "/" + shard + "/" + replicaName + ", type=" + sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
+ }
+ }
+ }
+ return res;
+ }
+
+ static void cleanupReplicas(NamedList results,
+ ClusterState clusterState,
+ List<ZkNodeProps> sourceReplicas,
+ OverseerCollectionMessageHandler ocmh,
+ String node,
+ String async) throws InterruptedException {
+ CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size());
+ for (ZkNodeProps sourceReplica : sourceReplicas) {
+ String coll = sourceReplica.getStr(COLLECTION_PROP);
+ String shard = sourceReplica.getStr(SHARD_ID_PROP);
+ String type = sourceReplica.getStr(ZkStateReader.REPLICA_TYPE);
+ log.info("Deleting replica type={} for collection={} shard={} on node={}", type, coll, shard, node);
+ NamedList deleteResult = new NamedList();
+ try {
+ if (async != null) sourceReplica = sourceReplica.plus(ASYNC, async);
+ ((DeleteReplicaCmd)ocmh.commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, sourceReplica.plus("parallel", "true"), deleteResult, () -> {
+ cleanupLatch.countDown();
+ if (deleteResult.get("failure") != null) {
+ synchronized (results) {
+
+ results.add("failure", String.format(Locale.ROOT, "Failed to delete replica for collection=%s shard=%s" +
+ " on node=%s", coll, shard, node));
+ }
+ }
+ });
+ } catch (KeeperException e) {
+ log.warn("Error deleting ", e);
+ cleanupLatch.countDown();
+ } catch (Exception e) {
+ log.warn("Error deleting ", e);
+ cleanupLatch.countDown();
+ throw e;
+ }
+ }
+ log.debug("Waiting for delete node action to complete");
+ cleanupLatch.await(5, TimeUnit.MINUTES);
+ }
+
+
+}