You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ds...@apache.org on 2016/07/28 18:23:21 UTC

lucene-solr:master: SOLR-9269: Snapshots (Solr core level)

Repository: lucene-solr
Updated Branches:
  refs/heads/master 7bf019a9c -> b7aa0b56b


SOLR-9269: Snapshots (Solr core level)


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b7aa0b56
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b7aa0b56
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b7aa0b56

Branch: refs/heads/master
Commit: b7aa0b56bebcc22e63f0602a4ef15a728a799fc6
Parents: 7bf019a
Author: David Smiley <ds...@apache.org>
Authored: Thu Jul 28 14:23:09 2016 -0400
Committer: David Smiley <ds...@apache.org>
Committed: Thu Jul 28 14:23:09 2016 -0400

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   5 +
 .../solr/core/IndexDeletionPolicyWrapper.java   |  27 +-
 .../src/java/org/apache/solr/core/SolrCore.java |  36 +-
 .../core/snapshots/SolrSnapshotManager.java     | 134 ++++++
 .../snapshots/SolrSnapshotMetaDataManager.java  | 416 ++++++++++++++++++
 .../solr/core/snapshots/package-info.java       |  22 +
 .../org/apache/solr/handler/IndexFetcher.java   |  16 +-
 .../apache/solr/handler/ReplicationHandler.java |  24 +-
 .../org/apache/solr/handler/RestoreCore.java    |  18 +-
 .../org/apache/solr/handler/SnapShooter.java    |  39 +-
 .../solr/handler/admin/CoreAdminOperation.java  | 125 +++++-
 .../core/snapshots/TestSolrCoreSnapshots.java   | 419 +++++++++++++++++++
 .../apache/solr/handler/BackupRestoreUtils.java |  37 ++
 .../solr/handler/TestHdfsBackupRestoreCore.java |  46 +-
 .../solr/handler/TestReplicationHandler.java    |  20 +-
 .../client/solrj/request/CoreAdminRequest.java  |  57 +++
 .../solr/common/params/CoreAdminParams.java     |  10 +-
 17 files changed, 1365 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 6707e1a..3011cec 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -103,6 +103,11 @@ New Features
 * SOLR-9200: Add Delegation Token Support to Solr.
   (Gregory Chanan)
 
+* SOLR-9038: Solr core snapshots: The current commit can be snapshotted which retains the commit and associates it with
+  a name.  The core admin API can create snapshots, list them, and delete them. Snapshot names can be referenced in
+  doing a core backup, and in replication.  Snapshot metadata is stored in a new snapshot_metadata/ dir.
+  (Hrishikesh Gadre via David Smiley)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java b/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
index 207c0e5..3616d4e 100644
--- a/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
+++ b/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
@@ -15,21 +15,26 @@
  * limitations under the License.
  */
 package org.apache.solr.core;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.store.Directory;
+import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
 import org.apache.solr.update.SolrIndexWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * A wrapper for an IndexDeletionPolicy instance.
  * <p>
@@ -52,9 +57,11 @@ public final class IndexDeletionPolicyWrapper extends IndexDeletionPolicy {
   private final Map<Long, Long> reserves = new ConcurrentHashMap<>();
   private volatile IndexCommit latestCommit;
   private final ConcurrentHashMap<Long, AtomicInteger> savedCommits = new ConcurrentHashMap<>();
+  private final SolrSnapshotMetaDataManager snapshotMgr;
 
-  public IndexDeletionPolicyWrapper(IndexDeletionPolicy deletionPolicy) {
+  public IndexDeletionPolicyWrapper(IndexDeletionPolicy deletionPolicy, SolrSnapshotMetaDataManager snapshotMgr) {
     this.deletionPolicy = deletionPolicy;
+    this.snapshotMgr = snapshotMgr;
   }
 
   /**
@@ -134,7 +141,6 @@ public final class IndexDeletionPolicyWrapper extends IndexDeletionPolicy {
     }
   }
 
-
   /**
    * Internal use for Lucene... do not explicitly call.
    */
@@ -185,7 +191,8 @@ public final class IndexDeletionPolicyWrapper extends IndexDeletionPolicy {
       Long gen = delegate.getGeneration();
       Long reserve = reserves.get(gen);
       if (reserve != null && System.nanoTime() < reserve) return;
-      if(savedCommits.containsKey(gen)) return;
+      if (savedCommits.containsKey(gen)) return;
+      if (snapshotMgr.isSnapshotted(gen)) return;
       delegate.delete();
     }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/core/src/java/org/apache/solr/core/SolrCore.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index faef1c8..2704e4a 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -81,6 +81,7 @@ import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.DirectoryFactory.DirContext;
+import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
 import org.apache.solr.handler.IndexFetcher;
 import org.apache.solr.handler.ReplicationHandler;
 import org.apache.solr.handler.RequestHandlerBase;
@@ -184,6 +185,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
   private final Map<String,UpdateRequestProcessorChain> updateProcessorChains;
   private final Map<String, SolrInfoMBean> infoRegistry;
   private final IndexDeletionPolicyWrapper solrDelPolicy;
+  private final SolrSnapshotMetaDataManager snapshotMgr;
   private final DirectoryFactory directoryFactory;
   private IndexReaderFactory indexReaderFactory;
   private final Codec codec;
@@ -414,7 +416,19 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
     } else {
       delPolicy = new SolrDeletionPolicy();
     }
-    return new IndexDeletionPolicyWrapper(delPolicy);
+
+    return new IndexDeletionPolicyWrapper(delPolicy, snapshotMgr);
+  }
+
+  private SolrSnapshotMetaDataManager initSnapshotMetaDataManager() {
+    try {
+      String dirName = getDataDir() + SolrSnapshotMetaDataManager.SNAPSHOT_METADATA_DIR + "/";
+      Directory snapshotDir = directoryFactory.get(dirName, DirContext.DEFAULT,
+           getSolrConfig().indexConfig.lockType);
+      return new SolrSnapshotMetaDataManager(this, snapshotDir);
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
   }
 
   private void initListeners() {
@@ -739,6 +753,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
 
       initListeners();
 
+      this.snapshotMgr = initSnapshotMetaDataManager();
       this.solrDelPolicy = initDeletionPolicy(delPolicy);
 
       this.codec = initCodec(solrConfig, this.schema);
@@ -1242,6 +1257,17 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
       }
     }
 
+    // Close the snapshots meta-data directory.
+    Directory snapshotsDir = snapshotMgr.getSnapshotsDir();
+    try {
+      this.directoryFactory.release(snapshotsDir);
+    }  catch (Throwable e) {
+      SolrException.log(log,e);
+      if (e instanceof Error) {
+        throw (Error) e;
+      }
+    }
+
     if (coreStateClosed) {
 
       try {
@@ -2343,6 +2369,14 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
     return solrDelPolicy;
   }
 
+  /**
+   * @return The {@linkplain SolrSnapshotMetaDataManager} instance held by this core,
+   *         which manages the persistent snapshots for this Solr core.
+   */
+  public SolrSnapshotMetaDataManager getSnapshotMetaDataManager() {
+    return snapshotMgr;
+  }
+
   public ReentrantLock getRuleExpiryLock() {
     return ruleExpiryLock;
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotManager.java b/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotManager.java
new file mode 100644
index 0000000..95df3ff
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotManager.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.core.snapshots;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.store.Directory;
+import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager.SnapshotMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides functionality required to handle the data files corresponding to Solr snapshots.
+ */
+public class SolrSnapshotManager {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * This method deletes index files of the {@linkplain IndexCommit} for the specified generation number.
+   *
+   * @param dir The index directory storing the snapshot.
+   * @param gen The generation number for the {@linkplain IndexCommit}
+   * @throws IOException in case of I/O errors.
+   */
+  public static void deleteIndexFiles ( Directory dir, Collection<SnapshotMetaData> snapshots, long gen ) throws IOException {
+    List<IndexCommit> commits = DirectoryReader.listCommits(dir);
+    Map<String, Integer> refCounts = buildRefCounts(snapshots, commits);
+    for (IndexCommit ic : commits) {
+      if (ic.getGeneration() == gen) {
+        deleteIndexFiles(dir,refCounts, ic);
+        break;
+      }
+    }
+  }
+
+  /**
+   * This method deletes all files not corresponding to a configured snapshot in the specified index directory.
+   *
+   * @param dir The index directory to search for.
+   * @throws IOException in case of I/O errors.
+   */
+  public static void deleteNonSnapshotIndexFiles (Directory dir, Collection<SnapshotMetaData> snapshots) throws IOException {
+    List<IndexCommit> commits = DirectoryReader.listCommits(dir);
+    Map<String, Integer> refCounts = buildRefCounts(snapshots, commits);
+    Set<Long> snapshotGenNumbers = snapshots.stream()
+                                            .map(SnapshotMetaData::getGenerationNumber)
+                                            .collect(Collectors.toSet());
+    for (IndexCommit ic : commits) {
+      if (!snapshotGenNumbers.contains(ic.getGeneration())) {
+        deleteIndexFiles(dir,refCounts, ic);
+      }
+    }
+  }
+
+  /**
+   * This method computes, for every index file, the number of configured snapshots whose
+   * {@linkplain IndexCommit} references it. Commits that have no snapshot contribute nothing.
+   *
+   * @param snapshots A collection of user configured snapshots
+   * @param commits A list of {@linkplain IndexCommit} instances; snapshots are matched to these by generation number
+   * @return A map from index file name to the number of snapshots referencing it; unreferenced files are absent.
+   * @throws IOException in case of I/O error.
+   */
+  @VisibleForTesting
+  static Map<String, Integer> buildRefCounts (Collection<SnapshotMetaData> snapshots, List<IndexCommit> commits) throws IOException {
+    Map<String, Integer> result = new HashMap<>();
+    Map<Long, IndexCommit> commitsByGen = commits.stream().collect(
+        Collectors.toMap(IndexCommit::getGeneration, Function.identity()));
+
+    for(SnapshotMetaData md : snapshots) {
+      IndexCommit ic = commitsByGen.get(md.getGenerationNumber());
+      if (ic != null) {
+        Collection<String> fileNames = ic.getFileNames();
+        for(String fileName : fileNames) {
+          int refCount = result.getOrDefault(fileName, 0);
+          result.put(fileName, refCount+1);
+        }
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * This method deletes the index files associated with specified <code>indexCommit</code> provided they
+   * are not referred by some other {@linkplain IndexCommit}.
+   *
+   * @param dir The index directory containing the {@linkplain IndexCommit} to be deleted.
+   * @param refCounts A map containing reference counts for each file associated with every {@linkplain IndexCommit}
+   *                  in the specified directory.
+   * @param indexCommit The {@linkplain IndexCommit} whose files need to be deleted.
+   * @throws IOException in case of I/O errors.
+   */
+  private static void deleteIndexFiles ( Directory dir, Map<String, Integer> refCounts, IndexCommit indexCommit ) throws IOException {
+    log.info("Deleting index files for index commit with generation {} in directory {}", indexCommit.getGeneration(), dir);
+    for (String fileName : indexCommit.getFileNames()) {
+      try {
+        // Ensure that a file being deleted is not referred by some other commit.
+        int ref = refCounts.getOrDefault(fileName, 0);
+        log.debug("Reference count for file {} is {}", fileName, ref);
+        if (ref == 0) {
+          dir.deleteFile(fileName);
+        }
+      } catch (IOException e) {
+        log.warn("Unable to delete file {} in directory {} due to exception {}", fileName, dir, e.getMessage());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotMetaDataManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotMetaDataManager.java b/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotMetaDataManager.java
new file mode 100644
index 0000000..26cbe21
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/core/snapshots/SolrSnapshotMetaDataManager.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.core.snapshots;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Preconditions;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.IOUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.DirectoryFactory.DirContext;
+import org.apache.solr.core.IndexDeletionPolicyWrapper;
+import org.apache.solr.core.SolrCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is responsible for managing the persistent snapshot meta-data for the Solr indexes. The
+ * persistent snapshots are implemented by relying on Lucene {@linkplain IndexDeletionPolicy}
+ * abstraction to configure a specific {@linkplain IndexCommit} to be retained. The
+ * {@linkplain IndexDeletionPolicyWrapper} in Solr uses this class to create/delete the Solr index
+ * snapshots.
+ */
+public class SolrSnapshotMetaDataManager {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final String SNAPSHOT_METADATA_DIR = "snapshot_metadata";
+
+  /**
+   * A class defining the meta-data for a specific snapshot.
+   */
+  public static class SnapshotMetaData {
+    private String name;
+    private String indexDirPath;
+    private long generationNumber;
+
+    public SnapshotMetaData(String name, String indexDirPath, long generationNumber) {
+      super();
+      this.name = name;
+      this.indexDirPath = indexDirPath;
+      this.generationNumber = generationNumber;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String getIndexDirPath() {
+      return indexDirPath;
+    }
+
+    public long getGenerationNumber() {
+      return generationNumber;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("SnapshotMetaData[name=");
+      builder.append(name);
+      builder.append(", indexDirPath=");
+      builder.append(indexDirPath);
+      builder.append(", generation=");
+      builder.append(generationNumber);
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+
+  /** Prefix used for the save file. */
+  public static final String SNAPSHOTS_PREFIX = "snapshots_";
+  private static final int VERSION_START = 0;
+  private static final int VERSION_CURRENT = VERSION_START;
+  private static final String CODEC_NAME = "solr-snapshots";
+
+  // The generation number used to name the next snapshot meta-data file written by persist()
+  private long nextWriteGen;
+
+  private final Directory dir;
+
+  /** Used to map snapshot name to snapshot meta-data. */
+  protected final Map<String,SnapshotMetaData> nameToDetailsMapping = new LinkedHashMap<>();
+  /** Used to figure out the *current* index data directory path */
+  private final SolrCore solrCore;
+
+  /**
+   * A constructor.
+   *
+   * @param dir The directory where the snapshot meta-data should be stored. Enables updating
+   *            the existing meta-data.
+   * @throws IOException in case of errors.
+   */
+  public SolrSnapshotMetaDataManager(SolrCore solrCore, Directory dir) throws IOException {
+    this(solrCore, dir, OpenMode.CREATE_OR_APPEND);
+  }
+
+  /**
+   * A constructor.
+   *
+   * @param dir The directory where the snapshot meta-data is stored.
+   * @param mode CREATE If previous meta-data should be erased.
+   *             APPEND If previous meta-data should be read and updated.
+   *             CREATE_OR_APPEND Creates a new meta-data structure if one does not exist
+   *                              Updates the existing structure if one exists.
+   * @throws IOException in case of errors.
+   */
+  public SolrSnapshotMetaDataManager(SolrCore solrCore, Directory dir, OpenMode mode) throws IOException {
+    this.solrCore = solrCore;
+    this.dir = dir;
+
+    if (mode == OpenMode.CREATE) {
+      deleteSnapshotMetadataFiles();
+    }
+
+    loadFromSnapshotMetadataFile();
+
+    if (mode == OpenMode.APPEND && nextWriteGen == 0) {
+      throw new IllegalStateException("no snapshots stored in this directory");
+    }
+  }
+
+  /**
+   * @return The snapshot meta-data directory
+   */
+  public Directory getSnapshotsDir() {
+    return dir;
+  }
+
+  /**
+   * This method creates a new snapshot meta-data entry.
+   *
+   * @param name The name of the snapshot.
+   * @param indexDirPath The directory path where the index files are stored.
+   * @param gen The generation number for the {@linkplain IndexCommit} being snapshotted.
+   * @throws IOException in case of I/O errors.
+   */
+  public synchronized void snapshot(String name, String indexDirPath, long gen) throws IOException {
+    Preconditions.checkNotNull(name);
+
+    log.info("Creating the snapshot named {} for core {} associated with index commit with generation {} in directory {}"
+        , name, solrCore.getName(), gen, indexDirPath);
+
+    if(nameToDetailsMapping.containsKey(name)) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "A snapshot with name " + name + " already exists");
+    }
+
+    SnapshotMetaData d = new SnapshotMetaData(name, indexDirPath, gen);
+    nameToDetailsMapping.put(name, d);
+
+    boolean success = false;
+    try {
+      persist();
+      success = true;
+    } finally {
+      if (!success) {
+        try {
+          release(name);
+        } catch (Exception e) {
+          // Suppress so we keep throwing original exception
+        }
+      }
+    }
+  }
+
+  /**
+   * This method deletes a previously created snapshot (if any).
+   *
+   * @param name The name of the snapshot to be deleted.
+   * @return The snapshot meta-data if the snapshot with the snapshot name exists.
+   * @throws IOException in case of I/O error
+   */
+  public synchronized Optional<SnapshotMetaData> release(String name) throws IOException {
+    log.info("Deleting the snapshot named {} for core {}", name, solrCore.getName());
+    SnapshotMetaData result = nameToDetailsMapping.remove(Preconditions.checkNotNull(name));
+    if(result != null) {
+      boolean success = false;
+      try {
+        persist();
+        success = true;
+      } finally {
+        if (!success) {
+          nameToDetailsMapping.put(name, result);
+        }
+      }
+    }
+    return Optional.ofNullable(result);
+  }
+
+  /**
+   * This method reports whether a snapshot exists for the specified generation number in
+   * the *current* index directory of the core (as given by {@code solrCore.getIndexDir()}).
+   *
+   * @param genNumber The generation number for the {@linkplain IndexCommit} to be checked.
+   * @return true if such a snapshot exists.
+   *         false otherwise (including when no snapshots are registered at all).
+   */
+  public synchronized boolean isSnapshotted(long genNumber) {
+    return !nameToDetailsMapping.isEmpty() && isSnapshotted(solrCore.getIndexDir(), genNumber);
+  }
+
+  /**
+   * This method reports whether a snapshot exists for the specified generation number in
+   * the specified index directory.
+   *
+   * @param indexDirPath The index directory path, compared by string equality with the stored path.
+   * @param genNumber The generation number for the {@linkplain IndexCommit} to be checked.
+   * @return true if such a snapshot exists, false otherwise.
+   */
+  public synchronized boolean isSnapshotted(String indexDirPath, long genNumber) {
+    return !nameToDetailsMapping.isEmpty()
+        && nameToDetailsMapping.values().stream()
+           .anyMatch(entry -> entry.getIndexDirPath().equals(indexDirPath) && entry.getGenerationNumber() == genNumber);
+  }
+
+  /**
+   * This method returns the snapshot meta-data for the specified name (if it exists).
+   *
+   * @param name The name of the snapshot
+   * @return The snapshot meta-data if exists.
+   */
+  public synchronized Optional<SnapshotMetaData> getSnapshotMetaData(String name) {
+    return Optional.ofNullable(nameToDetailsMapping.get(name));
+  }
+
+  /**
+   * @return A list of snapshots created so far.
+   */
+  public synchronized List<String> listSnapshots() {
+    // We create a copy for thread safety.
+    return new ArrayList<>(nameToDetailsMapping.keySet());
+  }
+
+  /**
+   * This method returns the snapshots whose recorded index directory path equals the specified path.
+   *
+   * @param indexDirPath The index directory path.
+   * @return a newly collected list of the snapshots stored in the specified directory.
+   */
+  public synchronized Collection<SnapshotMetaData> listSnapshotsInIndexDir(String indexDirPath) {
+    return nameToDetailsMapping.values().stream()
+        .filter(entry -> indexDirPath.equals(entry.getIndexDirPath()))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * This method returns the {@linkplain IndexCommit} associated with the specified
+   * <code>commitName</code>. A snapshot with specified <code>commitName</code> must
+   * be created before invoking this method.
+   *
+   * @param commitName The name of persisted commit
+   * @return the {@linkplain IndexCommit}
+   * @throws IOException in case of I/O error.
+   */
+  public Optional<IndexCommit> getIndexCommitByName(String commitName) throws IOException {
+    Optional<IndexCommit> result = Optional.empty();
+    Optional<SnapshotMetaData> metaData = getSnapshotMetaData(commitName);
+    if (metaData.isPresent()) {
+      String indexDirPath = metaData.get().getIndexDirPath();
+      long gen = metaData.get().getGenerationNumber();
+
+      Directory d = solrCore.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, DirectoryFactory.LOCK_TYPE_NONE);
+      try {
+        result = DirectoryReader.listCommits(d)
+                                .stream()
+                                .filter(ic -> ic.getGeneration() == gen)
+                                .findAny();
+
+        if (!result.isPresent()) {
+          log.warn("Unable to find commit with generation {} in the directory {}", gen, indexDirPath);
+        }
+
+      } finally {
+        solrCore.getDirectoryFactory().release(d);
+      }
+    } else {
+      log.warn("Commit with name {} is not persisted for core {}", commitName, solrCore.getName());
+    }
+
+    return result;
+  }
+
+  private synchronized void persist() throws IOException {
+    String fileName = SNAPSHOTS_PREFIX + nextWriteGen;
+    IndexOutput out = dir.createOutput(fileName, IOContext.DEFAULT);
+    boolean success = false;
+    try {
+      CodecUtil.writeHeader(out, CODEC_NAME, VERSION_CURRENT);
+      out.writeVInt(nameToDetailsMapping.size());
+      for(Entry<String,SnapshotMetaData> ent : nameToDetailsMapping.entrySet()) {
+        out.writeString(ent.getKey());
+        out.writeString(ent.getValue().getIndexDirPath());
+        out.writeVLong(ent.getValue().getGenerationNumber());
+      }
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(out);
+        IOUtils.deleteFilesIgnoringExceptions(dir, fileName);
+      } else {
+        IOUtils.close(out);
+      }
+    }
+
+    dir.sync(Collections.singletonList(fileName));
+
+    if (nextWriteGen > 0) {
+      String lastSaveFile = SNAPSHOTS_PREFIX + (nextWriteGen-1);
+      // exception OK: likely it didn't exist
+      IOUtils.deleteFilesIgnoringExceptions(dir, lastSaveFile);
+    }
+
+    nextWriteGen++;
+  }
+
+  private synchronized void deleteSnapshotMetadataFiles() throws IOException {
+    for(String file : dir.listAll()) {
+      if (file.startsWith(SNAPSHOTS_PREFIX)) {
+        dir.deleteFile(file);
+      }
+    }
+  }
+
+  /**
+   * Loads the snapshot meta-data from the newest fully readable file in the {@link Directory}; rethrows the first read error if none is readable.
+   */
+  private synchronized void loadFromSnapshotMetadataFile() throws IOException {
+    log.info("Loading from snapshot metadata file...");
+    long genLoaded = -1;
+    IOException ioe = null;
+    List<String> snapshotFiles = new ArrayList<>();
+    for(String file : dir.listAll()) {
+      if (file.startsWith(SNAPSHOTS_PREFIX)) {
+        long gen = Long.parseLong(file.substring(SNAPSHOTS_PREFIX.length()));
+        if (genLoaded == -1 || gen > genLoaded) {
+          snapshotFiles.add(file);
+          Map<String, SnapshotMetaData> snapshotMetaDataMapping = new HashMap<>();
+          IndexInput in = dir.openInput(file, IOContext.DEFAULT);
+          try {
+            CodecUtil.checkHeader(in, CODEC_NAME, VERSION_START, VERSION_START);
+            int count = in.readVInt();
+            for(int i=0;i<count;i++) {
+              String name = in.readString();
+              String indexDirPath = in.readString();
+              long commitGen = in.readVLong();
+              snapshotMetaDataMapping.put(name, new SnapshotMetaData(name, indexDirPath, commitGen));
+            }
+            // Adopt this file only after it was read in full, so a truncated/corrupt file never replaces good state
+            genLoaded = gen;
+            nameToDetailsMapping.clear();
+            nameToDetailsMapping.putAll(snapshotMetaDataMapping);
+          } catch (IOException ioe2) {
+            // Save first exception & throw in the end
+            if (ioe == null) {
+              ioe = ioe2;
+            }
+          } finally {
+            in.close();
+          }
+        }
+      }
+    }
+
+    if (genLoaded == -1) {
+      // Nothing was loaded...
+      if (ioe != null) {
+        // ... not for lack of trying:
+        throw ioe;
+      }
+    } else {
+      if (snapshotFiles.size() > 1) {
+        // Remove any broken / old snapshot files:
+        String curFileName = SNAPSHOTS_PREFIX + genLoaded;
+        for(String file : snapshotFiles) {
+          if (!curFileName.equals(file)) {
+            IOUtils.deleteFilesIgnoringExceptions(dir, file);
+          }
+        }
+      }
+      nextWriteGen = 1+genLoaded;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/core/src/java/org/apache/solr/core/snapshots/package-info.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/snapshots/package-info.java b/solr/core/src/java/org/apache/solr/core/snapshots/package-info.java
new file mode 100644
index 0000000..3242cd3
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/core/snapshots/package-info.java
@@ -0,0 +1,22 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+
+/**
+ * Core classes for Solr's persistent snapshots functionality
+ */
+package org.apache.solr.core.snapshots;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 714b800..77624c9 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -82,6 +82,9 @@ import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.apache.solr.core.IndexDeletionPolicyWrapper;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
+import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
+import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager.SnapshotMetaData;
 import org.apache.solr.handler.ReplicationHandler.*;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
@@ -468,9 +471,18 @@ public class IndexFetcher {
                 // let the system know we are changing dir's and the old one
                 // may be closed
                 if (indexDir != null) {
-                  LOG.info("removing old index directory " + indexDir);
                   solrCore.getDirectoryFactory().doneWithDirectory(indexDir);
-                  solrCore.getDirectoryFactory().remove(indexDir);
+
+                  SolrSnapshotMetaDataManager snapshotsMgr = solrCore.getSnapshotMetaDataManager();
+                  Collection<SnapshotMetaData> snapshots = snapshotsMgr.listSnapshotsInIndexDir(indexDirPath);
+
+                  // Delete the old index directory only if no snapshot exists in that directory.
+                  if(snapshots.isEmpty()) {
+                    LOG.info("removing old index directory " + indexDir);
+                    solrCore.getDirectoryFactory().remove(indexDir);
+                  } else {
+                    SolrSnapshotManager.deleteNonSnapshotIndexFiles(indexDir, snapshots);
+                  }
                 }
               }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index ff93c42..0870e35 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -87,6 +87,7 @@ import org.apache.solr.core.SolrDeletionPolicy;
 import org.apache.solr.core.SolrEventListener;
 import org.apache.solr.core.backup.repository.BackupRepository;
 import org.apache.solr.core.backup.repository.LocalFileSystemRepository;
+import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.search.SolrIndexSearcher;
@@ -512,11 +513,24 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
         numberToKeep = Integer.MAX_VALUE;
       }
 
-      IndexDeletionPolicyWrapper delPolicy = core.getDeletionPolicy();
-      IndexCommit indexCommit = delPolicy.getLatestCommit();
+      IndexCommit indexCommit = null;
+      String commitName = params.get(CoreAdminParams.COMMIT_NAME);
+      if (commitName != null) {
+        SolrSnapshotMetaDataManager snapshotMgr = core.getSnapshotMetaDataManager();
+        Optional<IndexCommit> commit = snapshotMgr.getIndexCommitByName(commitName);
+        if(commit.isPresent()) {
+          indexCommit = commit.get();
+        } else {
+          throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an index commit with name " + commitName +
+              " for core " + core.getName());
+        }
+      } else {
+        IndexDeletionPolicyWrapper delPolicy = core.getDeletionPolicy();
+        indexCommit = delPolicy.getLatestCommit();
 
-      if (indexCommit == null) {
-        indexCommit = req.getSearcher().getIndexReader().getIndexCommit();
+        if (indexCommit == null) {
+          indexCommit = req.getSearcher().getIndexReader().getIndexCommit();
+        }
       }
 
       String location = params.get(CoreAdminParams.BACKUP_LOCATION);
@@ -539,7 +553,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
       }
 
       // small race here before the commit point is saved
-      SnapShooter snapShooter = new SnapShooter(repo, core, location, params.get(NAME));
+      SnapShooter snapShooter = new SnapShooter(repo, core, location, params.get(NAME), commitName);
       snapShooter.validateCreateSnapshot();
       snapShooter.createSnapAsync(indexCommit, numberToKeep, (nl) -> snapShootDetails = nl);
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/RestoreCore.java b/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
index d3c98fa..6aef35c 100644
--- a/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
+++ b/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
@@ -19,6 +19,7 @@ package org.apache.solr.handler;
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.text.SimpleDateFormat;
+import java.util.Collection;
 import java.util.Date;
 import java.util.Locale;
 import java.util.concurrent.Callable;
@@ -32,6 +33,9 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
+import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
+import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager.SnapshotMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +67,7 @@ public class RestoreCore implements Callable<Boolean> {
     String restoreIndexName = "restore." + dateFormat.format(new Date());
     String restoreIndexPath = core.getDataDir() + restoreIndexName;
 
+    String indexDirPath = core.getIndexDir();
     Directory restoreIndexDir = null;
     Directory indexDir = null;
     try {
@@ -71,7 +76,7 @@ public class RestoreCore implements Callable<Boolean> {
           DirectoryFactory.DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
 
       //Prefer local copy.
-      indexDir = core.getDirectoryFactory().get(core.getIndexDir(),
+      indexDir = core.getDirectoryFactory().get(indexDirPath,
           DirectoryFactory.DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
 
       //Move all files from backupDir to restoreIndexDir
@@ -130,7 +135,16 @@ public class RestoreCore implements Callable<Boolean> {
       }
       if (success) {
         core.getDirectoryFactory().doneWithDirectory(indexDir);
-        core.getDirectoryFactory().remove(indexDir);
+
+        SolrSnapshotMetaDataManager snapshotsMgr = core.getSnapshotMetaDataManager();
+        Collection<SnapshotMetaData> snapshots = snapshotsMgr.listSnapshotsInIndexDir(indexDirPath);
+
+        // Delete the old index directory only if no snapshot exists in that directory.
+        if (snapshots.isEmpty()) {
+          core.getDirectoryFactory().remove(indexDir);
+        } else {
+          SolrSnapshotManager.deleteNonSnapshotIndexFiles(indexDir, snapshots);
+        }
       }
 
       return true;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/core/src/java/org/apache/solr/handler/SnapShooter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/SnapShooter.java b/solr/core/src/java/org/apache/solr/handler/SnapShooter.java
index 5ac3243..e12649d 100644
--- a/solr/core/src/java/org/apache/solr/handler/SnapShooter.java
+++ b/solr/core/src/java/org/apache/solr/handler/SnapShooter.java
@@ -26,12 +26,14 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Locale;
+import java.util.Optional;
 import java.util.function.Consumer;
 
 import com.google.common.base.Preconditions;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.apache.solr.core.IndexDeletionPolicyWrapper;
@@ -39,6 +41,7 @@ import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.backup.repository.BackupRepository;
 import org.apache.solr.core.backup.repository.BackupRepository.PathType;
 import org.apache.solr.core.backup.repository.LocalFileSystemRepository;
+import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
@@ -59,6 +62,7 @@ public class SnapShooter {
   private URI baseSnapDirPath = null;
   private URI snapshotDirPath = null;
   private BackupRepository backupRepo = null;
+  private String commitName; // can be null
 
   @Deprecated
   public SnapShooter(SolrCore core, String location, String snapshotName) {
@@ -71,14 +75,14 @@ public class SnapShooter {
     } else {
       snapDirStr = core.getCoreDescriptor().getInstanceDir().resolve(location).normalize().toString();
     }
-    initialize(new LocalFileSystemRepository(), core, snapDirStr, snapshotName);
+    initialize(new LocalFileSystemRepository(), core, snapDirStr, snapshotName, null);
   }
 
-  public SnapShooter(BackupRepository backupRepo, SolrCore core, String location, String snapshotName) {
-    initialize(backupRepo, core, location, snapshotName);
+  public SnapShooter(BackupRepository backupRepo, SolrCore core, String location, String snapshotName, String commitName) {
+    initialize(backupRepo, core, location, snapshotName, commitName);
   }
 
-  private void initialize(BackupRepository backupRepo, SolrCore core, String location, String snapshotName) {
+  private void initialize(BackupRepository backupRepo, SolrCore core, String location, String snapshotName, String commitName) {
     this.solrCore = Preconditions.checkNotNull(core);
     this.backupRepo = Preconditions.checkNotNull(backupRepo);
     this.baseSnapDirPath = backupRepo.createURI(Preconditions.checkNotNull(location)).normalize();
@@ -90,6 +94,7 @@ public class SnapShooter {
       directoryName = "snapshot." + fmt.format(new Date());
     }
     this.snapshotDirPath = backupRepo.createURI(location, directoryName);
+    this.commitName = commitName;
   }
 
   public BackupRepository getBackupRepository() {
@@ -145,16 +150,26 @@ public class SnapShooter {
   }
 
   public NamedList createSnapshot() throws Exception {
-    IndexDeletionPolicyWrapper deletionPolicy = solrCore.getDeletionPolicy();
     RefCounted<SolrIndexSearcher> searcher = solrCore.getSearcher();
     try {
-      //TODO should we try solrCore.getDeletionPolicy().getLatestCommit() first?
-      IndexCommit indexCommit = searcher.get().getIndexReader().getIndexCommit();
-      deletionPolicy.saveCommitPoint(indexCommit.getGeneration());
-      try {
-        return createSnapshot(indexCommit);
-      } finally {
-        deletionPolicy.releaseCommitPoint(indexCommit.getGeneration());
+      if (commitName != null) {
+        SolrSnapshotMetaDataManager snapshotMgr = solrCore.getSnapshotMetaDataManager();
+        Optional<IndexCommit> commit = snapshotMgr.getIndexCommitByName(commitName);
+        if(commit.isPresent()) {
+          return createSnapshot(commit.get());
+        }
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to find an index commit with name " + commitName +
+            " for core " + solrCore.getName());
+      } else {
+        //TODO should we try solrCore.getDeletionPolicy().getLatestCommit() first?
+        IndexDeletionPolicyWrapper deletionPolicy = solrCore.getDeletionPolicy();
+        IndexCommit indexCommit = searcher.get().getIndexReader().getIndexCommit();
+        deletionPolicy.saveCommitPoint(indexCommit.getGeneration());
+        try {
+          return createSnapshot(indexCommit);
+        } finally {
+          deletionPolicy.releaseCommitPoint(indexCommit.getGeneration());
+        }
       }
     } finally {
       searcher.decref();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
index 9b9aafa..e4103c5 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Future;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.IOUtils;
@@ -59,9 +60,13 @@ import org.apache.solr.core.CachingDirectoryFactory;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
+import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
+import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager.SnapshotMetaData;
 import org.apache.solr.handler.RestoreCore;
 import org.apache.solr.handler.SnapShooter;
 import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminOp;
@@ -794,22 +799,26 @@ enum CoreAdminOperation implements CoreAdminOp {
           + " parameter or as a default repository property");
     }
 
-    try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
-      SnapShooter snapShooter = new SnapShooter(repository, core, location, name);
-      // validateCreateSnapshot will create parent dirs instead of throw; that choice is dubious.
-      //  But we want to throw. One reason is that
-      //  this dir really should, in fact must, already exist here if triggered via a collection backup on a shared
-      //  file system. Otherwise, perhaps the FS location isn't shared -- we want an error.
-      if (!snapShooter.getBackupRepository().exists(snapShooter.getLocation())) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,
-            "Directory to contain snapshots doesn't exist: " + snapShooter.getLocation());
+      // An optional parameter to describe the snapshot to be backed-up. If this
+      // parameter is not supplied, the latest index commit is backed-up.
+      String commitName = params.get(CoreAdminParams.COMMIT_NAME);
+
+      try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
+        SnapShooter snapShooter = new SnapShooter(repository, core, location, name, commitName);
+        // validateCreateSnapshot will create parent dirs instead of throw; that choice is dubious.
+        //  But we want to throw. One reason is that
+        //  this dir really should, in fact must, already exist here if triggered via a collection backup on a shared
+        //  file system. Otherwise, perhaps the FS location isn't shared -- we want an error.
+        if (!snapShooter.getBackupRepository().exists(snapShooter.getLocation())) {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+              "Directory to contain snapshots doesn't exist: " + snapShooter.getLocation());
+        }
+        snapShooter.validateCreateSnapshot();
+        snapShooter.createSnapshot();
+      } catch (Exception e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Failed to backup core=" + cname + " because " + e, e);
       }
-      snapShooter.validateCreateSnapshot();
-      snapShooter.createSnapshot();
-    } catch (Exception e) {
-      throw new SolrException(ErrorCode.SERVER_ERROR,
-          "Failed to backup core=" + cname + " because " + e, e);
-    }
   }),
 
   RESTORECORE_OP(RESTORECORE, it -> {
@@ -845,6 +854,92 @@ enum CoreAdminOperation implements CoreAdminOp {
         throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to restore core=" + core.getName());
       }
     }
+  }),
+  CREATESNAPSHOT_OP(CREATESNAPSHOT, it -> { // Registers a named snapshot for the core's latest index commit.
+    CoreContainer cc = it.handler.getCoreContainer();
+    final SolrParams params = it.req.getParams();
+
+    String commitName = params.required().get(CoreAdminParams.COMMIT_NAME); // required: name to give the snapshot
+    String cname = params.required().get(CoreAdminParams.CORE); // required: name of the core to snapshot
+    try (SolrCore core = cc.getCore(cname)) {
+      if (core == null) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to locate core " + cname);
+      }
+
+      String indexDirPath = core.getIndexDir();
+      IndexCommit ic = core.getDeletionPolicy().getLatestCommit();
+      if (ic == null) { // The deletion policy reported no commit: use the one the current searcher has open.
+        RefCounted<SolrIndexSearcher> searcher = core.getSearcher();
+        try {
+          ic = searcher.get().getIndexReader().getIndexCommit();
+        } finally {
+          searcher.decref(); // always give the searcher reference back
+        }
+      }
+      SolrSnapshotMetaDataManager mgr = core.getSnapshotMetaDataManager();
+      mgr.snapshot(commitName, indexDirPath, ic.getGeneration()); // hand (name, index dir, commit generation) to the metadata manager
+
+      it.rsp.add("core", core.getName()); // echo the snapshot details back to the caller
+      it.rsp.add("commitName", commitName);
+      it.rsp.add("indexDirPath", indexDirPath);
+      it.rsp.add("generation", ic.getGeneration());
+    }
+  }),
+  DELETESNAPSHOT_OP(DELETESNAPSHOT, it -> { // Releases a named snapshot and, when safe, removes its index files.
+    CoreContainer cc = it.handler.getCoreContainer();
+    final SolrParams params = it.req.getParams();
+
+    String commitName = params.required().get(CoreAdminParams.COMMIT_NAME); // required: name of the snapshot to delete
+    String cname = params.required().get(CoreAdminParams.CORE); // required: name of the core owning the snapshot
+    try (SolrCore core = cc.getCore(cname)) {
+      if (core == null) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to locate core " + cname);
+      }
+
+      SolrSnapshotMetaDataManager mgr = core.getSnapshotMetaDataManager();
+      Optional<SnapshotMetaData> metadata = mgr.release(commitName);
+      if (metadata.isPresent()) { // when release() returns an empty Optional nothing further is done
+        long gen = metadata.get().getGenerationNumber();
+        String indexDirPath = metadata.get().getIndexDirPath();
+
+        // If the directory storing the snapshot is not the same as the *current* core
+        // index directory, then delete the files corresponding to this snapshot.
+        // Otherwise we leave the index files related to snapshot as is (assuming the
+        // underlying Solr IndexDeletionPolicy will clean them up appropriately).
+        if (!indexDirPath.equals(core.getIndexDir())) {
+          Directory d = core.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, DirectoryFactory.LOCK_TYPE_NONE);
+          try {
+            SolrSnapshotManager.deleteIndexFiles(d, mgr.listSnapshotsInIndexDir(indexDirPath), gen); // NOTE(review): the snapshots still listed for this dir are passed along, presumably so files they share are kept -- confirm in SolrSnapshotManager
+          } finally {
+            core.getDirectoryFactory().release(d); // pair every get() with a release()
+          }
+        }
+      }
+    }
+  }),
+  LISTSNAPSHOTS_OP(LISTSNAPSHOTS, it -> { // Reports every snapshot known for a core with its generation and index directory.
+    CoreContainer cc = it.handler.getCoreContainer();
+    final SolrParams params = it.req.getParams();
+
+    String cname = params.required().get(CoreAdminParams.CORE); // required: name of the core to inspect
+    try ( SolrCore core = cc.getCore(cname) ) {
+      if (core == null) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to locate core " + cname);
+      }
+
+      SolrSnapshotMetaDataManager mgr = core.getSnapshotMetaDataManager();
+      NamedList result = new NamedList(); // snapshot name -> properties of that snapshot
+      for (String name : mgr.listSnapshots()) {
+        Optional<SnapshotMetaData> metadata = mgr.getSnapshotMetaData(name);
+        if ( metadata.isPresent() ) { // names without metadata are left out of the response
+          NamedList<String> props = new NamedList<>();
+          props.add("generation", String.valueOf(metadata.get().getGenerationNumber())); // generation is reported as a string here
+          props.add("indexDirPath", metadata.get().getIndexDirPath());
+          result.add(name, props);
+        }
+      }
+      it.rsp.add("snapshots", result);
+    }
   });
 
   final CoreAdminParams.CoreAdminAction action;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCoreSnapshots.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCoreSnapshots.java b/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCoreSnapshots.java
new file mode 100644
index 0000000..aacac52
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCoreSnapshots.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.core.snapshots;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexNotFoundException;
+import org.apache.lucene.store.SimpleFSDirectory;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.CreateSnapshot;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.DeleteSnapshot;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.ListSnapshots;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager.SnapshotMetaData;
+import org.apache.solr.handler.BackupRestoreUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+
+@SolrTestCaseJ4.SuppressSSL // Currently unknown why SSL does not work with this test
+@Slow
+public class TestSolrCoreSnapshots extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static long docsSeed; // see indexDocs()
+
+  @BeforeClass
+  public static void setupClass() throws Exception { // One-time setup: a single-node cluster with the "conf1" configset.
+    useFactory("solr.StandardDirectoryFactory");
+    configureCluster(1)// nodes
+        .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+        .configure();
+
+    docsSeed = random().nextLong(); // seed handed to BackupRestoreUtils.indexDocs() by the tests
+  }
+
+  @AfterClass
+  public static void teardownClass() throws Exception { // One-time cleanup of test-related system properties.
+    System.clearProperty("test.build.data");
+    System.clearProperty("test.cache.data");
+  }
+
+  @Test
+  public void testBackupRestore() throws Exception { // Snapshot -> wipe -> backup by commit name -> restore -> delete snapshots.
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    String collectionName = "SolrCoreSnapshots";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 1);
+    create.process(solrClient);
+
+    String location = createTempDir().toFile().getAbsolutePath(); // directory the backup is written to
+    int nDocs = BackupRestoreUtils.indexDocs(cluster.getSolrClient(), collectionName, docsSeed);
+
+    DocCollection collectionState = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+    assertEquals(1, collectionState.getActiveSlices().size());
+    Slice shard = collectionState.getActiveSlices().iterator().next();
+    assertEquals(1, shard.getReplicas().size());
+    Replica replica = shard.getReplicas().iterator().next(); // the collection's only replica
+
+    String replicaBaseUrl = replica.getStr(BASE_URL_PROP);
+    String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+    String backupName = TestUtil.randomSimpleString(random(), 1, 5);
+    String commitName = TestUtil.randomSimpleString(random(), 1, 5);
+    String duplicateName = commitName.concat("_duplicate"); // second snapshot name for the same commit
+
+    try (
+        SolrClient adminClient = getHttpSolrClient(cluster.getJettySolrRunners().get(0).getBaseUrl().toString());
+        SolrClient masterClient = getHttpSolrClient(replica.getCoreUrl())) {
+
+      SnapshotMetaData metaData = createSnapshot(adminClient, coreName, commitName);
+      // Create another snapshot referring to the same index commit to verify the
+      // reference counting implementation during snapshot deletion.
+      SnapshotMetaData duplicateCommit = createSnapshot(adminClient, coreName, duplicateName);
+
+      assertEquals (metaData.getIndexDirPath(), duplicateCommit.getIndexDirPath());
+      assertEquals (metaData.getGenerationNumber(), duplicateCommit.getGenerationNumber());
+
+      // Delete all documents
+      masterClient.deleteByQuery("*:*");
+      masterClient.commit();
+      BackupRestoreUtils.verifyDocs(0, cluster.getSolrClient(), collectionName);
+
+      // Verify that the index directory contains at least 2 index commits - one referred by the snapshots
+      // and the other containing document deletions.
+      {
+        List<IndexCommit> commits = listCommits(metaData.getIndexDirPath());
+        assertTrue(2 <= commits.size());
+      }
+
+      // Backup the earlier created snapshot.
+      {
+        Map<String,String> params = new HashMap<>();
+        params.put("name", backupName);
+        params.put("commitName", commitName); // back up the snapshotted commit rather than the latest one
+        params.put("location", location);
+        BackupRestoreUtils.runCoreAdminCommand(replicaBaseUrl, coreName, CoreAdminAction.BACKUPCORE.toString(), params);
+      }
+
+      // Restore the backup
+      {
+        Map<String,String> params = new HashMap<>();
+        params.put("name", "snapshot." + backupName);
+        params.put("location", location);
+        BackupRestoreUtils.runCoreAdminCommand(replicaBaseUrl, coreName, CoreAdminAction.RESTORECORE.toString(), params);
+        BackupRestoreUtils.verifyDocs(nDocs, cluster.getSolrClient(), collectionName); // the documents deleted above are back
+      }
+
+      // Verify that the old index directory (before restore) contains only those index commits referred by snapshots.
+      {
+        List<IndexCommit> commits = listCommits(metaData.getIndexDirPath());
+        assertEquals(1, commits.size());
+        assertEquals(metaData.getGenerationNumber(), commits.get(0).getGeneration());
+      }
+
+      // Delete first snapshot
+      deleteSnapshot(adminClient, coreName, commitName);
+
+      // Verify that corresponding index files have NOT been deleted (due to reference counting).
+      assertFalse(listCommits(metaData.getIndexDirPath()).isEmpty());
+
+      // Delete second snapshot
+      deleteSnapshot(adminClient, coreName, duplicateCommit.getName());
+
+      // Verify that corresponding index files have been deleted.
+      assertTrue(listCommits(duplicateCommit.getIndexDirPath()).isEmpty());
+    }
+  }
+
+  /**
+   * Verifies that deleting a snapshot removes only those of its index files that are not shared.
+   * <p>
+   * The test creates one snapshot per commit over several commits, backs up and restores the first
+   * snapshot so that (as explained on the restore step below) the snapshots are no longer in the
+   * <em>current</em> index directory, and then deletes the first snapshot. Files of that snapshot whose
+   * reference count reported by {@code SolrSnapshotManager.buildRefCounts} is greater than 1 must
+   * still exist afterwards; all of its other files must be gone.
+   */
+  @Test
+  public void testHandlingSharedIndexFiles() throws Exception {
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    String collectionName = "SolrCoreSnapshots_IndexFileSharing";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 1);
+    create.process(solrClient);
+
+    int nDocs = BackupRestoreUtils.indexDocs(cluster.getSolrClient(), collectionName, docsSeed);
+    // The collection is expected to consist of exactly one shard with exactly one replica.
+    DocCollection collectionState = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+    assertEquals(1, collectionState.getActiveSlices().size());
+    Slice shard = collectionState.getActiveSlices().iterator().next();
+    assertEquals(1, shard.getReplicas().size());
+    Replica replica = shard.getReplicas().iterator().next();
+
+    String replicaBaseUrl = replica.getStr(BASE_URL_PROP);
+    String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+    String backupName = TestUtil.randomSimpleString(random(), 1, 5);
+    String location = createTempDir().toFile().getAbsolutePath();
+
+    // adminClient talks to the node (core admin API), masterClient talks to the replica's core.
+    try (
+        SolrClient adminClient = getHttpSolrClient(cluster.getJettySolrRunners().get(0).getBaseUrl().toString());
+        SolrClient masterClient = getHttpSolrClient(replica.getCoreUrl())) {
+
+      int numTests = TestUtil.nextInt(random(), 2, 5);
+      List<SnapshotMetaData> snapshots = new ArrayList<>(numTests);
+
+      // Create multiple commits and create a snapshot per commit.
+      // This should result in Lucene reusing some of the segments for later index commits.
+      for (int attempt=0; attempt<numTests; attempt++) {
+        // Deletes are skipped when nothing was indexed initially (numDeletes is drawn between 1 and nDocs).
+        if (nDocs > 0) {
+          //Delete a few docs (ids 0 .. numDeletes-1)
+          int numDeletes = TestUtil.nextInt(random(), 1, nDocs);
+          for(int i=0; i<numDeletes; i++) {
+            masterClient.deleteByQuery("id:" + i);
+          }
+        }
+
+        // Add a few more. Ids start at nDocs, so every attempt (re-)adds the same id range.
+        int moreAdds = TestUtil.nextInt(random(), 1, 100);
+        for (int i = 0; i < moreAdds; i++) {
+          SolrInputDocument doc = new SolrInputDocument();
+          doc.addField("id", i + nDocs);
+          doc.addField("name", "name = " + (i + nDocs));
+          masterClient.add(doc);
+        }
+        masterClient.commit();
+
+        // Create a snapshot named after the attempt number.
+        snapshots.add(createSnapshot(adminClient, coreName, "snapshot_" + attempt));
+      }
+
+      // Backup the earliest created snapshot (selected through the "commitName" parameter).
+      {
+        Map<String,String> params = new HashMap<>();
+        params.put("name", backupName);
+        params.put("commitName", snapshots.get(0).getName());
+        params.put("location", location);
+        BackupRestoreUtils.runCoreAdminCommand(replicaBaseUrl, coreName, CoreAdminAction.BACKUPCORE.toString(), params);
+      }
+
+      // Restore the backup. The purpose of the restore operation is to change the *current* index directory.
+      // This is required since we delegate the file deletion to underlying IndexDeletionPolicy in case of
+      // *current* index directory. Hence for the purpose of this test, we want to ensure that the created
+      // snapshots are NOT in the *current* index directory.
+      {
+        Map<String,String> params = new HashMap<>();
+        // The backup is addressed with a "snapshot." prefix in front of the name used when creating it.
+        params.put("name", "snapshot." + backupName);
+        params.put("location", location);
+        BackupRestoreUtils.runCoreAdminCommand(replicaBaseUrl, coreName, CoreAdminAction.RESTORECORE.toString(), params);
+      }
+
+      {
+        SnapshotMetaData snapshotMetaData = snapshots.get(0);
+
+        List<IndexCommit> commits = listCommits(snapshotMetaData.getIndexDirPath());
+        // Require at least one index commit in the snapshot's directory; without it there is no file sharing to check.
+        assertTrue(commits.size() > 0);
+        // NOTE(review): presumably maps file name -> number of snapshotted commits referencing it; confirm in SolrSnapshotManager.
+        Map<String,Integer> refCounts = SolrSnapshotManager.buildRefCounts(snapshots, commits);
+
+        // Locate the index commit that belongs to the first snapshot via its generation number.
+        Optional<IndexCommit> ic = commits.stream()
+            .filter(entry -> entry.getGeneration() == snapshotMetaData.getGenerationNumber())
+            .findFirst();
+        assertTrue(ic.isPresent());
+        // Partition the commit's files: a reference count above 1 marks a file as shared.
+        Collection<String> nonSharedFiles = new ArrayList<>();
+        Collection<String> sharedFiles = new ArrayList<>();
+        for (String fileName : ic.get().getFileNames()) {
+          if (refCounts.getOrDefault(fileName, 0) > 1) {
+            sharedFiles.add(fileName);
+          } else {
+            nonSharedFiles.add(fileName);
+          }
+        }
+
+        // Delete snapshot
+        deleteSnapshot(adminClient, coreName, snapshotMetaData.getName());
+
+        // Verify that the shared files are not deleted.
+        for (String fileName : sharedFiles) {
+          Path path = Paths.get(snapshotMetaData.getIndexDirPath(), fileName);
+          assertTrue(path + " should exist.", Files.exists(path));
+        }
+
+        // Verify that the non-shared files are deleted.
+        for (String fileName : nonSharedFiles) {
+          Path path = Paths.get(snapshotMetaData.getIndexDirPath(), fileName);
+          assertFalse(path + " should not exist.", Files.exists(path));
+        }
+        }
+      }
+  }
+
+  /**
+   * Verifies how a snapshotted commit interacts with index optimization.
+   * <p>
+   * While the snapshot exists, its commit must still be listed in the index directory after an
+   * optimize. Once the snapshot has been deleted and the index has been modified and optimized again,
+   * exactly one commit must remain and it must not be the snapshotted one.
+   */
+  @Test
+  public void testIndexOptimization() throws Exception {
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    String collectionName = "SolrCoreSnapshots_IndexOptimization";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 1);
+    create.process(solrClient);
+
+    int nDocs = BackupRestoreUtils.indexDocs(cluster.getSolrClient(), collectionName, docsSeed);
+
+    // The collection is expected to consist of exactly one shard with exactly one replica.
+    DocCollection collectionState = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+    assertEquals(1, collectionState.getActiveSlices().size());
+    Slice shard = collectionState.getActiveSlices().iterator().next();
+    assertEquals(1, shard.getReplicas().size());
+    Replica replica = shard.getReplicas().iterator().next();
+
+    String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+    String commitName = TestUtil.randomSimpleString(random(), 1, 5);
+
+    // adminClient talks to the node (core admin API), masterClient talks to the replica's core.
+    try (
+        SolrClient adminClient = getHttpSolrClient(cluster.getJettySolrRunners().get(0).getBaseUrl().toString());
+        SolrClient masterClient = getHttpSolrClient(replica.getCoreUrl())) {
+
+      // Snapshot the commit produced by the initial indexing.
+      SnapshotMetaData metaData = createSnapshot(adminClient, coreName, commitName);
+
+      int numTests = nDocs > 0 ? TestUtil.nextInt(random(), 1, 5) : 1;
+      for (int attempt=0; attempt<numTests; attempt++) {
+        //Modify existing index before we call optimize. Nothing is modified when nDocs is 0.
+        if (nDocs > 0) {
+          //Delete a few docs (ids 0 .. numDeletes-1)
+          int numDeletes = TestUtil.nextInt(random(), 1, nDocs);
+          for(int i=0; i<numDeletes; i++) {
+            masterClient.deleteByQuery("id:" + i);
+          }
+          //Add a few more. Ids start at nDocs, so every attempt (re-)adds the same id range.
+          int moreAdds = TestUtil.nextInt(random(), 1, 100);
+          for (int i=0; i<moreAdds; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            doc.addField("id", i + nDocs);
+            doc.addField("name", "name = " + (i + nDocs));
+            masterClient.add(doc);
+          }
+          masterClient.commit();
+        }
+      }
+
+      // Before invoking optimize command, verify that the index directory contains at least one commit (including the one we snapshotted earlier).
+      {
+        Collection<IndexCommit> commits = listCommits(metaData.getIndexDirPath());
+        // Verify that at least one index commit is stored in this directory.
+        assertTrue(commits.size() > 0);
+        // Verify that the snapshot commit is present in this directory.
+        assertTrue(commits.stream().filter(x -> x.getGeneration() == metaData.getGenerationNumber()).findFirst().isPresent());
+      }
+
+      // Optimize the index.
+      // NOTE(review): the arguments are presumably (waitFlush, waitSearcher, maxSegments) - confirm against SolrClient.optimize.
+      masterClient.optimize(true, true, 1);
+
+      // After invoking optimize command, verify that the index directory contains multiple commits (including the one we snapshotted earlier).
+      {
+        List<IndexCommit> commits = listCommits(metaData.getIndexDirPath());
+        // Verify that multiple index commits are stored in this directory.
+        assertTrue(commits.size() > 1);
+        // Verify that the snapshot commit is present in this directory.
+        assertTrue(commits.stream().filter(x -> x.getGeneration() == metaData.getGenerationNumber()).findFirst().isPresent());
+      }
+
+      // Delete the snapshot
+      deleteSnapshot(adminClient, coreName, metaData.getName());
+
+      // Add few documents. Without this the optimize command below does not take effect.
+      {
+        int moreAdds = TestUtil.nextInt(random(), 1, 100);
+        for (int i=0; i<moreAdds; i++) {
+          SolrInputDocument doc = new SolrInputDocument();
+          doc.addField("id", i + nDocs);
+          doc.addField("name", "name = " + (i + nDocs));
+          masterClient.add(doc);
+        }
+        masterClient.commit();
+      }
+
+      // Optimize the index.
+      masterClient.optimize(true, true, 1);
+
+      // Verify that the index directory contains only 1 index commit (which is not the same as the snapshotted commit).
+      Collection<IndexCommit> commits = listCommits(metaData.getIndexDirPath());
+      assertTrue(commits.size() == 1);
+      assertFalse(commits.stream().filter(x -> x.getGeneration() == metaData.getGenerationNumber()).findFirst().isPresent());
+    }
+  }
+
+  /**
+   * Sends a {@link CreateSnapshot} request for {@code coreName} and returns the meta-data that the
+   * snapshot listing subsequently reports under {@code commitName}. The test fails if no snapshot
+   * with that name is listed.
+   */
+  private SnapshotMetaData createSnapshot (SolrClient adminClient, String coreName, String commitName) throws Exception {
+    CreateSnapshot createRequest = new CreateSnapshot(commitName);
+    createRequest.setCoreName(coreName);
+    adminClient.request(createRequest);
+
+    // Look the new snapshot up by name; the first listed match wins.
+    SnapshotMetaData created = null;
+    for (SnapshotMetaData candidate : listSnapshots(adminClient, coreName)) {
+      if (commitName.equals(candidate.getName())) {
+        created = candidate;
+        break;
+      }
+    }
+    assertTrue(created != null);
+
+    return created;
+  }
+
+  /**
+   * Sends a {@link DeleteSnapshot} request for {@code coreName} and asserts that the snapshot
+   * listing no longer contains a snapshot named {@code commitName}.
+   */
+  private void deleteSnapshot(SolrClient adminClient, String coreName, String commitName) throws Exception {
+    DeleteSnapshot deleteRequest = new DeleteSnapshot(commitName);
+    deleteRequest.setCoreName(coreName);
+    adminClient.request(deleteRequest);
+
+    // Scan the remaining snapshots for the name that was just deleted.
+    boolean stillListed = false;
+    for (SnapshotMetaData remaining : listSnapshots(adminClient, coreName)) {
+      if (commitName.equals(remaining.getName())) {
+        stillListed = true;
+        break;
+      }
+    }
+    assertFalse(stillListed);
+  }
+
+  /**
+   * Sends a {@link ListSnapshots} request for {@code coreName} and converts every entry of the
+   * {@code "snapshots"} section of the response into a {@link SnapshotMetaData}.
+   *
+   * @param adminClient client used to send the core admin request
+   * @param coreName name of the core whose snapshots are listed
+   * @return one meta-data object per listed snapshot, in response order
+   */
+  private Collection<SnapshotMetaData> listSnapshots(SolrClient adminClient, String coreName) throws Exception {
+    ListSnapshots req = new ListSnapshots();
+    req.setCoreName(coreName);
+    NamedList<?> resp = adminClient.request(req);
+    Object snapshotsSection = resp.get("snapshots");
+    assertTrue( snapshotsSection instanceof NamedList );
+    NamedList<?> apiResult = (NamedList<?>) snapshotsSection;
+
+    List<SnapshotMetaData> result = new ArrayList<>(apiResult.size());
+    for(int i = 0 ; i < apiResult.size(); i++) {
+      String commitName = apiResult.getName(i);
+      // Read the entry by position: a lookup by name searches the list again and would return
+      // the first entry's details for every entry that shares its name.
+      NamedList<?> details = (NamedList<?>) apiResult.getVal(i);
+      String indexDirPath = (String) details.get("indexDirPath");
+      // The generation is transported as a String; parse it straight into a primitive.
+      long genNumber = Long.parseLong((String) details.get("generation"));
+      result.add(new SnapshotMetaData(commitName, indexDirPath, genNumber));
+    }
+    return result;
+  }
+
+  /**
+   * Lists the index commits found in the given index directory.
+   *
+   * @param directory file system path of the index directory to inspect
+   * @return the commits reported by {@code DirectoryReader.listCommits}, or an empty list when no
+   *         index is found in the directory
+   */
+  private List<IndexCommit> listCommits(String directory) throws Exception {
+    // Close the directory handle once the commits are listed; it was previously never closed.
+    // NOTE(review): callers in this test only read the generation and file names of the returned
+    // commits - confirm none of them needs the Directory to remain open.
+    try (SimpleFSDirectory dir = new SimpleFSDirectory(Paths.get(directory))) {
+      return DirectoryReader.listCommits(dir);
+    } catch (IndexNotFoundException ex) {
+      // This can happen when the delete snapshot functionality cleans up the index files (when the directory
+      // storing these files is not the *current* index directory).
+      return Collections.emptyList();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/core/src/test/org/apache/solr/handler/BackupRestoreUtils.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/handler/BackupRestoreUtils.java b/solr/core/src/test/org/apache/solr/handler/BackupRestoreUtils.java
index e2f4304..34509cf 100644
--- a/solr/core/src/test/org/apache/solr/handler/BackupRestoreUtils.java
+++ b/solr/core/src/test/org/apache/solr/handler/BackupRestoreUtils.java
@@ -18,11 +18,15 @@
 package org.apache.solr.handler;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.invoke.MethodHandles;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.TestUtil;
 import org.apache.solr.client.solrj.SolrClient;
@@ -64,4 +68,37 @@ public class BackupRestoreUtils extends LuceneTestCase {
     assertEquals(0, response.getStatus());
     assertEquals(nDocs, response.getResults().getNumFound());
   }
+
+  /**
+   * Invokes a core admin command over HTTP by requesting
+   * {@code <baseUrl>/admin/cores?action=<action>&core=<coreName>&<params...>}.
+   * All query components are URL-encoded (UTF-8), so values such as a backup location path may
+   * contain characters that are reserved in a query string.
+   *
+   * @param baseUrl base URL of the Solr node; used as is
+   * @param coreName name of the core the command applies to
+   * @param action name of the core admin action to run
+   * @param params additional request parameters appended to the query string
+   * @throws IOException if the HTTP request fails
+   */
+  public static void runCoreAdminCommand(String baseUrl, String coreName, String action, Map<String,String> params) throws IOException {
+    final String enc = "UTF-8";
+    StringBuilder builder = new StringBuilder(baseUrl);
+    builder.append("/admin/cores?action=").append(java.net.URLEncoder.encode(action, enc));
+    builder.append("&core=").append(java.net.URLEncoder.encode(coreName, enc));
+    for (Map.Entry<String,String> p : params.entrySet()) {
+      builder.append('&').append(java.net.URLEncoder.encode(p.getKey(), enc));
+      builder.append('=').append(java.net.URLEncoder.encode(p.getValue(), enc));
+    }
+    executeHttpRequest(builder.toString());
+  }
+
+  /**
+   * Invokes a replication handler command over HTTP by requesting
+   * {@code <baseUrl>/<coreName><ReplicationHandler.PATH>?command=<action>&repository=<repoName>&name=<backupName>}.
+   * The query values are URL-encoded (UTF-8); the path portion is used as is.
+   *
+   * @param baseUrl base URL of the Solr node
+   * @param coreName name of the core whose replication handler is addressed
+   * @param action replication handler command to run
+   * @param repoName value of the {@code repository} parameter
+   * @param backupName value of the {@code name} parameter
+   * @throws IOException if the HTTP request fails
+   */
+  public static void runReplicationHandlerCommand(String baseUrl, String coreName, String action, String repoName, String backupName) throws IOException {
+    final String enc = "UTF-8";
+    String masterUrl = baseUrl + "/" + coreName + ReplicationHandler.PATH
+        + "?command=" + java.net.URLEncoder.encode(action, enc)
+        + "&repository=" + java.net.URLEncoder.encode(repoName, enc)
+        + "&name=" + java.net.URLEncoder.encode(backupName, enc);
+    executeHttpRequest(masterUrl);
+  }
+
+  /**
+   * Opens the given URL and immediately closes the response stream again; the response body is
+   * not read.
+   *
+   * @param requestUrl complete URL to request
+   * @throws IOException if the URL is malformed, or opening or closing the stream fails
+   */
+  static void executeHttpRequest(String requestUrl) throws IOException {
+    final URL target = new URL(requestUrl);
+    InputStream response = null;
+    try {
+      response = target.openStream();
+      // Close explicitly so that a failure while closing is reported to the caller.
+      response.close();
+    } finally {
+      // Safety net for the failure path; a no-op when the stream is null or already closed.
+      IOUtils.closeQuietly(response);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/core/src/test/org/apache/solr/handler/TestHdfsBackupRestoreCore.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/handler/TestHdfsBackupRestoreCore.java b/solr/core/src/test/org/apache/solr/handler/TestHdfsBackupRestoreCore.java
index a840428..4e8d4cc 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestHdfsBackupRestoreCore.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestHdfsBackupRestoreCore.java
@@ -18,11 +18,11 @@
 package org.apache.solr.handler;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
 
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import org.apache.commons.io.IOUtils;
@@ -44,6 +44,7 @@ import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.util.BadHdfsThreadsFilter;
 import org.junit.AfterClass;
@@ -176,16 +177,19 @@ public class TestHdfsBackupRestoreCore extends SolrCloudTestCase {
     try (SolrClient masterClient = getHttpSolrClient(replicaBaseUrl)) {
       // Create a backup.
       if (testViaReplicationHandler) {
-        log.info("Running Backup/restore via replication handler");
-        runReplicationHandlerCommand(baseUrl, coreName, ReplicationHandler.CMD_BACKUP, "hdfs", backupName);
+        log.info("Running Backup via replication handler");
+        BackupRestoreUtils.runReplicationHandlerCommand(baseUrl, coreName, ReplicationHandler.CMD_BACKUP, "hdfs", backupName);
         CheckBackupStatus checkBackupStatus = new CheckBackupStatus((HttpSolrClient) masterClient, coreName, null);
         while (!checkBackupStatus.success) {
           checkBackupStatus.fetchStatus();
           Thread.sleep(1000);
         }
       } else {
-        log.info("Running Backup/restore via core admin api");
-        runCoreAdminCommand(replicaBaseUrl, coreName, CoreAdminAction.BACKUPCORE.toString(), "hdfs", backupName);
+        log.info("Running Backup via core admin api");
+        Map<String,String> params = new HashMap<>();
+        params.put("name", backupName);
+        params.put(CoreAdminParams.BACKUP_REPOSITORY, "hdfs");
+        BackupRestoreUtils.runCoreAdminCommand(replicaBaseUrl, coreName, CoreAdminAction.BACKUPCORE.toString(), params);
       }
 
       int numRestoreTests = nDocs > 0 ? TestUtil.nextInt(random(), 1, 5) : 1;
@@ -214,38 +218,22 @@ public class TestHdfsBackupRestoreCore extends SolrCloudTestCase {
         }
         // Snapshooter prefixes "snapshot." to the backup name.
         if (testViaReplicationHandler) {
+          log.info("Running Restore via replication handler");
           // Snapshooter prefixes "snapshot." to the backup name.
-          runReplicationHandlerCommand(baseUrl, coreName, ReplicationHandler.CMD_RESTORE, "hdfs", backupName);
+          BackupRestoreUtils.runReplicationHandlerCommand(baseUrl, coreName, ReplicationHandler.CMD_RESTORE, "hdfs", backupName);
           while (!TestRestoreCore.fetchRestoreStatus(baseUrl, coreName)) {
             Thread.sleep(1000);
           }
         } else {
-          runCoreAdminCommand(replicaBaseUrl, coreName, CoreAdminAction.RESTORECORE.toString(), "hdfs", "snapshot." + backupName);
+          log.info("Running Restore via core admin api");
+          Map<String,String> params = new HashMap<>();
+          params.put("name", "snapshot." + backupName);
+          params.put(CoreAdminParams.BACKUP_REPOSITORY, "hdfs");
+          BackupRestoreUtils.runCoreAdminCommand(replicaBaseUrl, coreName, CoreAdminAction.RESTORECORE.toString(), params);
         }
         //See if restore was successful by checking if all the docs are present again
         BackupRestoreUtils.verifyDocs(nDocs, masterClient, coreName);
       }
     }
   }
-
-  static void runCoreAdminCommand(String baseUrl, String coreName, String action, String repoName, String backupName) throws IOException {
-    String masterUrl = baseUrl + "/admin/cores?action=" + action + "&core="+coreName+"&repository="+repoName+"&name="+backupName;
-    executeHttpRequest(masterUrl);
-  }
-
-  static void runReplicationHandlerCommand(String baseUrl, String coreName, String action, String repoName, String backupName) throws IOException {
-    String masterUrl = baseUrl + "/" + coreName + ReplicationHandler.PATH + "?command=" + action + "&repository="+repoName+"&name="+backupName;
-    executeHttpRequest(masterUrl);
-  }
-
-  static void executeHttpRequest(String requestUrl) throws IOException {
-    InputStream stream = null;
-    try {
-      URL url = new URL(requestUrl);
-      stream = url.openStream();
-      stream.close();
-    } finally {
-      IOUtils.closeQuietly(stream);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java b/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
index b9bcf7b..08c462b 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
@@ -20,6 +20,7 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -66,6 +67,7 @@ import org.apache.solr.core.CachingDirectoryFactory;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.StandardDirectoryFactory;
+import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
 import org.apache.solr.util.FileUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -896,8 +898,8 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
       CachingDirectoryFactory dirFactory = (CachingDirectoryFactory) core.getDirectoryFactory();
       synchronized (dirFactory) {
         Set<String> livePaths = dirFactory.getLivePaths();
-        // one for data, one for hte index under data
-        assertEquals(livePaths.toString(), 2, livePaths.size());
+        // one for data, one for the index under data and one for the snapshot metadata.
+        assertEquals(livePaths.toString(), 3, livePaths.size());
         // :TODO: assert that one of the paths is a subpath of hte other
       }
       if (dirFactory instanceof StandardDirectoryFactory) {
@@ -908,14 +910,14 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
   }
 
   private int indexDirCount(String ddir) {
-    String[] list = new File(ddir).list();
-    int cnt = 0;
-    for (String file : list) {
-      if (!file.endsWith(".properties")) {
-        cnt++;
+    String[] list = new File(ddir).list(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        File f = new File(dir, name);
+        return f.isDirectory() && !SolrSnapshotMetaDataManager.SNAPSHOT_METADATA_DIR.equals(name);
       }
-    }
-    return cnt;
+    });
+    return list.length;
   }
 
   private void pullFromMasterToSlave() throws MalformedURLException,

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
index 7d9e356..f3e4e19 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
@@ -453,6 +453,63 @@ public class CoreAdminRequest extends SolrRequest<CoreAdminResponse> {
 
   }
 
+  /**
+   * Core admin request with action {@code CREATESNAPSHOT}. The commit name given at construction
+   * time is sent as the {@link CoreAdminParams#COMMIT_NAME} parameter.
+   */
+  public static class CreateSnapshot extends CoreAdminRequest {
+    private String commitName;
+
+    /**
+     * @param commitName name sent with the request; must not be null
+     * @throws NullPointerException if {@code commitName} is null
+     */
+    public CreateSnapshot(String commitName) {
+      if (null == commitName) {
+        throw new NullPointerException("Please specify non null value for commitName parameter.");
+      }
+      this.commitName = commitName;
+      this.action = CoreAdminAction.CREATESNAPSHOT;
+    }
+
+    /** Returns the commit name this request was created with. */
+    public String getCommitName() {
+      return this.commitName;
+    }
+
+    /** Returns the parameters of the parent request plus the commit name. */
+    @Override
+    public SolrParams getParams() {
+      ModifiableSolrParams withCommitName = new ModifiableSolrParams(super.getParams());
+      withCommitName.set(CoreAdminParams.COMMIT_NAME, this.commitName);
+      return withCommitName;
+    }
+  }
+
+  /**
+   * Core admin request with action {@code DELETESNAPSHOT}. The commit name given at construction
+   * time is sent as the {@link CoreAdminParams#COMMIT_NAME} parameter.
+   */
+  public static class DeleteSnapshot extends CoreAdminRequest {
+    private String commitName;
+
+    /**
+     * @param commitName name sent with the request; must not be null
+     * @throws NullPointerException if {@code commitName} is null
+     */
+    public DeleteSnapshot(String commitName) {
+      if (null == commitName) {
+        throw new NullPointerException("Please specify non null value for commitName parameter.");
+      }
+      this.commitName = commitName;
+      this.action = CoreAdminAction.DELETESNAPSHOT;
+    }
+
+    /** Returns the commit name this request was created with. */
+    public String getCommitName() {
+      return this.commitName;
+    }
+
+    /** Returns the parameters of the parent request plus the commit name. */
+    @Override
+    public SolrParams getParams() {
+      ModifiableSolrParams withCommitName = new ModifiableSolrParams(super.getParams());
+      withCommitName.set(CoreAdminParams.COMMIT_NAME, this.commitName);
+      return withCommitName;
+    }
+  }
+
+  /** Core admin request with action {@code LISTSNAPSHOTS}; it adds no parameters of its own. */
+  public static class ListSnapshots extends CoreAdminRequest {
+    public ListSnapshots() {
+      action = CoreAdminAction.LISTSNAPSHOTS;
+    }
+  }
+
+
   public CoreAdminRequest()
   {
     super( METHOD.GET, "/admin/cores" );

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7aa0b56/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
index 7455cbf..7f90a90 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
@@ -118,6 +118,11 @@ public abstract class CoreAdminParams
    */
   public static final String BACKUP_LOCATION = "location";
 
+  /**
+   * A parameter to specify the name of a commit (snapshot): the one to create or delete, or the one to be stored during the backup operation.
+   */
+  public static final String COMMIT_NAME = "commitName";
+
   public enum CoreAdminAction {
     STATUS(true),
     UNLOAD,
@@ -141,7 +146,10 @@ public abstract class CoreAdminParams
     INVOKE,
     //Internal APIs to backup and restore a core
     BACKUPCORE,
-    RESTORECORE;
+    RESTORECORE,
+    CREATESNAPSHOT,
+    DELETESNAPSHOT,
+    LISTSNAPSHOTS;
 
     public final boolean isRead;