You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by cp...@apache.org on 2016/10/24 18:32:36 UTC

[32/50] [abbrv] lucene-solr:jira/solr-8542-v2: SOLR-9326: Ability to create/delete/list snapshots at collection level.

SOLR-9326: Ability to create/delete/list snapshots at collection level.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/57ba9614
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/57ba9614
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/57ba9614

Branch: refs/heads/jira/solr-8542-v2
Commit: 57ba96145ce8233034c67ffaead22d3bd7f3460f
Parents: 49ca9ce
Author: yonik <yo...@apache.org>
Authored: Fri Oct 21 07:08:47 2016 -0400
Committer: yonik <yo...@apache.org>
Committed: Fri Oct 21 09:47:02 2016 -0400

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../java/org/apache/solr/cloud/BackupCmd.java   |  75 ++++-
 .../apache/solr/cloud/CreateSnapshotCmd.java    | 179 ++++++++++++
 .../apache/solr/cloud/DeleteSnapshotCmd.java    | 160 +++++++++++
 .../cloud/OverseerCollectionMessageHandler.java |   2 +
 .../snapshots/CollectionSnapshotMetaData.java   | 242 ++++++++++++++++
 .../core/snapshots/SolrSnapshotManager.java     | 180 ++++++++++++
 .../solr/handler/admin/CollectionsHandler.java  |  54 +++-
 .../solr/handler/admin/CoreAdminOperation.java  |   7 +-
 .../solr/handler/admin/CreateSnapshotOp.java    |  10 +-
 .../solr/handler/admin/DeleteSnapshotOp.java    |   4 +
 .../core/snapshots/TestSolrCloudSnapshots.java  | 285 +++++++++++++++++++
 .../solrj/request/CollectionAdminRequest.java   | 116 +++++++-
 .../solr/common/params/CollectionParams.java    |   3 +
 14 files changed, 1309 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57ba9614/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7228559..b4b0a33 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -149,6 +149,9 @@ New Features
 * SOLR-8370: Display configured Similarity in Schema-Browser, both global/default and per-field/field-type 
   (janhoy, Alexandre Rafalovitch)
 
+* SOLR-9326: Ability to create/delete/list snapshots at collection level.
+  (Hrishikesh Gadre via yonik)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57ba9614/solr/core/src/java/org/apache/solr/cloud/BackupCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/BackupCmd.java
index 648eee8..b859d8e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/BackupCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/BackupCmd.java
@@ -19,16 +19,21 @@ package org.apache.solr.cloud;
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Replica.State;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -36,6 +41,10 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.backup.BackupManager;
 import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.CoreSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.SnapshotStatus;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
 import org.apache.solr.handler.component.ShardHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +72,21 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
     String asyncId = message.getStr(ASYNC);
     String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
 
+    String commitName = message.getStr(CoreAdminParams.COMMIT_NAME);
+    Optional<CollectionSnapshotMetaData> snapshotMeta = Optional.empty();
+    if (commitName != null) {
+      SolrZkClient zkClient = ocmh.overseer.getZkController().getZkClient();
+      snapshotMeta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName);
+      if (!snapshotMeta.isPresent()) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Snapshot with name " + commitName
+            + " does not exist for collection " + collectionName);
+      }
+      if (snapshotMeta.get().getStatus() != SnapshotStatus.Successful) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Snapshot with name " + commitName + " for collection " + collectionName
+            + " has not completed successfully. The status is " + snapshotMeta.get().getStatus());
+      }
+    }
+
     Map<String, String> requestMap = new HashMap<>();
     Instant startTime = Instant.now();
 
@@ -85,8 +109,28 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
     log.info("Starting backup of collection={} with backupName={} at location={}", collectionName, backupName,
         backupPath);
 
+    Collection<String> shardsToConsider = Collections.emptySet();
+    if (snapshotMeta.isPresent()) {
+      shardsToConsider = snapshotMeta.get().getShards();
+    }
+
     for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) {
-      Replica replica = slice.getLeader();
+      Replica replica = null;
+
+      if (snapshotMeta.isPresent()) {
+        if (!shardsToConsider.contains(slice.getName())) {
+          log.warn("Skipping the backup for shard {} since it wasn't part of the collection {} when snapshot {} was created.",
+              slice.getName(), collectionName, snapshotMeta.get().getName());
+          continue;
+        }
+        replica = selectReplicaWithSnapshot(snapshotMeta.get(), slice);
+      } else {
+        // Note - Actually this can return a null value when there is no leader for this shard.
+        replica = slice.getLeader();
+        if (replica == null) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "No 'leader' replica available for shard " + slice.getName() + " of collection " + collectionName);
+        }
+      }
 
       String coreName = replica.getStr(CORE_NAME_PROP);
 
@@ -96,6 +140,9 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
       params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
       params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.toASCIIString()); // note: index dir will be here then the "snapshot." + slice name
       params.set(CORE_NAME_PROP, coreName);
+      if (snapshotMeta.isPresent()) {
+        params.set(CoreAdminParams.COMMIT_NAME, snapshotMeta.get().getName());
+      }
 
       ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
       log.debug("Sent backup request to core={} for backupName={}", coreName, backupName);
@@ -129,4 +176,30 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
 
     log.info("Completed backing up ZK data for backupName={}", backupName);
   }
+
+  /**
+   * Picks the replica to back up for the given shard: preferably the core recorded as leader when the snapshot
+   * was taken (looked up in the slice by its core name), otherwise the first replica that is not DOWN and for
+   * which the snapshot meta-data reports a snapshot.
+   * @throws SolrException (SERVER_ERROR) when neither lookup yields a replica.
+   */
+  private Replica selectReplicaWithSnapshot(CollectionSnapshotMetaData snapshotMeta, Slice slice) {
+    Optional<CoreSnapshotMetaData> recordedLeader = snapshotMeta.getReplicaSnapshotsForShard(slice.getName())
+        .stream().filter(CoreSnapshotMetaData::isLeader).findFirst();
+    if (recordedLeader.isPresent()) {
+      log.info("Replica {} was the leader when snapshot {} was created.", recordedLeader.get().getCoreName(), snapshotMeta.getName());
+      Replica formerLeader = slice.getReplica(recordedLeader.get().getCoreName());
+      if (formerLeader != null && !formerLeader.getState().equals(State.DOWN)) {
+        return formerLeader;
+      }
+    }
+    // Fallback: scan the shard's replicas in iteration order and take the first usable one.
+    for (Replica candidate : slice.getReplicas()) {
+      if (candidate.getState() != State.DOWN && snapshotMeta.isSnapshotExists(slice.getName(), candidate)) {
+        return candidate;
+      }
+    }
+    throw new SolrException(ErrorCode.SERVER_ERROR,
+        "Unable to find any live replica with a snapshot named " + snapshotMeta.getName() + " for shard " + slice.getName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57ba9614/solr/core/src/java/org/apache/solr/cloud/CreateSnapshotCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateSnapshotCmd.java
new file mode 100644
index 0000000..5de65a4
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateSnapshotCmd.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.CoreSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.SnapshotStatus;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the functionality of creating a collection level snapshot.
+ */
+public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public CreateSnapshotCmd (OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  /**
+   * Creates the named snapshot on every ACTIVE replica of the collection and stores the outcome in ZooKeeper.
+   * @param state   not read by this command; the cluster state is obtained from ocmh.zkStateReader instead
+   * @param message supplies the collection name, the snapshot (commit) name and the async id
+   * @param results not populated by this command
+   * @throws SolrException BAD_REQUEST when the snapshot name is already taken for the collection; SERVER_ERROR
+   *         when a shard reported failures and none of its replicas created the snapshot
+   */
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName =  message.getStr(COLLECTION_PROP);
+    String commitName =  message.getStr(CoreAdminParams.COMMIT_NAME);
+    String asyncId = message.getStr(ASYNC);
+    SolrZkClient zkClient = this.ocmh.overseer.getZkController().getZkClient();
+    Date creationDate = new Date();
+
+    if(SolrSnapshotManager.snapshotExists(zkClient, collectionName, commitName)) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Snapshot with name " + commitName
+          + " already exists for collection " + collectionName);
+    }
+
+    log.info("Creating a snapshot for collection={} with commitName={}", collectionName, commitName);
+
+    // Create a node in ZK to store the collection level snapshot meta-data.
+    SolrSnapshotManager.createCollectionLevelSnapshot(zkClient, collectionName, new CollectionSnapshotMetaData(commitName));
+    log.info("Created a ZK path to store snapshot information for collection={} with commitName={}", collectionName, commitName);
+
+    Map<String, String> requestMap = new HashMap<>();
+    NamedList shardRequestResults = new NamedList();
+    // Cores that were sent a request, keyed by core name. Entries are removed as successes arrive, so what remains failed.
+    Map<String, Slice> shardByCoreName = new HashMap<>();
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+
+    for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
+      for (Replica replica : slice.getReplicas()) {
+        if (replica.getState() != State.ACTIVE) {
+          log.info("Replica {} is not active. Hence not sending the createsnapshot request", replica.getCoreName());
+          continue; // Since replica is not active - no point sending a request.
+        }
+
+        String coreName = replica.getStr(CORE_NAME_PROP);
+
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATESNAPSHOT.toString());
+        params.set(NAME, slice.getName());
+        params.set(CORE_NAME_PROP, coreName);
+        params.set(CoreAdminParams.COMMIT_NAME, commitName);
+
+        ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
+        log.debug("Sent createsnapshot request to core={} with commitName={}", coreName, commitName);
+
+        shardByCoreName.put(coreName, slice);
+      }
+    }
+
+    ocmh.processResponses(shardRequestResults, shardHandler, false, null, asyncId, requestMap);
+    NamedList success = (NamedList) shardRequestResults.get("success");
+    List<CoreSnapshotMetaData> replicas = new ArrayList<>();
+    Set<String> shardsWithSnapshot = new HashSet<>();
+    if (success != null) {
+      for ( int i = 0 ; i < success.size() ; i++) {
+        NamedList resp = (NamedList)success.getVal(i);
+        String coreName = (String)resp.get(CoreAdminParams.CORE);
+        Slice slice = shardByCoreName.remove(coreName);
+        if (slice == null) {
+          // The response cannot be matched to a core we contacted; skip it instead of dereferencing null below.
+          log.warn("Ignoring createsnapshot response for unknown core {} (commitName={})", coreName, commitName);
+          continue;
+        }
+
+        // Check if this core is the leader for the shard. The idea here is that during the backup
+        // operation we preferably use the snapshot of the "leader" replica since it is most likely
+        // to have latest state.
+        Replica shardLeader = slice.getLeader();
+        boolean leader = (shardLeader != null && shardLeader.getCoreName().equals(coreName));
+        resp.add(SolrSnapshotManager.SHARD_ID, slice.getName());
+        resp.add(SolrSnapshotManager.LEADER, leader);
+        CoreSnapshotMetaData c = new CoreSnapshotMetaData(resp);
+        replicas.add(c);
+        shardsWithSnapshot.add(slice.getName());
+        log.info("Snapshot with commitName {} is created successfully for core {}", commitName, c.getCoreName());
+      }
+    }
+
+    // A shard has failed when it reported failures and none of its replicas created the snapshot. Comparing the
+    // failure count with the shard's total replica count would miss shards whose other replicas were skipped as inactive.
+    // NOTE(review): a shard without any ACTIVE replica is never contacted and therefore never reported as
+    // failed here - confirm whether that is intended.
+    Set<String> failedShards = new HashSet<>();
+    if (!shardByCoreName.isEmpty()) { // One or more failures.
+      log.warn("Unable to create a snapshot with name {} for following cores {}", commitName, shardByCoreName.keySet());
+      for (Slice sliceWithFailure : shardByCoreName.values()) {
+        if (!shardsWithSnapshot.contains(sliceWithFailure.getName())) {
+          failedShards.add(sliceWithFailure.getName());
+        }
+      }
+    }
+
+    boolean failed = !failedShards.isEmpty();
+    if (failed) {
+      log.warn("Failed to create a snapshot for collection {} with commitName = {}. Snapshot could not be captured for following shards {}",
+          collectionName, commitName, failedShards);
+    }
+    // Persist the outcome either way. The meta-data includes only cores with the snapshot, so on failure users
+    // can still figure out which cores have the named snapshot.
+    SnapshotStatus status = failed ? SnapshotStatus.Failed : SnapshotStatus.Successful;
+    CollectionSnapshotMetaData meta = new CollectionSnapshotMetaData(commitName, status, creationDate, replicas);
+    SolrSnapshotManager.updateCollectionLevelSnapshot(zkClient, collectionName, meta);
+    log.info("Saved following snapshot information for collection={} with commitName={} in Zookeeper : {}", collectionName,
+        commitName, meta.toNamedList());
+    if (failed) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to create snapshot on shards " + failedShards);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57ba9614/solr/core/src/java/org/apache/solr/cloud/DeleteSnapshotCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteSnapshotCmd.java
new file mode 100644
index 0000000..765f4b9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteSnapshotCmd.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.CoreSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.SnapshotStatus;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the functionality of deleting a collection level snapshot.
+ */
+public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public DeleteSnapshotCmd (OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  /**
+   * Sends a DELETESNAPSHOT request to the replicas (not DOWN) that are listed in the snapshot meta-data, then removes
+   * that meta-data from ZooKeeper. Returns without doing anything when the collection has no snapshot with that name.
+   * @throws SolrException SERVER_ERROR when the snapshot could not be deleted on one or more cores; the ZooKeeper
+   *         meta-data is then rewritten with status Failed and only the cores that still hold the snapshot
+   */
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName =  message.getStr(COLLECTION_PROP);
+    String commitName =  message.getStr(CoreAdminParams.COMMIT_NAME);
+    String asyncId = message.getStr(ASYNC);
+    Map<String, String> requestMap = new HashMap<>();
+    NamedList shardRequestResults = new NamedList();
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    SolrZkClient zkClient = ocmh.overseer.getZkController().getZkClient();
+
+    Optional<CollectionSnapshotMetaData> meta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName);
+    if (!meta.isPresent()) { // Snapshot not found. Nothing to do.
+      return;
+    }
+
+    log.info("Deleting a snapshot for collection={} with commitName={}", collectionName, commitName);
+
+    Set<String> existingCores = new HashSet<>();
+    for (Slice s : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
+      for (Replica r : s.getReplicas()) {
+        existingCores.add(r.getCoreName());
+      }
+    }
+
+    Set<String> coresWithSnapshot = new HashSet<>();
+    for (CoreSnapshotMetaData m : meta.get().getReplicaSnapshots()) {
+      if (existingCores.contains(m.getCoreName())) {
+        coresWithSnapshot.add(m.getCoreName());
+      }
+    }
+
+    log.info("Existing cores with snapshot for collection={} are {}", collectionName, coresWithSnapshot);
+    for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
+      for (Replica replica : slice.getReplicas()) {
+        if (replica.getState() == State.DOWN) {
+          continue; // Since replica is down - no point sending a request.
+        }
+
+        // Note - when a snapshot is found in_progress state - it is the result of overseer
+        // failure while handling the snapshot creation. Since we don't know the exact set of
+        // replicas to contact at this point, we try on all replicas.
+        if (meta.get().getStatus() == SnapshotStatus.InProgress || coresWithSnapshot.contains(replica.getCoreName())) {
+          String coreName = replica.getStr(CORE_NAME_PROP);
+          ModifiableSolrParams params = new ModifiableSolrParams();
+          params.set(CoreAdminParams.ACTION, CoreAdminAction.DELETESNAPSHOT.toString());
+          params.set(NAME, slice.getName());
+          params.set(CORE_NAME_PROP, coreName);
+          params.set(CoreAdminParams.COMMIT_NAME, commitName);
+          log.info("Sending deletesnapshot request to core={} with commitName={}", coreName, commitName);
+          ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
+        }
+      }
+    }
+
+    ocmh.processResponses(shardRequestResults, shardHandler, false, null, asyncId, requestMap);
+    NamedList success = (NamedList) shardRequestResults.get("success");
+    if (success != null) {
+      for ( int i = 0 ; i < success.size() ; i++) {
+        NamedList resp = (NamedList)success.getVal(i);
+        // Unfortunately async processing logic doesn't provide the "core" name automatically.
+        String coreName = (String)resp.get("core");
+        coresWithSnapshot.remove(coreName);
+      }
+    }
+
+    if (!coresWithSnapshot.isEmpty()) { // One or more failures.
+      log.warn("Failed to delete a snapshot for collection {} with commitName = {}. Snapshot could not be deleted for following cores {}",
+          collectionName, commitName, coresWithSnapshot);
+      List<CoreSnapshotMetaData> replicasWithSnapshot = new ArrayList<>();
+      for (CoreSnapshotMetaData m : meta.get().getReplicaSnapshots()) {
+        if (coresWithSnapshot.contains(m.getCoreName())) {
+          replicasWithSnapshot.add(m);
+        }
+      }
+      // Update the ZK meta-data to include only cores with the snapshot. This will enable users to figure out
+      // which cores still contain the named snapshot.
+      CollectionSnapshotMetaData newResult = new CollectionSnapshotMetaData(meta.get().getName(), SnapshotStatus.Failed,
+          meta.get().getCreationDate(), replicasWithSnapshot);
+      SolrSnapshotManager.updateCollectionLevelSnapshot(zkClient, collectionName, newResult);
+      log.info("Saved snapshot information for collection={} with commitName={} in Zookeeper as follows {}", collectionName, commitName,
+          newResult.toNamedList());
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to delete snapshot on cores " + coresWithSnapshot);
+    } else {
+      // Delete the ZK path so that we eliminate the references of this snapshot from collection level meta-data.
+      SolrSnapshotManager.deleteCollectionLevelSnapshot(zkClient, collectionName, commitName);
+      log.info("Deleted Zookeeper snapshot metadata for collection={} with commitName={}", collectionName, commitName);
+      log.info("Successfully deleted snapshot for collection={} with commitName={}", collectionName, commitName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57ba9614/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index 0520488..a21f18f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -182,6 +182,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
         .put(DELETENODE, new DeleteNodeCmd(this))
         .put(BACKUP, new BackupCmd(this))
         .put(RESTORE, new RestoreCmd(this))
+        .put(CREATESNAPSHOT, new CreateSnapshotCmd(this))
+        .put(DELETESNAPSHOT, new DeleteSnapshotCmd(this))
         .put(SPLITSHARD, new SplitShardCmd(this))
         .put(ADDROLE, new OverseerRoleCmd(this, ADDROLE, overseerPrioritizer))
         .put(REMOVEROLE, new OverseerRoleCmd(this, REMOVEROLE, overseerPrioritizer))

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57ba9614/solr/core/src/java/org/apache/solr/core/snapshots/CollectionSnapshotMetaData.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/snapshots/CollectionSnapshotMetaData.java b/solr/core/src/java/org/apache/solr/core/snapshots/CollectionSnapshotMetaData.java
new file mode 100644
index 0000000..4170861
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/core/snapshots/CollectionSnapshotMetaData.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.core.snapshots;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.noggit.JSONWriter;
+
+/**
+ * This class defines the meta-data about a collection level snapshot: its name, its status,
+ * its creation date and the meta-data of every core snapshot recorded for it.
+ */
+public class CollectionSnapshotMetaData implements JSONWriter.Writable {
+  /**
+   * Meta-data about the snapshot of a single core, as recorded within a collection level snapshot.
+   */
+  public static class CoreSnapshotMetaData implements JSONWriter.Writable {
+    private final String coreName;
+    private final String indexDirPath;
+    private final long generationNumber;
+    private final boolean leader;
+    private final String shardId;
+    private final Collection<String> files;
+
+    /**
+     * @param coreName The name of the core
+     * @param indexDirPath The path of the index directory of the core
+     * @param generationNumber The generation number of the snapshotted index commit
+     * @param shardId The identifier of the shard the core belongs to
+     * @param leader The value of the leader flag recorded for the core
+     * @param files The names of the files recorded for the snapshot (stored as-is, not copied)
+     */
+    public CoreSnapshotMetaData(String coreName, String indexDirPath, long generationNumber, String shardId, boolean leader, Collection<String> files) {
+      this.coreName = coreName;
+      this.indexDirPath = indexDirPath;
+      this.generationNumber = generationNumber;
+      this.shardId = shardId;
+      this.leader = leader;
+      this.files = files;
+    }
+
+    /**
+     * Builds the meta-data from a {@linkplain NamedList} using the same keys as {@link #toNamedList()}.
+     * The generation may be stored as any {@linkplain Number}.
+     *
+     * @param resp The named list holding the core snapshot attributes
+     * @throws NullPointerException if the generation or the leader entry is missing
+     */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public CoreSnapshotMetaData(NamedList resp) {
+      this.coreName = (String)resp.get(CoreAdminParams.CORE);
+      this.indexDirPath = (String)resp.get(SolrSnapshotManager.INDEX_DIR_PATH);
+      // Accept any Number so that the decoding does not depend on the exact boxed type.
+      this.generationNumber = ((Number)resp.get(SolrSnapshotManager.GENERATION_NUM)).longValue();
+      this.shardId = (String)resp.get(SolrSnapshotManager.SHARD_ID);
+      this.leader = (Boolean)resp.get(SolrSnapshotManager.LEADER);
+      this.files = (Collection<String>)resp.get(SolrSnapshotManager.FILE_LIST);
+    }
+
+    public String getCoreName() {
+      return coreName;
+    }
+
+    public String getIndexDirPath() {
+      return indexDirPath;
+    }
+
+    public long getGenerationNumber() {
+      return generationNumber;
+    }
+
+    public Collection<String> getFiles() {
+      return files;
+    }
+
+    public String getShardId() {
+      return shardId;
+    }
+
+    public boolean isLeader() {
+      return leader;
+    }
+
+    /**
+     * Writes this core snapshot as a JSON object; the insertion order of the keys is preserved.
+     */
+    @Override
+    public void write(JSONWriter writer) {
+      Map<String, Object> info = new LinkedHashMap<>();
+      info.put(CoreAdminParams.CORE, getCoreName());
+      info.put(SolrSnapshotManager.INDEX_DIR_PATH, getIndexDirPath());
+      info.put(SolrSnapshotManager.GENERATION_NUM, getGenerationNumber());
+      info.put(SolrSnapshotManager.SHARD_ID, getShardId());
+      info.put(SolrSnapshotManager.LEADER, isLeader());
+      info.put(SolrSnapshotManager.FILE_LIST, getFiles());
+      writer.write(info);
+    }
+
+    /**
+     * @return a {@linkplain NamedList} holding the same attributes (under the same keys) as the JSON form
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public NamedList toNamedList() {
+      NamedList result = new NamedList();
+      result.add(CoreAdminParams.CORE, getCoreName());
+      result.add(SolrSnapshotManager.INDEX_DIR_PATH, getIndexDirPath());
+      result.add(SolrSnapshotManager.GENERATION_NUM, getGenerationNumber());
+      result.add(SolrSnapshotManager.SHARD_ID, getShardId());
+      result.add(SolrSnapshotManager.LEADER, isLeader());
+      result.add(SolrSnapshotManager.FILE_LIST, getFiles());
+      return result;
+    }
+  }
+
+  /**
+   * The status of a collection level snapshot. The constant names are persisted (see
+   * {@link #write(JSONWriter)}) and parsed back with {@code valueOf}, so they must not be renamed.
+   */
+  public enum SnapshotStatus {
+    Successful, InProgress, Failed;
+  }
+
+  private final String name;
+  private final SnapshotStatus status;
+  private final Date creationDate;
+  private final List<CoreSnapshotMetaData> replicaSnapshots;
+
+  /**
+   * Creates the meta-data of a snapshot which is {@linkplain SnapshotStatus#InProgress in progress},
+   * created now and without any core snapshot.
+   *
+   * @param name The name of the snapshot
+   */
+  public CollectionSnapshotMetaData(String name) {
+    this(name, SnapshotStatus.InProgress, new Date(), Collections.<CoreSnapshotMetaData>emptyList());
+  }
+
+  /**
+   * @param name The name of the snapshot
+   * @param status The status of the snapshot
+   * @param creationTime The creation date of the snapshot
+   * @param replicaSnapshots The core snapshots making up this snapshot (stored as-is, not copied)
+   */
+  public CollectionSnapshotMetaData(String name, SnapshotStatus status, Date creationTime, List<CoreSnapshotMetaData> replicaSnapshots) {
+    this.name = name;
+    this.status = status;
+    this.creationDate = creationTime;
+    this.replicaSnapshots = replicaSnapshots;
+  }
+
+  /**
+   * Builds the meta-data from the map form of the JSON produced by {@link #write(JSONWriter)}.
+   * Numeric values may be stored as any {@linkplain Number}.
+   *
+   * @param data The decoded JSON object
+   */
+  @SuppressWarnings("unchecked")
+  public CollectionSnapshotMetaData(Map<String, Object> data) {
+    this.name = (String)data.get(CoreAdminParams.NAME);
+    this.status = SnapshotStatus.valueOf((String)data.get(SolrSnapshotManager.SNAPSHOT_STATUS));
+    this.creationDate = new Date(((Number)data.get(SolrSnapshotManager.CREATION_DATE)).longValue());
+    this.replicaSnapshots = new ArrayList<>();
+
+    List<Object> replicas = (List<Object>) data.get(SolrSnapshotManager.SNAPSHOT_REPLICAS);
+    for (Object replica : replicas) {
+      replicaSnapshots.add(coreSnapshotFromMap((Map<String, Object>) replica));
+    }
+  }
+
+  /**
+   * Builds the meta-data from the {@linkplain NamedList} form produced by {@link #toNamedList()}.
+   * Numeric values may be stored as any {@linkplain Number}.
+   *
+   * @param data The named list holding the snapshot attributes
+   */
+  @SuppressWarnings("unchecked")
+  public CollectionSnapshotMetaData(NamedList<Object> data) {
+    this.name = (String)data.get(CoreAdminParams.NAME);
+    this.status = SnapshotStatus.valueOf((String)data.get(SolrSnapshotManager.SNAPSHOT_STATUS));
+    this.creationDate = new Date(((Number)data.get(SolrSnapshotManager.CREATION_DATE)).longValue());
+    this.replicaSnapshots = new ArrayList<>();
+
+    NamedList<Object> replicas = (NamedList<Object>) data.get(SolrSnapshotManager.SNAPSHOT_REPLICAS);
+    for (Map.Entry<String,Object> replica : replicas) {
+      // Each value uses the layout written by CoreSnapshotMetaData#toNamedList().
+      replicaSnapshots.add(new CoreSnapshotMetaData((NamedList<Object>) replica.getValue()));
+    }
+  }
+
+  /**
+   * Decodes a single core snapshot from the map form of the JSON written by
+   * {@link CoreSnapshotMetaData#write(JSONWriter)}.
+   */
+  @SuppressWarnings("unchecked")
+  private static CoreSnapshotMetaData coreSnapshotFromMap(Map<String, Object> info) {
+    String coreName = (String)info.get(CoreAdminParams.CORE);
+    String indexDirPath = (String)info.get(SolrSnapshotManager.INDEX_DIR_PATH);
+    long generationNumber = ((Number)info.get(SolrSnapshotManager.GENERATION_NUM)).longValue();
+    String shardId = (String)info.get(SolrSnapshotManager.SHARD_ID);
+    boolean leader = (Boolean)info.get(SolrSnapshotManager.LEADER);
+    Collection<String> files = (Collection<String>)info.get(SolrSnapshotManager.FILE_LIST);
+    return new CoreSnapshotMetaData(coreName, indexDirPath, generationNumber, shardId, leader, files);
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public SnapshotStatus getStatus() {
+    return status;
+  }
+
+  public Date getCreationDate() {
+    return creationDate;
+  }
+
+  /**
+   * @return the core snapshots of this snapshot (the internal list, not a copy)
+   */
+  public List<CoreSnapshotMetaData> getReplicaSnapshots() {
+    return replicaSnapshots;
+  }
+
+  /**
+   * @param shardId The identifier of the shard
+   * @return a new list with the core snapshots recorded for the specified shard
+   */
+  public List<CoreSnapshotMetaData> getReplicaSnapshotsForShard(String shardId) {
+    List<CoreSnapshotMetaData> result = new ArrayList<>();
+    for (CoreSnapshotMetaData d : replicaSnapshots) {
+      if (d.getShardId().equals(shardId)) {
+        result.add(d);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * @param shardId The identifier of the shard
+   * @param r The replica to look for
+   * @return true if a core snapshot is recorded for the core of the replica in the specified shard
+   */
+  public boolean isSnapshotExists(String shardId, Replica r) {
+    for (CoreSnapshotMetaData d : replicaSnapshots) {
+      if (d.getShardId().equals(shardId) && d.getCoreName().equals(r.getCoreName())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @return the distinct identifiers of the shards for which a core snapshot is recorded
+   */
+  public Collection<String> getShards() {
+    Set<String> result = new HashSet<>();
+    for (CoreSnapshotMetaData d : replicaSnapshots) {
+      result.add(d.getShardId());
+    }
+    return result;
+  }
+
+  /**
+   * Writes this snapshot as a JSON object; the creation date is written as milliseconds
+   * since the epoch and the status as the name of the enum constant.
+   */
+  @Override
+  public void write(JSONWriter writer) {
+    Map<String, Object> result = new LinkedHashMap<>();
+    result.put(CoreAdminParams.NAME, this.name);
+    result.put(SolrSnapshotManager.SNAPSHOT_STATUS, this.status.toString());
+    result.put(SolrSnapshotManager.CREATION_DATE, this.getCreationDate().getTime());
+    result.put(SolrSnapshotManager.SNAPSHOT_REPLICAS, this.replicaSnapshots);
+    writer.write(result);
+  }
+
+  /**
+   * @return a {@linkplain NamedList} form of this snapshot in which the core snapshots are
+   *         keyed by core name
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public NamedList toNamedList() {
+    NamedList result = new NamedList();
+    result.add(CoreAdminParams.NAME, this.name);
+    result.add(SolrSnapshotManager.SNAPSHOT_STATUS, this.status.toString());
+    result.add(SolrSnapshotManager.CREATION_DATE, this.getCreationDate().getTime());
+
+    NamedList replicas = new NamedList();
+    for (CoreSnapshotMetaData x : replicaSnapshots) {
+      replicas.add(x.getCoreName(), x.toNamedList());
+    }
+    result.add(SolrSnapshotManager.SNAPSHOT_REPLICAS, replicas);
+
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57ba9614/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotManager.java b/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotManager.java
index 4257baf..354307d 100644
--- a/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotManager.java
+++ b/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotManager.java
@@ -18,9 +18,12 @@ package org.apache.solr.core.snapshots;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexDeletionPolicy;
@@ -28,9 +31,13 @@ import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.store.Directory;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager.SnapshotMetaData;
 import org.apache.solr.update.SolrIndexWriter;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +47,172 @@ import org.slf4j.LoggerFactory;
 public class SolrSnapshotManager {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  // Keys used in the snapshot meta-data (JSON / NamedList forms) and in the admin responses.
+  // Key of the index directory path of a core snapshot.
+  public static final String INDEX_DIR_PATH = "indexDirPath";
+  // Key of the generation number of the snapshotted index commit.
+  public static final String GENERATION_NUM = "generation";
+  // Key of the status of a collection level snapshot.
+  public static final String SNAPSHOT_STATUS = "status";
+  // Key of the creation date of a collection level snapshot.
+  public static final String CREATION_DATE = "creationDate";
+  // Key of the core snapshots recorded for a collection level snapshot.
+  public static final String SNAPSHOT_REPLICAS = "replicas";
+  // Key under which the snapshot listing is added to a response.
+  public static final String SNAPSHOTS_INFO = "snapshots";
+  // Key of the leader flag of a core snapshot.
+  public static final String LEADER = "leader";
+  // Key of the shard identifier of a core snapshot.
+  public static final String SHARD_ID = "shard_id";
+  // Key of the file names of a core snapshot.
+  public static final String FILE_LIST = "files";
+
+  /**
+   * This method checks whether Zookeeper holds an entry for the named snapshot of the specified collection.
+   *
+   * @param zkClient Zookeeper client
+   * @param collectionName The name of the collection
+   * @param commitName The name of the snapshot. When null, the node holding all the snapshots
+   *                   of the collection is checked instead.
+   * @return true if the entry exists, false otherwise
+   * @throws KeeperException In case of Zookeeper error
+   * @throws InterruptedException In case of thread interruption.
+   */
+  public static boolean snapshotExists(SolrZkClient zkClient, String collectionName, String commitName)
+      throws KeeperException, InterruptedException {
+    Optional<String> snapshotName = Optional.ofNullable(commitName);
+    return zkClient.exists(getSnapshotMetaDataZkPath(collectionName, snapshotName), true);
+  }
+
+  /**
+   * This method stores the JSON form of the specified meta-data in a new persistent Zookeeper
+   * node for the named snapshot of the specified collection.
+   *
+   * @param zkClient Zookeeper client
+   * @param collectionName The name of the collection
+   * @param meta The {@linkplain CollectionSnapshotMetaData} corresponding to named snapshot
+   * @throws KeeperException In case of Zookeeper error
+   * @throws InterruptedException In case of thread interruption.
+   */
+  public static void createCollectionLevelSnapshot(SolrZkClient zkClient, String collectionName,
+      CollectionSnapshotMetaData meta) throws KeeperException, InterruptedException {
+    Optional<String> snapshotName = Optional.of(meta.getName());
+    String snapshotPath = getSnapshotMetaDataZkPath(collectionName, snapshotName);
+    byte[] json = Utils.toJSON(meta);
+    zkClient.makePath(snapshotPath, json, CreateMode.PERSISTENT, true);
+  }
+
+  /**
+   * This method overwrites the Zookeeper entry of the named snapshot of the specified collection
+   * with the JSON form of the specified meta-data. The update is not conditional on the
+   * version of the entry (version -1 is passed).
+   *
+   * @param zkClient Zookeeper client
+   * @param collectionName  The name of the collection
+   * @param meta The {@linkplain CollectionSnapshotMetaData} corresponding to named snapshot
+   * @throws KeeperException In case of Zookeeper error
+   * @throws InterruptedException In case of thread interruption.
+   */
+  public static void updateCollectionLevelSnapshot(SolrZkClient zkClient, String collectionName,
+      CollectionSnapshotMetaData meta) throws KeeperException, InterruptedException {
+    Optional<String> snapshotName = Optional.of(meta.getName());
+    String snapshotPath = getSnapshotMetaDataZkPath(collectionName, snapshotName);
+    byte[] json = Utils.toJSON(meta);
+    zkClient.setData(snapshotPath, json, -1, true);
+  }
+
+  /**
+   * This method removes the Zookeeper entry of the named snapshot of the specified collection.
+   * The removal is not conditional on the version of the entry (version -1 is passed).
+   *
+   * @param zkClient Zookeeper client
+   * @param collectionName The name of the collection
+   * @param commitName  The name of the snapshot (must not be null)
+   * @throws InterruptedException In case of thread interruption.
+   * @throws KeeperException  In case of Zookeeper error
+   */
+  public static void deleteCollectionLevelSnapshot(SolrZkClient zkClient, String collectionName, String commitName)
+      throws InterruptedException, KeeperException {
+    Optional<String> snapshotName = Optional.of(commitName);
+    zkClient.delete(getSnapshotMetaDataZkPath(collectionName, snapshotName), -1, true);
+  }
+
+  /**
+   * This method removes the Zookeeper entries of all the snapshots of the specified collection
+   * and then the node holding them. Nodes which do not exist (any more) are silently skipped.
+   *
+   * @param zkClient  Zookeeper client
+   * @param collectionName The name of the collection
+   * @throws InterruptedException In case of thread interruption.
+   * @throws KeeperException In case of Zookeeper error
+   */
+  public static void cleanupCollectionLevelSnapshots(SolrZkClient zkClient, String collectionName)
+      throws InterruptedException, KeeperException {
+    String parentPath = getSnapshotMetaDataZkPath(collectionName, Optional.empty());
+    try {
+      // First remove the entry of every snapshot...
+      for (String snapshotName : zkClient.getChildren(parentPath, null, true)) {
+        String snapshotPath = getSnapshotMetaDataZkPath(collectionName, Optional.of(snapshotName));
+        try {
+          zkClient.delete(snapshotPath, -1, true);
+        } catch (KeeperException ex) {
+          // An entry which is already gone needs no further handling.
+          if ( ex.code() == KeeperException.Code.NONODE ) {
+            continue;
+          }
+          throw ex;
+        }
+      }
+
+      // ... then the node which was holding them.
+      zkClient.delete(parentPath, -1, true);
+    } catch (KeeperException ex) {
+      // The parent node may not exist (e.g. if no snapshots were created for this collection).
+      if ( ex.code() == KeeperException.Code.NONODE ) {
+        return;
+      }
+      throw ex;
+    }
+  }
+
+  /**
+   * This method reads the Zookeeper entry of the named snapshot of the specified collection and
+   * decodes it into a {@linkplain CollectionSnapshotMetaData}.
+   *
+   * @param zkClient  Zookeeper client
+   * @param collectionName  The name of the collection
+   * @param commitName The name of the snapshot (must not be null)
+   * @return the {@linkplain CollectionSnapshotMetaData}, or an empty {@linkplain Optional} if
+   *         the entry does not exist
+   * @throws InterruptedException In case of thread interruption.
+   * @throws KeeperException In case of Zookeeper error
+   */
+  public static Optional<CollectionSnapshotMetaData> getCollectionLevelSnapshot(SolrZkClient zkClient, String collectionName, String commitName)
+      throws InterruptedException, KeeperException {
+    String snapshotPath = getSnapshotMetaDataZkPath(collectionName, Optional.of(commitName));
+    try {
+      byte[] json = zkClient.getData(snapshotPath, null, null, true);
+      Map<String, Object> data = (Map<String, Object>)Utils.fromJSON(json);
+      return Optional.of(new CollectionSnapshotMetaData(data));
+    } catch (KeeperException ex) {
+      // Only a missing node (e.g. due to a concurrent delete operation) is
+      // tolerated, every other Zookeeper error is reported to the caller.
+      if ( ex.code() != KeeperException.Code.NONODE ) {
+        throw ex;
+      }
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * This method collects the {@linkplain CollectionSnapshotMetaData} of every snapshot which has
+   * an entry in Zookeeper for the specified collection.
+   *
+   * @param zkClient Zookeeper client
+   * @param collectionName The name of the collection
+   * @return the {@linkplain CollectionSnapshotMetaData} for each named snapshot (empty if the
+   *         node holding the snapshots does not exist)
+   * @throws InterruptedException In case of thread interruption.
+   * @throws KeeperException In case of Zookeeper error
+   */
+  public static Collection<CollectionSnapshotMetaData> listSnapshots(SolrZkClient zkClient, String collectionName)
+      throws InterruptedException, KeeperException {
+    String parentPath = getSnapshotMetaDataZkPath(collectionName, Optional.empty());
+    Collection<CollectionSnapshotMetaData> snapshots = new ArrayList<>();
+
+    try {
+      for (String snapshotName : zkClient.getChildren(parentPath, null, true)) {
+        // A snapshot whose entry disappeared in the meantime is simply left out.
+        getCollectionLevelSnapshot(zkClient, collectionName, snapshotName).ifPresent(snapshots::add);
+      }
+    } catch (KeeperException ex) {
+      // The parent node may not exist (e.g. due to a concurrent delete collection operation).
+      if ( ex.code() != KeeperException.Code.NONODE ) {
+        throw ex;
+      }
+    }
+    return snapshots;
+  }
+
+
   /**
    * This method deletes index files of the {@linkplain IndexCommit} for the specified generation number.
    *
@@ -117,4 +290,11 @@ public class SolrSnapshotManager {
       // Note the index writer creates a new commit during the close() operation (which is harmless).
     }
   }
+
+  /**
+   * Returns the Zookeeper path of the named snapshot of the specified collection or, when no
+   * snapshot name is given, the path of the node holding all the snapshots of the collection.
+   */
+  private static String getSnapshotMetaDataZkPath(String collectionName, Optional<String> commitName) {
+    String collectionPath = "/snapshots/"+collectionName;
+    return commitName.map(snapshotName -> collectionPath+"/"+snapshotName).orElse(collectionPath);
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57ba9614/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 3e134d5..e290ccb 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -77,6 +77,8 @@ import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CloudConfig;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
 import org.apache.solr.handler.RequestHandlerBase;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.request.SolrQueryRequest;
@@ -732,7 +734,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to check the existance of " + uri + ". Is it valid?", ex);
       }
 
-      Map<String, Object> params = req.getParams().getAll(null, NAME, COLLECTION_PROP);
+      Map<String, Object> params = req.getParams().getAll(null, NAME, COLLECTION_PROP, CoreAdminParams.COMMIT_NAME);
       params.put(CoreAdminParams.BACKUP_LOCATION, location);
       return params;
     }),
@@ -778,7 +780,57 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       copyPropertiesWithPrefix(req.getParams(), params, COLL_PROP_PREFIX);
       return params;
     }),
+    CREATESNAPSHOT_OP(CREATESNAPSHOT, (req, rsp, h) -> {
+      // Both the collection name and the snapshot (commit) name are mandatory.
+      req.getParams().required().check(COLLECTION_PROP, CoreAdminParams.COMMIT_NAME);
+
+      String collectionName = req.getParams().get(COLLECTION_PROP);
+      if (!h.coreContainer.getZkController().getClusterState().hasCollection(collectionName)) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
+      }
+
+      // Reject the request when a snapshot with this name is already registered in Zookeeper.
+      String commitName = req.getParams().get(CoreAdminParams.COMMIT_NAME);
+      SolrZkClient zkClient = h.coreContainer.getZkController().getZkClient();
+      if (SolrSnapshotManager.snapshotExists(zkClient, collectionName, commitName)) {
+        throw new SolrException(ErrorCode.BAD_REQUEST,
+            "Snapshot with name '" + commitName + "' already exists for collection '"
+                + collectionName + "', no action taken.");
+      }
+
+      return req.getParams().getAll(null, COLLECTION_PROP, CoreAdminParams.COMMIT_NAME);
+    }),
+    DELETESNAPSHOT_OP(DELETESNAPSHOT, (req, rsp, h) -> {
+      // Both the collection name and the snapshot (commit) name are mandatory.
+      req.getParams().required().check(COLLECTION_PROP, CoreAdminParams.COMMIT_NAME);
+
+      String collectionName = req.getParams().get(COLLECTION_PROP);
+      if (!h.coreContainer.getZkController().getClusterState().hasCollection(collectionName)) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
+      }
+
+      return req.getParams().getAll(null, COLLECTION_PROP, CoreAdminParams.COMMIT_NAME);
+    }),
+    LISTSNAPSHOTS_OP(LISTSNAPSHOTS, (req, rsp, h) -> {
+      req.getParams().required().check(COLLECTION_PROP);
+
+      String collectionName = req.getParams().get(COLLECTION_PROP);
+      if (!h.coreContainer.getZkController().getClusterState().hasCollection(collectionName)) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
+      }
 
+      // One entry per snapshot, keyed by the name of the snapshot.
+      NamedList<Object> snapshots = new NamedList<>();
+      SolrZkClient zkClient = h.coreContainer.getZkController().getZkClient();
+      for (CollectionSnapshotMetaData meta : SolrSnapshotManager.listSnapshots(zkClient, collectionName)) {
+        snapshots.add(meta.getName(), meta.toNamedList());
+      }
+
+      // The listing is added to the response right here and null is returned.
+      rsp.add(SolrSnapshotManager.SNAPSHOTS_INFO, snapshots);
+      return null;
+    }),
     REPLACENODE_OP(REPLACENODE, (req, rsp, h) -> req.getParams().required().getAll(req.getParams().getAll(null, "parallel"), "source", "target")),
     DELETENODE_OP(DELETENODE, (req, rsp, h) -> req.getParams().required().getAll(null, "node"));
     public final CollectionOp fun;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57ba9614/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
index 0b17d9e..5836ed3 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
@@ -37,6 +37,7 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
 import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
 import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager.SnapshotMetaData;
 import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminOp;
@@ -270,12 +271,12 @@ enum CoreAdminOperation implements CoreAdminOp {
         Optional<SnapshotMetaData> metadata = mgr.getSnapshotMetaData(name);
         if ( metadata.isPresent() ) {
           NamedList<String> props = new NamedList<>();
-          props.add("generation", String.valueOf(metadata.get().getGenerationNumber()));
-          props.add("indexDirPath", metadata.get().getIndexDirPath());
+          props.add(SolrSnapshotManager.GENERATION_NUM, String.valueOf(metadata.get().getGenerationNumber()));
+          props.add(SolrSnapshotManager.INDEX_DIR_PATH, metadata.get().getIndexDirPath());
           result.add(name, props);
         }
       }
-      it.rsp.add("snapshots", result);
+      it.rsp.add(SolrSnapshotManager.SNAPSHOTS_INFO, result);
     }
   });
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57ba9614/solr/core/src/java/org/apache/solr/handler/admin/CreateSnapshotOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CreateSnapshotOp.java b/solr/core/src/java/org/apache/solr/handler/admin/CreateSnapshotOp.java
index 81f56c4..63b052b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CreateSnapshotOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CreateSnapshotOp.java
@@ -23,6 +23,7 @@ import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
 import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.util.RefCounted;
@@ -53,10 +54,11 @@ class CreateSnapshotOp implements CoreAdminHandler.CoreAdminOp {
       SolrSnapshotMetaDataManager mgr = core.getSnapshotMetaDataManager();
       mgr.snapshot(commitName, indexDirPath, ic.getGeneration());
 
-      it.rsp.add("core", core.getName());
-      it.rsp.add("commitName", commitName);
-      it.rsp.add("indexDirPath", indexDirPath);
-      it.rsp.add("generation", ic.getGeneration());
+      it.rsp.add(CoreAdminParams.CORE, core.getName());
+      it.rsp.add(CoreAdminParams.COMMIT_NAME, commitName);
+      it.rsp.add(SolrSnapshotManager.INDEX_DIR_PATH, indexDirPath);
+      it.rsp.add(SolrSnapshotManager.GENERATION_NUM, ic.getGeneration());
+      it.rsp.add(SolrSnapshotManager.FILE_LIST, ic.getFileNames());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57ba9614/solr/core/src/java/org/apache/solr/handler/admin/DeleteSnapshotOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/DeleteSnapshotOp.java b/solr/core/src/java/org/apache/solr/handler/admin/DeleteSnapshotOp.java
index 739837c..ee77282 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/DeleteSnapshotOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/DeleteSnapshotOp.java
@@ -40,6 +40,10 @@ class DeleteSnapshotOp implements CoreAdminHandler.CoreAdminOp {
 
     try {
       core.deleteNamedSnapshot(commitName);
+      // Ideally we shouldn't need this. This is added since the RPC logic in
+      // OverseerCollectionMessageHandler can not provide the coreName as part of the result.
+      it.rsp.add(CoreAdminParams.CORE, core.getName());
+      it.rsp.add(CoreAdminParams.COMMIT_NAME, commitName);
     } finally {
       core.close();
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57ba9614/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java b/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
new file mode 100644
index 0000000..65f74ca
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.core.snapshots;
+
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.lucene.util.TestUtil;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.ListSnapshots;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.cloud.AbstractDistribZkTestBase;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.CoreSnapshotMetaData;
+import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager.SnapshotMetaData;
+import org.apache.solr.handler.BackupRestoreUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SolrTestCaseJ4.SuppressSSL // Currently unknown why SSL does not work with this test
+@Slow
+public class TestSolrCloudSnapshots extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static long docsSeed; // see indexDocs()
+  private static final int NUM_SHARDS = 2;
+  private static final int NUM_REPLICAS = 2;
+  private static final int NUM_NODES = NUM_REPLICAS * NUM_SHARDS;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    useFactory("solr.StandardDirectoryFactory"); // NOTE(review): presumably chosen so index files live on disk - confirm
+    configureCluster(NUM_NODES)// NUM_NODES = NUM_REPLICAS * NUM_SHARDS nodes
+        .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+        .configure();
+
+    docsSeed = random().nextLong(); // seed handed to BackupRestoreUtils.indexDocs() in testSnapshots()
+  }
+
+  @AfterClass
+  public static void teardownClass() throws Exception {
+    System.clearProperty("test.build.data"); // NOTE(review): not set anywhere in this class; presumably set elsewhere - confirm
+    System.clearProperty("test.cache.data"); // NOTE(review): same as above - confirm
+  }
+
+  // End-to-end test: create a collection snapshot (optionally with one node stopped), back it up and
+  // restore it, optionally delete a replica, then delete the snapshot and verify it is gone everywhere.
+  @Test
+  public void testSnapshots() throws Exception {
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    String collectionName = "SolrCloudSnapshots";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", NUM_SHARDS, NUM_REPLICAS);
+    create.process(solrClient);
+
+    int nDocs = BackupRestoreUtils.indexDocs(solrClient, collectionName, docsSeed);
+    BackupRestoreUtils.verifyDocs(nDocs, solrClient, collectionName);
+    String commitName = TestUtil.randomSimpleString(random(), 1, 5);
+
+    // Verify that snapshot creation works with replica failures.
+    boolean replicaFailures = usually();
+    Optional<String> stoppedCoreName = Optional.empty();
+    if (replicaFailures) {
+      // Here the assumption is that Solr will spread the replicas uniformly across nodes.
+      // If this is not true for some reason, then we will need to add some logic to find a
+      // node with a single replica.
+      cluster.getRandomJetty(random()).stop();
+
+      // Sleep a bit to allow the ZK watch to fire.
+      Thread.sleep(5000);
+
+      // Figure out if at least one replica is "down".
+      DocCollection collState = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+      for (Slice s : collState.getSlices()) {
+        for (Replica replica : s.getReplicas()) {
+          if (replica.getState() == State.DOWN) {
+            stoppedCoreName = Optional.of(replica.getCoreName());
+          }
+        }
+      }
+    }
+
+    int expectedCoresWithSnapshot = stoppedCoreName.isPresent() ? (NUM_SHARDS * NUM_REPLICAS) - 1 : (NUM_SHARDS * NUM_REPLICAS);
+
+    CollectionAdminRequest.CreateSnapshot createSnap = new CollectionAdminRequest.CreateSnapshot(collectionName, commitName);
+    createSnap.process(solrClient);
+
+    Collection<CollectionSnapshotMetaData> collectionSnaps = listCollectionSnapshots(solrClient, collectionName);
+    assertEquals(1, collectionSnaps.size());
+    CollectionSnapshotMetaData meta = collectionSnaps.iterator().next();
+    assertEquals(commitName, meta.getName());
+    assertEquals(CollectionSnapshotMetaData.SnapshotStatus.Successful, meta.getStatus());
+    assertEquals(expectedCoresWithSnapshot, meta.getReplicaSnapshots().size());
+    Map<String, CoreSnapshotMetaData> snapshotByCoreName = meta.getReplicaSnapshots().stream()
+        .collect(Collectors.toMap(CoreSnapshotMetaData::getCoreName, Function.identity()));
+
+    DocCollection collectionState = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+    assertEquals(NUM_SHARDS, collectionState.getActiveSlices().size());
+    for (Slice shard : collectionState.getActiveSlices()) {
+      assertEquals(NUM_REPLICAS, shard.getReplicas().size());
+      for (Replica replica : shard.getReplicas()) {
+        if (stoppedCoreName.isPresent() && stoppedCoreName.get().equals(replica.getCoreName())) {
+          continue; // We know that the snapshot is not created for this replica.
+        }
+
+        String replicaBaseUrl = replica.getStr(BASE_URL_PROP);
+        String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+
+        assertTrue(snapshotByCoreName.containsKey(coreName));
+        CoreSnapshotMetaData coreSnapshot = snapshotByCoreName.get(coreName);
+
+        try (SolrClient adminClient = getHttpSolrClient(replicaBaseUrl)) {
+          Collection<SnapshotMetaData> snapshots = listCoreSnapshots(adminClient, coreName);
+          Optional<SnapshotMetaData> metaData = snapshots.stream().filter(x -> commitName.equals(x.getName())).findFirst();
+          assertTrue("Snapshot not created for core " + coreName, metaData.isPresent());
+          assertEquals(coreSnapshot.getIndexDirPath(), metaData.get().getIndexDirPath());
+          assertEquals(coreSnapshot.getGenerationNumber(), metaData.get().getGenerationNumber());
+        }
+      }
+    }
+
+    // Delete all documents.
+    {
+      solrClient.deleteByQuery(collectionName, "*:*");
+      solrClient.commit(collectionName);
+      BackupRestoreUtils.verifyDocs(0, solrClient, collectionName);
+    }
+
+    String backupLocation = createTempDir().toFile().getAbsolutePath();
+    String backupName = "mytestbackup";
+    String restoreCollectionName = collectionName + "_restored";
+
+    // Create a backup using the snapshot created earlier.
+    {
+      CollectionAdminRequest.Backup backup = CollectionAdminRequest.backupCollection(collectionName, backupName)
+          .setLocation(backupLocation).setCommitName(commitName);
+      if (random().nextBoolean()) {
+        assertEquals(0, backup.process(solrClient).getStatus());
+      } else {
+        assertEquals(RequestStatusState.COMPLETED, backup.processAndWait(solrClient, 30));//async
+      }
+    }
+
+    // Restore backup.
+    {
+      CollectionAdminRequest.Restore restore = CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
+          .setLocation(backupLocation);
+      if (random().nextBoolean()) {
+        assertEquals(0, restore.process(solrClient).getStatus());
+      } else {
+        assertEquals(RequestStatusState.COMPLETED, restore.processAndWait(solrClient, 30));//async
+      }
+      AbstractDistribZkTestBase.waitForRecoveriesToFinish(
+          restoreCollectionName, solrClient.getZkStateReader(), log.isDebugEnabled(), true, 30);
+      BackupRestoreUtils.verifyDocs(nDocs, solrClient, restoreCollectionName);
+    }
+
+    // Verify that the snapshot deletion works correctly when one or more replicas containing the
+    // snapshot are deleted.
+    boolean replicaDeletion = rarely();
+    if (replicaDeletion) {
+      CoreSnapshotMetaData replicaToDelete = null;
+      for (String shardId : meta.getShards()) {
+        List<CoreSnapshotMetaData> replicas = meta.getReplicaSnapshotsForShard(shardId);
+        if (replicas.size() > 1) {
+          int replicaIndex = random().nextInt(replicas.size());
+          replicaToDelete = replicas.get(replicaIndex);
+        }
+      }
+
+      if (replicaToDelete != null) {
+        collectionState = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+        for (Slice s : collectionState.getSlices()) {
+          for (Replica r : s.getReplicas()) {
+            if (r.getCoreName().equals(replicaToDelete.getCoreName())) {
+              log.info("Deleting replica {}", r);
+              CollectionAdminRequest.DeleteReplica delReplica = CollectionAdminRequest.deleteReplica(collectionName,
+                  replicaToDelete.getShardId(), r.getName());
+              delReplica.process(solrClient);
+              // The replica deletion will clean up the snapshot meta-data.
+              snapshotByCoreName.remove(r.getCoreName());
+              break;
+            }
+          }
+        }
+      }
+    }
+
+    // Delete snapshot
+    CollectionAdminRequest.DeleteSnapshot deleteSnap = new CollectionAdminRequest.DeleteSnapshot(collectionName, commitName);
+    deleteSnap.process(solrClient);
+
+    // Wait for a while so that the clusterstate.json updates are propagated to the client side.
+    Thread.sleep(2000);
+    collectionState = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+    for (Slice shard : collectionState.getActiveSlices()) {
+      for (Replica replica : shard.getReplicas()) {
+        if (stoppedCoreName.isPresent() && stoppedCoreName.get().equals(replica.getCoreName())) {
+          continue; // We know that the snapshot was not created for this replica.
+        }
+
+        String replicaBaseUrl = replica.getStr(BASE_URL_PROP);
+        String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+
+        try (SolrClient adminClient = getHttpSolrClient(replicaBaseUrl)) {
+          Collection<SnapshotMetaData> snapshots = listCoreSnapshots(adminClient, coreName);
+          Optional<SnapshotMetaData> metaData = snapshots.stream().filter(x -> commitName.equals(x.getName())).findFirst();
+          assertFalse("Snapshot not deleted for core " + coreName, metaData.isPresent());
+          // Remove the entry for core if the snapshot is deleted successfully.
+          snapshotByCoreName.remove(coreName);
+        }
+      }
+    }
+
+    // Verify all core-level snapshots are deleted.
+    assertTrue("The cores remaining " + snapshotByCoreName, snapshotByCoreName.isEmpty());
+    assertTrue(listCollectionSnapshots(solrClient, collectionName).isEmpty());
+  }
+
+  // Fetches the collection-level snapshots through the Collections API and wraps each entry of the response.
+  private Collection<CollectionSnapshotMetaData> listCollectionSnapshots(SolrClient adminClient, String collectionName) throws Exception {
+    CollectionAdminRequest.ListSnapshots listRequest = new CollectionAdminRequest.ListSnapshots(collectionName);
+    Object snapshotsInfo = listRequest.process(adminClient).getResponse().get(SolrSnapshotManager.SNAPSHOTS_INFO);
+    assertTrue( snapshotsInfo instanceof NamedList );
+    NamedList<?> apiResult = (NamedList<?>) snapshotsInfo;
+
+    Collection<CollectionSnapshotMetaData> snapshots = new ArrayList<>();
+    for (int idx = 0; idx < apiResult.size(); idx++) {
+      snapshots.add(new CollectionSnapshotMetaData((NamedList<Object>)apiResult.getVal(idx)));
+    }
+
+    return snapshots;
+  }
+
+  // Lists the snapshots of one core through the core admin API and converts each entry to SnapshotMetaData.
+  private Collection<SnapshotMetaData> listCoreSnapshots(SolrClient adminClient, String coreName) throws Exception {
+    ListSnapshots req = new ListSnapshots();
+    req.setCoreName(coreName);
+    NamedList<?> resp = adminClient.request(req);
+    assertTrue( resp.get(SolrSnapshotManager.SNAPSHOTS_INFO) instanceof NamedList );
+    NamedList<?> apiResult = (NamedList<?>) resp.get(SolrSnapshotManager.SNAPSHOTS_INFO);
+    List<SnapshotMetaData> result = new ArrayList<>(apiResult.size());
+    for (int i = 0; i < apiResult.size(); i++) {
+      NamedList<?> details = (NamedList<?>) apiResult.getVal(i); // read the i-th entry once, by position
+      String indexDirPath = (String) details.get(SolrSnapshotManager.INDEX_DIR_PATH);
+      long genNumber = Long.parseLong((String) details.get(SolrSnapshotManager.GENERATION_NUM)); // no boxing
+      result.add(new SnapshotMetaData(apiResult.getName(i), indexDirPath, genNumber));
+    }
+    return result;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57ba9614/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index c1f8261..72406ef 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -706,8 +706,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
   // BACKUP request
   public static class Backup extends AsyncCollectionSpecificAdminRequest {
     protected final String name;
-    protected Optional<String> repositoryName;
+    protected Optional<String> repositoryName = Optional.empty();
     protected String location;
+    protected Optional<String> commitName = Optional.empty();
 
     public Backup(String collection, String name) {
       super(CollectionAction.BACKUP, collection);
@@ -746,6 +747,15 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       return this;
     }
 
+    public Optional<String> getCommitName() {
+      return commitName;
+    }
+
+    public Backup setCommitName(String commitName) {
+      this.commitName = Optional.ofNullable(commitName);
+      return this;
+    }
+
     @Override
     public SolrParams getParams() {
       ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
@@ -755,6 +765,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       if (repositoryName.isPresent()) {
         params.set(CoreAdminParams.BACKUP_REPOSITORY, repositoryName.get());
       }
+      if (commitName.isPresent()) {
+        params.set(CoreAdminParams.COMMIT_NAME, commitName.get());
+      }
       return params;
     }
 
@@ -767,7 +780,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
   // RESTORE request
   public static class Restore extends AsyncCollectionSpecificAdminRequest {
     protected final String backupName;
-    protected Optional<String> repositoryName;
+    protected Optional<String> repositoryName = Optional.empty();
     protected String location;
 
     // in common with collection creation:
@@ -860,6 +873,105 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
 
   }
 
+  // Null check with a parameter-specific error message; returns the value unchanged when it is
+  // not null. A local helper is used because the solrj module does not depend on Google Guava.
+  // It also works around the changes for SOLR-8765, which produce a misleading error message
+  // for a null "collection" parameter (the setCollectionName method is deprecated).
+  static <T> T checkNotNull(String param, T value) {
+    if (value != null) {
+      return value;
+    }
+    throw new NullPointerException("Please specify a value for parameter " + param);
+  }
+
+  /**
+   * Collections API request (CREATESNAPSHOT) carrying a collection name and a commit name;
+   * both values are validated with checkNotNull when the request is built.
+   */
+  @SuppressWarnings("serial")
+  public static class CreateSnapshot extends AsyncCollectionSpecificAdminRequest {
+    protected final String commitName;
+
+    public CreateSnapshot(String collection, String commitName) {
+      super(CollectionAction.CREATESNAPSHOT, checkNotNull(CoreAdminParams.COLLECTION, collection));
+      this.commitName = checkNotNull(CoreAdminParams.COMMIT_NAME, commitName);
+    }
+
+    public String getCollectionName() { return collection; }
+
+    public String getCommitName() { return commitName; }
+
+    @Override
+    public AsyncCollectionSpecificAdminRequest setCollectionName(String collection) {
+      this.collection = checkNotNull(CoreAdminParams.COLLECTION, collection);
+      return this;
+    }
+
+    @Override
+    public SolrParams getParams() {
+      ModifiableSolrParams solrParams = (ModifiableSolrParams) super.getParams();
+      solrParams.set(CoreAdminParams.COLLECTION, collection);
+      solrParams.set(CoreAdminParams.COMMIT_NAME, commitName);
+      return solrParams;
+    }
+  }
+
+  /**
+   * Collections API request (DELETESNAPSHOT) carrying a collection name and a commit name;
+   * both values are validated with checkNotNull when the request is built.
+   */
+  @SuppressWarnings("serial")
+  public static class DeleteSnapshot extends AsyncCollectionSpecificAdminRequest {
+    protected final String commitName;
+
+    public DeleteSnapshot(String collection, String commitName) {
+      super(CollectionAction.DELETESNAPSHOT, checkNotNull(CoreAdminParams.COLLECTION, collection));
+      this.commitName = checkNotNull(CoreAdminParams.COMMIT_NAME, commitName);
+    }
+
+    public String getCollectionName() { return collection; }
+
+    public String getCommitName() { return commitName; }
+
+    @Override
+    public AsyncCollectionSpecificAdminRequest setCollectionName(String collection) {
+      this.collection = checkNotNull(CoreAdminParams.COLLECTION, collection);
+      return this;
+    }
+
+    @Override
+    public SolrParams getParams() {
+      ModifiableSolrParams solrParams = (ModifiableSolrParams) super.getParams();
+      solrParams.set(CoreAdminParams.COLLECTION, collection);
+      solrParams.set(CoreAdminParams.COMMIT_NAME, commitName);
+      return solrParams;
+    }
+  }
+
+  /** Collections API request (LISTSNAPSHOTS) for a single, non-null collection name. */
+  @SuppressWarnings("serial")
+  public static class ListSnapshots extends AsyncCollectionSpecificAdminRequest {
+    public ListSnapshots(String collection) {
+      super(CollectionAction.LISTSNAPSHOTS, checkNotNull(CoreAdminParams.COLLECTION, collection));
+    }
+
+    public String getCollectionName() { return collection; }
+
+    @Override
+    public AsyncCollectionSpecificAdminRequest setCollectionName(String collection) {
+      this.collection = checkNotNull(CoreAdminParams.COLLECTION, collection);
+      return this;
+    }
+
+    @Override
+    public SolrParams getParams() {
+      // The collection name is the only parameter this request adds to the inherited ones.
+      ModifiableSolrParams solrParams = (ModifiableSolrParams) super.getParams();
+      solrParams.set(CoreAdminParams.COLLECTION, collection);
+      return solrParams;
+    }
+  }
+
   /**
    * Returns a SolrRequest to create a new shard in a collection
    */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57ba9614/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index f10f089..f1e5a52 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -91,6 +91,9 @@ public interface CollectionParams {
     MIGRATESTATEFORMAT(true, LockLevel.CLUSTER),
     BACKUP(true, LockLevel.COLLECTION),
     RESTORE(true, LockLevel.COLLECTION),
+    CREATESNAPSHOT(true, LockLevel.COLLECTION),
+    DELETESNAPSHOT(true, LockLevel.COLLECTION),
+    LISTSNAPSHOTS(false, LockLevel.NONE),
     //only for testing. it just waits for specified time
     // these are not exposed via collection API commands
     // but the overseer is aware of these tasks