Posted to commits@lucene.apache.org by va...@apache.org on 2018/01/16 20:39:51 UTC

[10/15] lucene-solr:branch_7x: SOLR-11817: Move Collections API classes to their own package

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
new file mode 100644
index 0000000..c411fbc
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.lucene.util.Version;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.backup.BackupManager;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.CoreSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.SnapshotStatus;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
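+/**
+ * Overseer command that backs up a collection: the index files of one replica per shard (skipped
+ * entirely for the 'none' index backup strategy), plus the config set, the collection state and
+ * a backup.properties file, are written to the backup repository.
+ */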
+public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public BackupCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName = message.getStr(COLLECTION_PROP);
+    String backupName = message.getStr(NAME);
+    String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
+
+    Instant startTime = Instant.now();
+
+    CoreContainer cc = ocmh.overseer.getCoreContainer();
+    BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
+    BackupManager backupMgr = new BackupManager(repository, ocmh.zkStateReader);
+
+    // Backup location
+    URI location = repository.createURI(message.getStr(CoreAdminParams.BACKUP_LOCATION));
+    URI backupPath = repository.resolve(location, backupName);
+
+    // Validate that the backup directory does not already exist.
+    if (repository.exists(backupPath)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The backup directory already exists: " + backupPath);
+    }
+
+    // Create a directory to store backup details.
+    repository.createDirectory(backupPath);
+
+    String strategy = message.getStr(CollectionAdminParams.INDEX_BACKUP_STRATEGY, CollectionAdminParams.COPY_FILES_STRATEGY);
+    switch (strategy) {
+      case CollectionAdminParams.COPY_FILES_STRATEGY: {
+        copyIndexFiles(backupPath, message, results);
+        break;
+      }
+      case CollectionAdminParams.NO_INDEX_BACKUP_STRATEGY: {
+        break;
+      }
+    }
+
+    log.info("Starting to backup ZK data for backupName={}", backupName);
+
+    // Download the configs
+    String configName = ocmh.zkStateReader.readConfigName(collectionName);
+    backupMgr.downloadConfigDir(location, backupName, configName);
+
+    // Save the collection's state. It can be part of the monolithic clusterstate.json or an individual state.json.
+    // Since we don't want to distinguish between the two, we extract the state and back it up as a separate JSON file.
+    DocCollection collectionState = ocmh.zkStateReader.getClusterState().getCollection(collectionName);
+    backupMgr.writeCollectionState(location, backupName, collectionName, collectionState);
+
+    Properties properties = new Properties();
+
+    properties.put(BackupManager.BACKUP_NAME_PROP, backupName);
+    properties.put(BackupManager.COLLECTION_NAME_PROP, collectionName);
+    properties.put(OverseerCollectionMessageHandler.COLL_CONF, configName);
+    properties.put(BackupManager.START_TIME_PROP, startTime.toString());
+    properties.put(BackupManager.INDEX_VERSION_PROP, Version.LATEST.toString());
+    // TODO: Add an MD5 checksum of the configset. If a configset with the same name exists during restore, we can
+    // compare checksums to see if they are identical; if not, we can throw an error or support an 'overwriteConfig' flag.
+    // TODO: Save numDocs for the shard leader. We can use it to sanity-check the restore.
+
+    backupMgr.writeBackupProperties(location, backupName, properties);
+
+    log.info("Completed backing up ZK data for backupName={}", backupName);
+  }
+
+  private Replica selectReplicaWithSnapshot(CollectionSnapshotMetaData snapshotMeta, Slice slice) {
+    // The goal here is to choose the snapshot of the replica which was the leader at the time the snapshot was created.
+    // If that is not possible, we choose any other live replica for the given shard.
+    Collection<CoreSnapshotMetaData> snapshots = snapshotMeta.getReplicaSnapshotsForShard(slice.getName());
+
+    Optional<CoreSnapshotMetaData> leaderCore = snapshots.stream().filter(x -> x.isLeader()).findFirst();
+    if (leaderCore.isPresent()) {
+      log.info("Replica {} was the leader when snapshot {} was created.", leaderCore.get().getCoreName(), snapshotMeta.getName());
+      Replica r = slice.getReplica(leaderCore.get().getCoreName());
+      if ((r != null) && !r.getState().equals(State.DOWN)) {
+        return r;
+      }
+    }
+
+    Optional<Replica> r = slice.getReplicas().stream()
+                               .filter(x -> x.getState() != State.DOWN && snapshotMeta.isSnapshotExists(slice.getName(), x))
+                               .findFirst();
+
+    if (!r.isPresent()) {
+      throw new SolrException(ErrorCode.SERVER_ERROR,
+          "Unable to find any live replica with a snapshot named " + snapshotMeta.getName() + " for shard " + slice.getName());
+    }
+
+    return r.get();
+  }
+
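+  /**
+   * Sends a BACKUPCORE request to one replica of every active shard: the shard leader by default,
+   * or a replica holding the named snapshot when a commitName was given.
+   */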
+  private void copyIndexFiles(URI backupPath, ZkNodeProps request, NamedList results) throws Exception {
+    String collectionName = request.getStr(COLLECTION_PROP);
+    String backupName = request.getStr(NAME);
+    String asyncId = request.getStr(ASYNC);
+    String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY);
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    Map<String, String> requestMap = new HashMap<>();
+
+    String commitName = request.getStr(CoreAdminParams.COMMIT_NAME);
+    Optional<CollectionSnapshotMetaData> snapshotMeta = Optional.empty();
+    if (commitName != null) {
+      SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
+      snapshotMeta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName);
+      if (!snapshotMeta.isPresent()) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Snapshot with name " + commitName
+            + " does not exist for collection " + collectionName);
+      }
+      if (snapshotMeta.get().getStatus() != SnapshotStatus.Successful) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Snapshot with name " + commitName + " for collection " + collectionName
+            + " has not completed successfully. The status is " + snapshotMeta.get().getStatus());
+      }
+    }
+
+    log.info("Starting backup of collection={} with backupName={} at location={}", collectionName, backupName,
+        backupPath);
+
+    Collection<String> shardsToConsider = Collections.emptySet();
+    if (snapshotMeta.isPresent()) {
+      shardsToConsider = snapshotMeta.get().getShards();
+    }
+
+    for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) {
+      Replica replica = null;
+
+      if (snapshotMeta.isPresent()) {
+        if (!shardsToConsider.contains(slice.getName())) {
+          log.warn("Skipping the backup for shard {} since it wasn't part of the collection {} when snapshot {} was created.",
+              slice.getName(), collectionName, snapshotMeta.get().getName());
+          continue;
+        }
+        replica = selectReplicaWithSnapshot(snapshotMeta.get(), slice);
+      } else {
+        // Note: slice.getLeader() can return null when there is no leader for this shard.
+        replica = slice.getLeader();
+        if (replica == null) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "No 'leader' replica available for shard " + slice.getName() + " of collection " + collectionName);
+        }
+      }
+
+      String coreName = replica.getStr(CORE_NAME_PROP);
+
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.BACKUPCORE.toString());
+      params.set(NAME, slice.getName());
+      params.set(CoreAdminParams.BACKUP_REPOSITORY, repoName);
+      params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.toASCIIString()); // note: each core's index ends up under this location, in a "snapshot." + slice name subdirectory
+      params.set(CORE_NAME_PROP, coreName);
+      if (snapshotMeta.isPresent()) {
+        params.set(CoreAdminParams.COMMIT_NAME, snapshotMeta.get().getName());
+      }
+
+      ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
+      log.debug("Sent backup request to core={} for backupName={}", coreName, backupName);
+    }
+    log.debug("Sent backup requests to all shard leaders for backupName={}", backupName);
+
+    ocmh.processResponses(results, shardHandler, true, "Could not backup all replicas", asyncId, requestMap);
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
new file mode 100644
index 0000000..c54d792
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+
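+/**
+ * Overseer command that creates a collection alias, after validating that the referenced
+ * collections (or aliases) all exist and that the list contains no duplicates.
+ */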
+public class CreateAliasCmd implements OverseerCollectionMessageHandler.Cmd {
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public CreateAliasCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results)
+      throws Exception {
+    final String aliasName = message.getStr(NAME);
+    final List<String> canonicalCollectionList = parseCollectionsParameter(message.get("collections"));
+    final String canonicalCollectionsString = StrUtils.join(canonicalCollectionList, ',');
+
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    validateAllCollectionsExistAndNoDups(canonicalCollectionList, zkStateReader);
+
+    zkStateReader.aliasesHolder.applyModificationAndExportToZk(aliases -> aliases.cloneWithCollectionAlias(aliasName, canonicalCollectionsString));
+
+    // Sleep a bit to allow ZooKeeper state propagation.
+    //
+    // THIS IS A KLUDGE.
+    //
+    // Solr's view of the cluster is eventually consistent. *Eventually* all nodes and CloudSolrClients will be aware of
+    // alias changes, but not immediately. If a newly created alias is queried, things should work right away since Solr
+    // will attempt to see if it needs to get the latest aliases when it can't otherwise resolve the name.  However
+    // modifications to an alias will take some time.
+    //
+    // We could levy this requirement on the client but they would probably always add an obligatory sleep, which is
+    // just kicking the can down the road.  Perhaps ideally at this juncture here we could somehow wait until all
+    // Solr nodes in the cluster have the latest aliases?
+    Thread.sleep(100);
+  }
+
+  private void validateAllCollectionsExistAndNoDups(List<String> collectionList, ZkStateReader zkStateReader) {
+    final String collectionStr = StrUtils.join(collectionList, ',');
+
+    if (new HashSet<>(collectionList).size() != collectionList.size()) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          String.format(Locale.ROOT, "Can't create collection alias for collections='%s', since it contains duplicates", collectionStr));
+    }
+    ClusterState clusterState = zkStateReader.getClusterState();
+    Set<String> aliasNames = zkStateReader.getAliases().getCollectionAliasListMap().keySet();
+    for (String collection : collectionList) {
+      if (clusterState.getCollectionOrNull(collection) == null && !aliasNames.contains(collection)) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            String.format(Locale.ROOT, "Can't create collection alias for collections='%s', '%s' is not an existing collection or alias", collectionStr, collection));
+      }
+    }
+  }
+  
+  /**
+   * The v2 API directs that the 'collections' parameter be provided as a JSON array (e.g. ["a", "b"]).  We also
+   * maintain support for the legacy format, a comma-separated list (e.g. a,b).
+   */
+  @SuppressWarnings("unchecked")
+  private List<String> parseCollectionsParameter(Object colls) {
+    if (colls == null) throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing collections param");
+    if (colls instanceof List) return (List<String>) colls;
+    return StrUtils.splitSmart(colls.toString(), ",", true).stream()
+        .map(String::trim)
+        .collect(Collectors.toList());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
new file mode 100644
index 0000000..4d9c971
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
+import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.overseer.ClusterStateMutator;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.admin.ConfigSetsHandlerApi;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.StrUtils.formatString;
+
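+/**
+ * Overseer command that creates a collection: it computes replica positions, registers the
+ * collection (and, in non-legacy mode, its replicas) in the cluster state, and then issues
+ * core-create requests to the selected nodes.
+ */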
+public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+  private final TimeSource timeSource;
+  private final DistribStateManager stateManager;
+
+  public CreateCollectionCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+    this.stateManager = ocmh.cloudManager.getDistribStateManager();
+    this.timeSource = ocmh.cloudManager.getTimeSource();
+  }
+
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    final String collectionName = message.getStr(NAME);
+    final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
+    log.info("Create collection {}", collectionName);
+    if (clusterState.hasCollection(collectionName)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
+    }
+
+    String configName = getConfigName(collectionName, message);
+    if (configName == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No config set found to associate with the collection.");
+    }
+
+    ocmh.validateConfigOrThrowSolrException(configName);
+    AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
+
+    try {
+
+      final String async = message.getStr(ASYNC);
+
+      List<String> nodeList = new ArrayList<>();
+      List<String> shardNames = new ArrayList<>();
+      List<ReplicaPosition> replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, message,
+          nodeList, shardNames, sessionWrapper);
+      ZkStateReader zkStateReader = ocmh.zkStateReader;
+      boolean isLegacyCloud = Overseer.isLegacy(zkStateReader);
+
+      ocmh.createConfNode(stateManager, configName, collectionName, isLegacyCloud);
+
+      Map<String,String> collectionParams = new HashMap<>();
+      Map<String,Object> collectionProps = message.getProperties();
+      for (String propName : collectionProps.keySet()) {
+        if (propName.startsWith(ZkController.COLLECTION_PARAM_PREFIX)) {
+          collectionParams.put(propName.substring(ZkController.COLLECTION_PARAM_PREFIX.length()), (String) collectionProps.get(propName));
+        }
+      }
+      
+      createCollectionZkNode(stateManager, collectionName, collectionParams);
+      
+      Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+
+      // wait for a while until we see the collection in the cluster state
+      TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+      boolean created = false;
+      while (!waitUntil.hasTimedOut()) {
+        waitUntil.sleep(100);
+        created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
+        if (created) break;
+      }
+      if (!created) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
+      }
+
+      if (nodeList.isEmpty()) {
+        log.debug("Finished create command for collection: {}", collectionName);
+        return;
+      }
+
+      // For tracking async calls.
+      Map<String, String> requestMap = new HashMap<>();
+
+      log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , message : {2}",
+          collectionName, shardNames, message));
+      Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
+      ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+      for (ReplicaPosition replicaPosition : replicaPositions) {
+        String nodeName = replicaPosition.node;
+        String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(),
+            ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName),
+            replicaPosition.shard, replicaPosition.type, true);
+        log.debug(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
+            , coreName, replicaPosition.shard, collectionName, nodeName));
+
+        String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
+        // in the new mode, create the replica in the cluster state prior to creating the core.
+        // Otherwise the core creation fails.
+        if (!isLegacyCloud) {
+          ZkNodeProps props = new ZkNodeProps(
+              Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
+              ZkStateReader.COLLECTION_PROP, collectionName,
+              ZkStateReader.SHARD_ID_PROP, replicaPosition.shard,
+              ZkStateReader.CORE_NAME_PROP, coreName,
+              ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+              ZkStateReader.BASE_URL_PROP, baseUrl,
+              ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
+              CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
+          Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+        }
+
+        // Need to create new params for each request
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
+
+        params.set(CoreAdminParams.NAME, coreName);
+        params.set(COLL_CONF, configName);
+        params.set(CoreAdminParams.COLLECTION, collectionName);
+        params.set(CoreAdminParams.SHARD, replicaPosition.shard);
+        params.set(ZkStateReader.NUM_SHARDS_PROP, shardNames.size());
+        params.set(CoreAdminParams.NEW_COLLECTION, "true");
+        params.set(CoreAdminParams.REPLICA_TYPE, replicaPosition.type.name());
+
+        if (async != null) {
+          String coreAdminAsyncId = async + Math.abs(System.nanoTime());
+          params.add(ASYNC, coreAdminAsyncId);
+          requestMap.put(nodeName, coreAdminAsyncId);
+        }
+        ocmh.addPropertyParams(message, params);
+
+        ShardRequest sreq = new ShardRequest();
+        sreq.nodeName = nodeName;
+        params.set("qt", ocmh.adminPath);
+        sreq.purpose = 1;
+        sreq.shards = new String[]{baseUrl};
+        sreq.actualShards = sreq.shards;
+        sreq.params = params;
+
+        if (isLegacyCloud) {
+          shardHandler.submit(sreq, sreq.shards[0], sreq.params);
+        } else {
+          coresToCreate.put(coreName, sreq);
+        }
+      }
+
+      if (!isLegacyCloud) {
+        // wait for all replica entries to be created
+        Map<String, Replica> replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
+        for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
+          ShardRequest sreq = e.getValue();
+          sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
+          shardHandler.submit(sreq, sreq.shards[0], sreq.params);
+        }
+      }
+
+      ocmh.processResponses(results, shardHandler, false, null, async, requestMap, Collections.emptySet());
+      if (results.get("failure") != null && ((SimpleOrderedMap) results.get("failure")).size() > 0) {
+        // Let's cleanup as we hit an exception
+        // We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
+        // element, which may be interpreted by the user as a positive ack
+        ocmh.cleanupCollection(collectionName, new NamedList());
+        log.info("Cleaned up artifacts for failed create collection for [{}]", collectionName);
+      } else {
+        log.debug("Finished create command on all shards for collection: {}", collectionName);
+
+        // Emit a warning about production use of data driven functionality
+        boolean defaultConfigSetUsed = message.getStr(COLL_CONF) == null ||
+            message.getStr(COLL_CONF).equals(ConfigSetsHandlerApi.DEFAULT_CONFIGSET_NAME);
+        if (defaultConfigSetUsed) {
+          results.add("warning", "Using _default configset. Data driven schema functionality"
+              + " is enabled by default, which is NOT RECOMMENDED for production use. To turn it off:"
+              + " curl http://{host:port}/solr/" + collectionName + "/config -d '{\"set-user-property\": {\"update.autoCreateFields\":\"false\"}}'");
+        }
+      }
+    } catch (SolrException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, ex);
+    } finally {
+      if (sessionWrapper.get() != null) sessionWrapper.get().release();
+    }
+  }
+
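+  /**
+   * Determines the shard names and replica placements for a new collection, filling nodeList and
+   * shardNames as side effects and validating the replica counts against maxShardsPerNode.
+   */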
+  public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
+                                                            ZkNodeProps message,
+                                                            List<String> nodeList, List<String> shardNames,
+                                                            AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
+    final String collectionName = message.getStr(NAME);
+    // look at the replication factor and see if it matches reality
+    // if it does not, find best nodes to create more cores
+    int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0);
+    int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, numTlogReplicas > 0 ? 0 : 1));
+    int numPullReplicas = message.getInt(PULL_REPLICAS, 0);
+    AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+    String policy = message.getStr(Policy.POLICY);
+    boolean usePolicyFramework = !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || policy != null;
+
+    Integer numSlices = message.getInt(OverseerCollectionMessageHandler.NUM_SLICES, null);
+    String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
+    if (ImplicitDocRouter.NAME.equals(router)) {
+      ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
+      numSlices = shardNames.size();
+    } else {
+      if (numSlices == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " is a required param (when using CompositeId router).");
+      }
+      if (numSlices <= 0) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " must be > 0");
+      }
+      ClusterStateMutator.getShardNames(numSlices, shardNames);
+    }
+
+    int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
+    if (usePolicyFramework && message.getStr(MAX_SHARDS_PER_NODE) != null && maxShardsPerNode > 0) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "'maxShardsPerNode>0' is not supported when autoScaling policies are used");
+    }
+    if (maxShardsPerNode == -1 || usePolicyFramework) maxShardsPerNode = Integer.MAX_VALUE;
+    if (numNrtReplicas + numTlogReplicas <= 0) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
+    }
+
+    // we need to look at every node and see how many cores it serves
+    // add our new cores to existing nodes serving the least number of cores
+    // but (for now) require that each core goes on a distinct node.
+
+    List<ReplicaPosition> replicaPositions;
+    nodeList.addAll(Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM));
+    if (nodeList.isEmpty()) {
+      log.warn("It is unusual to create a collection ("+collectionName+") without cores.");
+
+      replicaPositions = new ArrayList<>();
+    } else {
+      int totalNumReplicas = numNrtReplicas + numTlogReplicas + numPullReplicas;
+      if (totalNumReplicas > nodeList.size()) {
+        log.warn("Specified number of replicas of "
+            + totalNumReplicas
+            + " on collection "
+            + collectionName
+            + " is higher than the number of Solr instances currently live or live and part of your " + OverseerCollectionMessageHandler.CREATE_NODE_SET + "("
+            + nodeList.size()
+            + "). It's unusual to run two replica of the same slice on the same Solr-instance.");
+      }
+
+      int maxShardsAllowedToCreate = maxShardsPerNode == Integer.MAX_VALUE ?
+          Integer.MAX_VALUE :
+          maxShardsPerNode * nodeList.size();
+      int requestedShardsToCreate = numSlices * totalNumReplicas;
+      if (maxShardsAllowedToCreate < requestedShardsToCreate) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
+            + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
+            + ", and the number of nodes currently live or live and part of your "+OverseerCollectionMessageHandler.CREATE_NODE_SET+" is " + nodeList.size()
+            + ". This allows a maximum of " + maxShardsAllowedToCreate
+            + " to be created. Value of " + OverseerCollectionMessageHandler.NUM_SLICES + " is " + numSlices
+            + ", value of " + NRT_REPLICAS + " is " + numNrtReplicas
+            + ", value of " + TLOG_REPLICAS + " is " + numTlogReplicas
+            + " and value of " + PULL_REPLICAS + " is " + numPullReplicas
+            + ". This requires " + requestedShardsToCreate
+            + " shards to be created (higher than the allowed number)");
+      }
+      replicaPositions = Assign.identifyNodes(cloudManager
+          , clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
+      sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
+    }
+    return replicaPositions;
+  }
+
+  String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException {
+    String configName = message.getStr(COLL_CONF);
+
+    if (configName == null) {
+      // if there is only one conf, use that
+      List<String> configNames = null;
+      try {
+        configNames = ocmh.zkStateReader.getZkClient().getChildren(ZkConfigManager.CONFIGS_ZKNODE, null, true);
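+        // _default exists: use a configset named after the collection, copying _default over
+        // (the .system collection is excluded from the copy)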
+        if (configNames.contains(ConfigSetsHandlerApi.DEFAULT_CONFIGSET_NAME)) {
+          if (!CollectionAdminParams.SYSTEM_COLL.equals(coll)) {
+            copyDefaultConfigSetTo(configNames, coll);
+          }
+          return coll;
+        } else if (configNames != null && configNames.size() == 1) {
+          configName = configNames.get(0);
+          // no config set named, but there is only 1 - use it
+          log.info("Only one config set found in zk - using it:" + configName);
+        }
+      } catch (KeeperException.NoNodeException e) {
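+        // no configsets znode in ZK yet - leave configName null so the caller can fail cleanly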
+
+      }
+    }
+    return "".equals(configName)? null: configName;
+  }
+  
+  /**
+   * Copies the _default configset to the specified configset name (overwrites if pre-existing)
+   */
+  private void copyDefaultConfigSetTo(List<String> configNames, String targetConfig) {
+    ZkConfigManager configManager = new ZkConfigManager(ocmh.zkStateReader.getZkClient());
+
+    // if a configset named coll exists, delete the configset so that _default can be copied over
+    if (configNames.contains(targetConfig)) {
+      log.info("There exists a configset by the same name as the collection we're trying to create: " + targetConfig +
+          ", deleting it so that we can copy the _default configs over and create the collection.");
+      try {
+        configManager.deleteConfigDir(targetConfig);
+      } catch (Exception e) {
+        throw new SolrException(ErrorCode.INVALID_STATE, "Error while deleting configset: " + targetConfig, e);
+      }
+    } else {
+      log.info("Only _default config set found, using it.");
+    }
+    // Copy _default into targetConfig
+    try {
+      configManager.copyConfigDir(ConfigSetsHandlerApi.DEFAULT_CONFIGSET_NAME, targetConfig, new HashSet<>());
+    } catch (Exception e) {
+      throw new SolrException(ErrorCode.INVALID_STATE, "Error while copying _default to " + targetConfig, e);
+    }
+  }
+
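+  /**
+   * Creates the collection's znode under /collections, resolving the configName property from the
+   * request params, the bootstrap system properties, or the configsets already in ZooKeeper.
+   */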
+  public static void createCollectionZkNode(DistribStateManager stateManager, String collection, Map<String,String> params) {
+    log.debug("Check for collection zkNode:" + collection);
+    String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
+
+    try {
+      if (!stateManager.hasData(collectionPath)) {
+        log.debug("Creating collection in ZooKeeper:" + collection);
+
+        try {
+          Map<String,Object> collectionProps = new HashMap<>();
+
+          // TODO: if collection.configName isn't set, and there isn't already a conf in zk, just use that?
+          String defaultConfigName = System.getProperty(ZkController.COLLECTION_PARAM_PREFIX + ZkController.CONFIGNAME_PROP, collection);
+
+          if (params.size() > 0) {
+            collectionProps.putAll(params);
+            // if the config name wasn't passed in, use the default
+            if (!collectionProps.containsKey(ZkController.CONFIGNAME_PROP)) {
+              // users can create the collection node and conf link ahead of time, or this may return another option
+              getConfName(stateManager, collection, collectionPath, collectionProps);
+            }
+
+          } else if (System.getProperty("bootstrap_confdir") != null) {
+            // if we are bootstrapping a collection, default the config for
+            // a new collection to the collection we are bootstrapping
+            log.info("Setting config for collection:" + collection + " to " + defaultConfigName);
+
+            Properties sysProps = System.getProperties();
+            for (String sprop : System.getProperties().stringPropertyNames()) {
+              if (sprop.startsWith(ZkController.COLLECTION_PARAM_PREFIX)) {
+                collectionProps.put(sprop.substring(ZkController.COLLECTION_PARAM_PREFIX.length()), sysProps.getProperty(sprop));
+              }
+            }
+
+            // if the config name wasn't passed in, use the default
+            if (!collectionProps.containsKey(ZkController.CONFIGNAME_PROP))
+              collectionProps.put(ZkController.CONFIGNAME_PROP, defaultConfigName);
+
+          } else if (Boolean.getBoolean("bootstrap_conf")) {
+            // the conf name should be the collection name of this core
+            collectionProps.put(ZkController.CONFIGNAME_PROP, collection);
+          } else {
+            getConfName(stateManager, collection, collectionPath, collectionProps);
+          }
+
+          collectionProps.remove(ZkStateReader.NUM_SHARDS_PROP);  // we don't put numShards in the collections properties
+
+          ZkNodeProps zkProps = new ZkNodeProps(collectionProps);
+          stateManager.makePath(collectionPath, Utils.toJSON(zkProps), CreateMode.PERSISTENT, false);
+
+        } catch (KeeperException e) {
+          // it's okay if the node already exists
+          if (e.code() != KeeperException.Code.NODEEXISTS) {
+            throw e;
+          }
+        } catch (AlreadyExistsException e) {
+          // it's okay if the node already exists
+        }
+      } else {
+        log.debug("Collection zkNode exists");
+      }
+
+    } catch (KeeperException e) {
+      // it's okay if another beats us creating the node
+      if (e.code() == KeeperException.Code.NODEEXISTS) {
+        return;
+      }
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
+    } catch (IOException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt status instead of clearing it
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
+    }
+
+  }
+  
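+  /**
+   * Resolves the config set for a collection when none was passed explicitly: prefers a configset
+   * named after the collection, then _default, then the only configset present, retrying a few
+   * times (3 seconds apart) before giving up.
+   */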
+  private static void getConfName(DistribStateManager stateManager, String collection, String collectionPath, Map<String,Object> collectionProps) throws IOException,
+      KeeperException, InterruptedException {
+    // check for configName
+    log.debug("Looking for collection configName");
+    if (collectionProps.containsKey("configName")) {
+      log.info("configName was passed as a param {}", collectionProps.get("configName"));
+      return;
+    }
+
+    List<String> configNames = null;
+    int retry = 1;
+    int retryLimit = 6;
+    for (; retry < retryLimit; retry++) {
+      if (stateManager.hasData(collectionPath)) {
+        VersionedData data = stateManager.getData(collectionPath);
+        ZkNodeProps cProps = ZkNodeProps.load(data.getData());
+        if (cProps.containsKey(ZkController.CONFIGNAME_PROP)) {
+          break;
+        }
+      }
+
+      try {
+        configNames = stateManager.listData(ZkConfigManager.CONFIGS_ZKNODE);
+      } catch (NoSuchElementException | NoNodeException e) {
+        // just keep trying
+      }
+
+      // check if there's a config set with the same name as the collection
+      if (configNames != null && configNames.contains(collection)) {
+        log.info(
+            "Could not find explicit collection configName, but found config name matching collection name - using that set.");
+        collectionProps.put(ZkController.CONFIGNAME_PROP, collection);
+        break;
+      }
+      // if _default exists, use that
+      if (configNames != null && configNames.contains(ConfigSetsHandlerApi.DEFAULT_CONFIGSET_NAME)) {
+        log.info(
+            "Could not find explicit collection configName, but found _default config set - using that set.");
+        collectionProps.put(ZkController.CONFIGNAME_PROP, ConfigSetsHandlerApi.DEFAULT_CONFIGSET_NAME);
+        break;
+      }
+      // if there is only one conf, use that
+      if (configNames != null && configNames.size() == 1) {
+        // no config set named, but there is only 1 - use it
+        log.info("Only one config set found in zk - using it:" + configNames.get(0));
+        collectionProps.put(ZkController.CONFIGNAME_PROP, configNames.get(0));
+        break;
+      }
+
+      log.info("Could not find collection configName - pausing for 3 seconds and trying again - try: " + retry);
+      Thread.sleep(3000);
+    }
+    if (retry == retryLimit) {
+      log.error("Could not find configName for collection " + collection);
+      throw new ZooKeeperException(
+          SolrException.ErrorCode.SERVER_ERROR,
+          "Could not find configName for collection " + collection + " found:" + configNames);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
new file mode 100644
index 0000000..311d9ef
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
+import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.cloud.CloudUtil;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
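+/**
+ * Overseer command that adds a new shard to an existing collection and creates its replicas.
+ */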
+public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public CreateShardCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName = message.getStr(COLLECTION_PROP);
+    String sliceName = message.getStr(SHARD_ID_PROP);
+    boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
+
+    log.info("Create shard invoked: {}", message);
+    if (collectionName == null || sliceName == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
+    }
+
+    DocCollection collection = clusterState.getCollection(collectionName);
+
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
+    SolrCloseableLatch countDownLatch;
+    try {
+      List<ReplicaPosition> positions = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, sessionWrapper);
+      Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+      // wait for a while until we see the shard
+      ocmh.waitForNewShard(collectionName, sliceName);
+
+      String async = message.getStr(ASYNC);
+      countDownLatch = new SolrCloseableLatch(positions.size(), ocmh);
+      for (ReplicaPosition position : positions) {
+        String nodeName = position.node;
+        String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(), collection, sliceName, position.type);
+        log.info("Creating replica " + coreName + " as part of slice " + sliceName + " of collection " + collectionName
+            + " on " + nodeName);
+
+        // Need to create new params for each request
+        ZkNodeProps addReplicasProps = new ZkNodeProps(
+            COLLECTION_PROP, collectionName,
+            SHARD_ID_PROP, sliceName,
+            ZkStateReader.REPLICA_TYPE, position.type.name(),
+            CoreAdminParams.NODE, nodeName,
+            CoreAdminParams.NAME, coreName,
+            CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
+        Map<String, Object> propertyParams = new HashMap<>();
+        ocmh.addPropertyParams(message, propertyParams);
+        addReplicasProps = addReplicasProps.plus(propertyParams);
+        if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
+        final NamedList addResult = new NamedList();
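+        // merge this replica's ADDREPLICA result into the overall response once the call completes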
+        ocmh.addReplica(zkStateReader.getClusterState(), addReplicasProps, addResult, () -> {
+          countDownLatch.countDown();
+          Object addResultFailure = addResult.get("failure");
+          if (addResultFailure != null) {
+            SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
+            if (failure == null) {
+              failure = new SimpleOrderedMap();
+              results.add("failure", failure);
+            }
+            failure.addAll((NamedList) addResultFailure);
+          } else {
+            SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
+            if (success == null) {
+              success = new SimpleOrderedMap();
+              results.add("success", success);
+            }
+            success.addAll((NamedList) addResult.get("success"));
+          }
+        });
+      }
+    } finally {
+      if (sessionWrapper.get() != null) sessionWrapper.get().release();
+    }
+
+    log.debug("Waiting for create shard action to complete");
+    countDownLatch.await(5, TimeUnit.MINUTES);
+    log.debug("Finished waiting for create shard action to complete");
+
+    log.info("Finished create command on all shards for collection: " + collectionName);
+
+  }
+
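+  /**
+   * Computes replica positions for the new shard, using the autoscaling policy framework when one
+   * is in effect, otherwise assigning replicas round-robin over the nodes with the fewest cores.
+   */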
+  public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
+         String collectionName, ZkNodeProps message, AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
+    String sliceName = message.getStr(SHARD_ID_PROP);
+    DocCollection collection = clusterState.getCollection(collectionName);
+
+    int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
+    int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0));
+    int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0));
+    int totalReplicas = numNrtReplicas + numPullReplicas + numTlogReplicas;
+
+    if (numNrtReplicas + numTlogReplicas <= 0) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
+    }
+
+    Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
+
+    boolean usePolicyFramework = CloudUtil.usePolicyFramework(collection, cloudManager);
+    List<ReplicaPosition> positions;
+    if (usePolicyFramework) {
+      if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName());
+      positions = Assign.identifyNodes(cloudManager,
+          clusterState,
+          Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM),
+          collection.getName(),
+          message,
+          Collections.singletonList(sliceName),
+          numNrtReplicas,
+          numTlogReplicas,
+          numPullReplicas);
+      sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
+    } else {
+      List<Assign.ReplicaCount> sortedNodeList = Assign.getNodesForNewReplicas(clusterState, collection.getName(), sliceName, totalReplicas,
+          createNodeSetStr, cloudManager);
+      int i = 0;
+      positions = new ArrayList<>();
+      for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
+          Replica.Type.TLOG, numTlogReplicas,
+          Replica.Type.PULL, numPullReplicas
+      ).entrySet()) {
+        for (int j = 0; j < e.getValue(); j++) {
+          positions.add(new ReplicaPosition(sliceName, j + 1, e.getKey(), sortedNodeList.get(i % sortedNodeList.size()).nodeName));
+          i++;
+        }
+      }
+    }
+    return positions;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
new file mode 100644
index 0000000..32715d6
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.CoreSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.SnapshotStatus;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the functionality of creating a collection level snapshot.
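+ * <p>
+ * It records snapshot meta-data in ZooKeeper, sends a core-level CREATESNAPSHOT
+ * request to every active replica, and marks the collection-level snapshot as
+ * successful only if at least one replica per shard succeeded.
+ * <p>
+ * An illustrative client-side sketch (assuming the SolrJ
+ * {@code CollectionAdminRequest.CreateSnapshot} request class; exact factory
+ * methods may vary by version, and the names below are examples only):
+ * <pre>
+ *   new CollectionAdminRequest.CreateSnapshot("techproducts", "mySnapshot")
+ *       .process(solrClient);
+ * </pre>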
+ */
+public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public CreateSnapshotCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName = message.getStr(COLLECTION_PROP);
+    String commitName = message.getStr(CoreAdminParams.COMMIT_NAME);
+    String asyncId = message.getStr(ASYNC);
+    SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
+    Date creationDate = new Date();
+
+    if (SolrSnapshotManager.snapshotExists(zkClient, collectionName, commitName)) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Snapshot with name " + commitName
+          + " already exists for collection " + collectionName);
+    }
+
+    log.info("Creating a snapshot for collection={} with commitName={}", collectionName, commitName);
+
+    // Create a node in ZK to store the collection level snapshot meta-data.
+    SolrSnapshotManager.createCollectionLevelSnapshot(zkClient, collectionName, new CollectionSnapshotMetaData(commitName));
+    log.info("Created a ZK path to store snapshot information for collection={} with commitName={}", collectionName, commitName);
+
+    Map<String, String> requestMap = new HashMap<>();
+    NamedList shardRequestResults = new NamedList();
+    Map<String, Slice> shardByCoreName = new HashMap<>();
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+
+    for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
+      for (Replica replica : slice.getReplicas()) {
+        if (replica.getState() != State.ACTIVE) {
+          log.info("Replica {} is not active. Hence not sending the createsnapshot request", replica.getCoreName());
+          continue; // Since replica is not active - no point sending a request.
+        }
+
+        String coreName = replica.getStr(CORE_NAME_PROP);
+
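+        // Build the core-level CREATESNAPSHOT request for this replica.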
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATESNAPSHOT.toString());
+        params.set(NAME, slice.getName());
+        params.set(CORE_NAME_PROP, coreName);
+        params.set(CoreAdminParams.COMMIT_NAME, commitName);
+
+        ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
+        log.debug("Sent createsnapshot request to core={} with commitName={}", coreName, commitName);
+
+        shardByCoreName.put(coreName, slice);
+      }
+    }
+
+    // At this point we want to make sure that at least one replica for every shard
+    // was able to create the snapshot. If that is not the case, we fail the request,
+    // which takes care of situations where e.g. an entire shard is unavailable.
+    Set<String> failedShards = new HashSet<>();
+
+    ocmh.processResponses(shardRequestResults, shardHandler, false, null, asyncId, requestMap);
+    NamedList success = (NamedList) shardRequestResults.get("success");
+    List<CoreSnapshotMetaData> replicas = new ArrayList<>();
+    if (success != null) {
+      for (int i = 0; i < success.size(); i++) {
+        NamedList resp = (NamedList)success.getVal(i);
+
+        // Check if this core is the leader for the shard. The idea is that during a backup
+        // we prefer the snapshot of the "leader" replica, since it is the most likely
+        // to have the latest state.
+        String coreName = (String)resp.get(CoreAdminParams.CORE);
+        Slice slice = shardByCoreName.remove(coreName);
+        boolean leader = (slice.getLeader() != null && slice.getLeader().getCoreName().equals(coreName));
+        resp.add(SolrSnapshotManager.SHARD_ID, slice.getName());
+        resp.add(SolrSnapshotManager.LEADER, leader);
+
+        CoreSnapshotMetaData c = new CoreSnapshotMetaData(resp);
+        replicas.add(c);
+        log.info("Snapshot with commitName {} is created successfully for core {}", commitName, c.getCoreName());
+      }
+    }
+
+    if (!shardByCoreName.isEmpty()) { // One or more failures.
+      log.warn("Unable to create a snapshot with name {} for following cores {}", commitName, shardByCoreName.keySet());
+
+      // Count number of failures per shard.
+      Map<String, Integer> failuresByShardId = new HashMap<>();
+      for (Map.Entry<String,Slice> entry : shardByCoreName.entrySet()) {
+        String shardId = entry.getValue().getName();
+        failuresByShardId.put(shardId, failuresByShardId.getOrDefault(shardId, 0) + 1);
+      }
+
+      // Now that we know the number of failures per shard, we can figure out
+      // whether at least one replica per shard was able to create a snapshot.
+      DocCollection collectionStatus = ocmh.zkStateReader.getClusterState().getCollection(collectionName);
+      for (Map.Entry<String,Integer> entry : failuresByShardId.entrySet()) {
+        int replicaCount = collectionStatus.getSlice(entry.getKey()).getReplicas().size();
+        if (replicaCount <= entry.getValue()) {
+          failedShards.add(entry.getKey());
+        }
+      }
+    }
+
+    if (failedShards.isEmpty()) { // No failures.
+      CollectionSnapshotMetaData meta = new CollectionSnapshotMetaData(commitName, SnapshotStatus.Successful, creationDate, replicas);
+      SolrSnapshotManager.updateCollectionLevelSnapshot(zkClient, collectionName, meta);
+      log.info("Saved following snapshot information for collection={} with commitName={} in Zookeeper : {}", collectionName,
+          commitName, meta.toNamedList());
+    } else {
+      log.warn("Failed to create a snapshot for collection {} with commitName = {}. Snapshot could not be captured for following shards {}",
+          collectionName, commitName, failedShards);
+      // Update the ZK meta-data to include only cores with the snapshot. This will enable users to figure out
+      // which cores have the named snapshot.
+      CollectionSnapshotMetaData meta = new CollectionSnapshotMetaData(commitName, SnapshotStatus.Failed, creationDate, replicas);
+      SolrSnapshotManager.updateCollectionLevelSnapshot(zkClient, collectionName, meta);
+      log.info("Saved following snapshot information for collection={} with commitName={} in Zookeeper : {}", collectionName,
+          commitName, meta.toNamedList());
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to create snapshot on shards " + failedShards);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteAliasCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteAliasCmd.java
new file mode 100644
index 0000000..e199d7d
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteAliasCmd.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+
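+/**
+ * Removes a collection alias by writing the updated alias map back to ZooKeeper.
+ * Typically reached via the Collections API; an illustrative SolrJ sketch
+ * (names are examples only):
+ * <pre>
+ *   CollectionAdminRequest.deleteAlias("myAlias").process(solrClient);
+ * </pre>
+ */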
+public class DeleteAliasCmd implements OverseerCollectionMessageHandler.Cmd {
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public DeleteAliasCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    String aliasName = message.getStr(NAME);
+
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
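+    // Passing a null target to cloneWithCollectionAlias removes the alias entry;
+    // the updated Aliases are then written back to ZooKeeper.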
+    zkStateReader.aliasesHolder.applyModificationAndExportToZk(a -> a.cloneWithCollectionAlias(aliasName, null));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
new file mode 100644
index 0000000..bdae8b9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.NonExistentCoreException;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
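+/**
+ * Deletes a collection: unloads all of its cores (deleting data and instance
+ * directories), asks the Overseer to remove it from the cluster state, and
+ * finally cleans up the collection's znodes. An illustrative SolrJ sketch of
+ * triggering this command (names are examples only):
+ * <pre>
+ *   CollectionAdminRequest.deleteCollection("techproducts").process(solrClient);
+ * </pre>
+ */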
+public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+  private final TimeSource timeSource;
+
+  public DeleteCollectionCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+    this.timeSource = ocmh.cloudManager.getTimeSource();
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    Aliases aliases = zkStateReader.getAliases();
+    final String collection = message.getStr(NAME);
+    for (Map.Entry<String, List<String>> ent : aliases.getCollectionAliasListMap().entrySet()) {
+      if (ent.getValue().contains(collection)) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Collection: " + collection + " is part of alias " + ent.getKey() + ". Remove or modify the alias before removing this collection.");
+      }
+    }
+
+    try {
+      // Remove the snapshot meta-data for this collection in ZK. Deleting the actual
+      // index files is taken care of as part of the collection delete operation.
+      SolrZkClient zkClient = zkStateReader.getZkClient();
+      SolrSnapshotManager.cleanupCollectionLevelSnapshots(zkClient, collection);
+
+      if (zkStateReader.getClusterState().getCollectionOrNull(collection) == null) {
+        if (zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
+          // The collection is not in the cluster state but is still listed in ZK, so
+          // there is nothing to do here; the finally block will remove the znodes. We
+          // cannot continue, because the code below errors out if the collection is
+          // missing from the cluster state.
+          return;
+        }
+      }
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
+      params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
+      params.set(CoreAdminParams.DELETE_DATA_DIR, true);
+
+      String asyncId = message.getStr(ASYNC);
+      Map<String, String> requestMap = null;
+      if (asyncId != null) {
+        requestMap = new HashMap<>();
+      }
+
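+      // A core may already be gone by the time UNLOAD reaches it; treat
+      // NonExistentCoreException as a non-fatal outcome.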
+      Set<String> okayExceptions = new HashSet<>(1);
+      okayExceptions.add(NonExistentCoreException.class.getName());
+
+      ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions);
+
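+      // Ask the Overseer to remove the collection from the cluster state.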
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
+      Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+
+      // wait for a while until we don't see the collection
+      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+      boolean removed = false;
+      while (! timeout.hasTimedOut()) {
+        timeout.sleep(100);
+        removed = !zkStateReader.getClusterState().hasCollection(collection);
+        if (removed) {
+          timeout.sleep(500); // a little extra time so that other readers are
+          // more likely to observe the removal before we return
+          break;
+        }
+      }
+      if (!removed) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Could not fully remove collection: " + collection);
+      }
+
+    } finally {
+
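+      // Whatever happened above, attempt to remove the collection's znode tree.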
+      try {
+        if (zkStateReader.getZkClient().exists(
+            ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
+          zkStateReader.getZkClient().clean(
+              ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
+        }
+      } catch (InterruptedException e) {
+        SolrException.log(log, "Cleaning up collection in zk was interrupted:"
+            + collection, e);
+        Thread.currentThread().interrupt();
+      } catch (KeeperException e) {
+        SolrException.log(log, "Problem cleaning up collection in zk:"
+            + collection, e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
new file mode 100644
index 0000000..ab4dc0c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
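+/**
+ * Deletes every replica hosted on a given live node, but only if each affected
+ * shard would still retain at least one other non-PULL replica.
+ */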
+public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public DeleteNodeCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    ocmh.checkRequired(message, "node");
+    String node = message.getStr("node");
+    if (!state.liveNodesContain(node)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + node + " is not live");
+    }
+    List<ZkNodeProps> sourceReplicas = ReplaceNodeCmd.getReplicasOfNode(node, state);
+    List<String> singleReplicas = verifyReplicaAvailability(sourceReplicas, state);
+    if (!singleReplicas.isEmpty()) {
+      results.add("failure", "Can't delete the only existing non-PULL replica(s) on node " + node + ": " + singleReplicas.toString());
+    } else {
+      cleanupReplicas(results, state, sourceReplicas, ocmh, node, message.getStr(ASYNC));
+    }
+  }
+
+  // collect names of replicas that cannot be deleted
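+  // (a replica must be kept if it is the last one in its shard, or if all other
+  // replicas in its shard are PULL replicas, which cannot become leader)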
+  static List<String> verifyReplicaAvailability(List<ZkNodeProps> sourceReplicas, ClusterState state) {
+    List<String> res = new ArrayList<>();
+    for (ZkNodeProps sourceReplica : sourceReplicas) {
+      String coll = sourceReplica.getStr(COLLECTION_PROP);
+      String shard = sourceReplica.getStr(SHARD_ID_PROP);
+      String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+      DocCollection collection = state.getCollection(coll);
+      Slice slice = collection.getSlice(shard);
+      if (slice.getReplicas().size() < 2) {
+        // can't delete the only replica in existence
+        res.add(coll + "/" + shard + "/" + replicaName + ", type=" + sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
+      } else { // check replica types
+        int otherNonPullReplicas = 0;
+        for (Replica r : slice.getReplicas()) {
+          if (!r.getName().equals(replicaName) && !r.getType().equals(Replica.Type.PULL)) {
+            otherNonPullReplicas++;
+          }
+        }
+        // can't delete - there are no other non-pull replicas
+        if (otherNonPullReplicas == 0) {
+          res.add(coll + "/" + shard + "/" + replicaName + ", type=" + sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
+        }
+      }
+    }
+    return res;
+  }
+
+  static void cleanupReplicas(NamedList results,
+                              ClusterState clusterState,
+                              List<ZkNodeProps> sourceReplicas,
+                              OverseerCollectionMessageHandler ocmh,
+                              String node,
+                              String async) throws InterruptedException {
+    CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size());
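+    // Deletions are issued with parallel=true; each completion callback counts the
+    // latch down so that we can block below until every replica has been processed.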
+    for (ZkNodeProps sourceReplica : sourceReplicas) {
+      String coll = sourceReplica.getStr(COLLECTION_PROP);
+      String shard = sourceReplica.getStr(SHARD_ID_PROP);
+      String type = sourceReplica.getStr(ZkStateReader.REPLICA_TYPE);
+      log.info("Deleting replica type={} for collection={} shard={} on node={}", type, coll, shard, node);
+      NamedList deleteResult = new NamedList();
+      try {
+        if (async != null) sourceReplica = sourceReplica.plus(ASYNC, async);
+        ((DeleteReplicaCmd)ocmh.commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, sourceReplica.plus("parallel", "true"), deleteResult, () -> {
+          cleanupLatch.countDown();
+          if (deleteResult.get("failure") != null) {
+            synchronized (results) {
+              results.add("failure", String.format(Locale.ROOT, "Failed to delete replica for collection=%s shard=%s" +
+                  " on node=%s", coll, shard, node));
+            }
+          }
+        });
+      } catch (KeeperException e) {
+        log.warn("Error deleting ", e);
+        cleanupLatch.countDown();
+      } catch (Exception e) {
+        log.warn("Error deleting ", e);
+        cleanupLatch.countDown();
+        throw e;
+      }
+    }
+    log.debug("Waiting for delete node action to complete");
+    cleanupLatch.await(5, TimeUnit.MINUTES);
+  }
+
+}