You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by va...@apache.org on 2016/07/06 07:52:15 UTC

lucene-solr:branch_6x: SOLR-9242: Collection backup/restore to provide a param for specifying underlying storage repository to use

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x 98ad225cf -> 413ea4767


SOLR-9242: Collection backup/restore to provide a param for specifying underlying storage repository to use


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/413ea476
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/413ea476
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/413ea476

Branch: refs/heads/branch_6x
Commit: 413ea476700429bd39c659cdd0bc6682263b0545
Parents: 98ad225
Author: Varun Thacker <va...@apche.org>
Authored: Wed Jul 6 13:10:33 2016 +0530
Committer: Varun Thacker <va...@apache.org>
Committed: Wed Jul 6 13:21:31 2016 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../cloud/OverseerCollectionMessageHandler.java |  87 +++---
 .../org/apache/solr/core/CoreContainer.java     |  19 +-
 .../apache/solr/core/backup/BackupManager.java  | 250 +++++++++++++++++
 .../apache/solr/core/backup/package-info.java   |  22 ++
 .../backup/repository/BackupRepository.java     |  19 +-
 .../apache/solr/handler/ReplicationHandler.java |  38 +--
 .../solr/handler/admin/CollectionsHandler.java  |  56 +++-
 .../solr/handler/admin/CoreAdminOperation.java  |  42 +--
 .../AbstractCloudBackupRestoreTestCase.java     | 273 +++++++++++++++++++
 .../solr/cloud/TestCloudBackupRestore.java      | 219 ---------------
 .../solr/cloud/TestHdfsCloudBackupRestore.java  | 148 ++++++++++
 .../cloud/TestLocalFSCloudBackupRestore.java    |  50 ++++
 .../solr/core/TestBackupRepositoryFactory.java  |  10 +-
 .../solrj/request/CollectionAdminRequest.java   |  32 ++-
 .../apache/solr/common/cloud/ZkStateReader.java |   4 +-
 .../solr/common/params/CoreAdminParams.java     |  10 +
 17 files changed, 941 insertions(+), 341 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index bb0c348..05d59fc 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -46,6 +46,9 @@ New Features
 
 * SOLR-9194: Enhance the bin/solr script to perform file operations to/from Zookeeper (Erick Erickson, janhoy)
 
+* SOLR-9242: Collection Backup/Restore now supports specifying the directory implementation to use
+  via the "repository" parameter. (Hrishikesh Gadre, Varun Thacker)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index a0ac732..27a2824 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -17,13 +17,8 @@
 package org.apache.solr.cloud;
 
 import java.io.IOException;
-import java.io.Reader;
-import java.io.Writer;
 import java.lang.invoke.MethodHandles;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.net.URI;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -36,6 +31,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
@@ -84,6 +80,9 @@ import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.SuppressForbidden;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.backup.BackupManager;
+import org.apache.solr.core.backup.repository.BackupRepository;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardHandlerFactory;
 import org.apache.solr.handler.component.ShardRequest;
@@ -2215,21 +2214,28 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
   private void processBackupAction(ZkNodeProps message, NamedList results) throws IOException, KeeperException, InterruptedException {
     String collectionName =  message.getStr(COLLECTION_PROP);
     String backupName =  message.getStr(NAME);
-    String location = message.getStr(ZkStateReader.BACKUP_LOCATION);
     ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
     String asyncId = message.getStr(ASYNC);
+    String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
+    String location = message.getStr(CoreAdminParams.BACKUP_LOCATION);
+
     Map<String, String> requestMap = new HashMap<>();
     Instant startTime = Instant.now();
 
-    // note: we assume a shared files system to backup a collection, since a collection is distributed
-    Path backupPath = Paths.get(location).resolve(backupName).toAbsolutePath();
+    CoreContainer cc = this.overseer.getZkController().getCoreContainer();
+    BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
+    BackupManager backupMgr = new BackupManager(repository, zkStateReader, collectionName);
+
+    // Backup location
+    URI backupPath = repository.createURI(location, backupName);
 
     //Validating if the directory already exists.
-    if (Files.exists(backupPath)) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-          "Backup directory already exists: " + backupPath);
+    if (repository.exists(backupPath)) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "The backup directory already exists: " + backupPath);
     }
-    Files.createDirectory(backupPath); // create now
+
+    // Create a directory to store backup details.
+    repository.createDirectory(backupPath);
 
     log.info("Starting backup of collection={} with backupName={} at location={}", collectionName, backupName,
         backupPath);
@@ -2242,7 +2248,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
       ModifiableSolrParams params = new ModifiableSolrParams();
       params.set(CoreAdminParams.ACTION, CoreAdminAction.BACKUPCORE.toString());
       params.set(NAME, slice.getName());
-      params.set("location", backupPath.toString()); // note: index dir will be here then the "snapshot." + slice name
+      params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
+      params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.getPath()); // note: index dir will be here then the "snapshot." + slice name
       params.set(CORE_NAME_PROP, coreName);
 
       sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
@@ -2256,29 +2263,24 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
 
     //Download the configs
     String configName = zkStateReader.readConfigName(collectionName);
-    Path zkBackup =  backupPath.resolve("zk_backup");
-    zkStateReader.getConfigManager().downloadConfigDir(configName, zkBackup.resolve("configs").resolve(configName));
+    backupMgr.downloadConfigDir(location, backupName, configName);
 
     //Save the collection's state. Can be part of the monolithic clusterstate.json or a individual state.json
     //Since we don't want to distinguish we extract the state and back it up as a separate json
-    DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
-    Files.write(zkBackup.resolve("collection_state.json"),
-        Utils.toJSON(Collections.singletonMap(collectionName, collection)));
+    DocCollection collectionState = zkStateReader.getClusterState().getCollection(collectionName);
+    backupMgr.writeCollectionState(location, backupName, collectionName, collectionState);
 
-    Path propertiesPath = backupPath.resolve("backup.properties");
     Properties properties = new Properties();
 
-    properties.put("backupName", backupName);
-    properties.put("collection", collectionName);
-    properties.put("collection.configName", configName);
-    properties.put("startTime", startTime.toString());
+    properties.put(BackupManager.BACKUP_NAME_PROP, backupName);
+    properties.put(BackupManager.COLLECTION_NAME_PROP, collectionName);
+    properties.put(COLL_CONF, configName);
+    properties.put(BackupManager.START_TIME_PROP, startTime.toString());
     //TODO: Add MD5 of the configset. If during restore the same name configset exists then we can compare checksums to see if they are the same.
     //if they are not the same then we can throw an error or have an 'overwriteConfig' flag
     //TODO save numDocs for the shardLeader. We can use it to sanity check the restore.
 
-    try (Writer os = Files.newBufferedWriter(propertiesPath, StandardCharsets.UTF_8)) {
-      properties.store(os, "Snapshot properties file");
-    }
+    backupMgr.writeBackupProperties(location, backupName, properties);
 
     log.info("Completed backing up ZK data for backupName={}", backupName);
   }
@@ -2287,26 +2289,21 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     // TODO maybe we can inherit createCollection's options/code
     String restoreCollectionName =  message.getStr(COLLECTION_PROP);
     String backupName =  message.getStr(NAME); // of backup
-    String location = message.getStr(ZkStateReader.BACKUP_LOCATION);
     ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
     String asyncId = message.getStr(ASYNC);
+    String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
+    String location = message.getStr(CoreAdminParams.BACKUP_LOCATION);
     Map<String, String> requestMap = new HashMap<>();
 
-    Path backupPath = Paths.get(location).resolve(backupName).toAbsolutePath();
-    if (!Files.exists(backupPath)) {
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Couldn't restore since doesn't exist: " + backupPath);
-    }
-    Path backupZkPath =  backupPath.resolve("zk_backup");
+    CoreContainer cc = this.overseer.getZkController().getCoreContainer();
+    BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
 
-    Properties properties = new Properties();
-    try (Reader in = Files.newBufferedReader(backupPath.resolve("backup.properties"), StandardCharsets.UTF_8)) {
-      properties.load(in);
-    }
+    URI backupPath = repository.createURI(location, backupName);
+    BackupManager backupMgr = new BackupManager(repository, zkStateReader, restoreCollectionName);
 
-    String backupCollection = (String) properties.get("collection");
-    byte[] data = Files.readAllBytes(backupZkPath.resolve("collection_state.json"));
-    ClusterState backupClusterState = ClusterState.load(-1, data, Collections.emptySet());
-    DocCollection backupCollectionState = backupClusterState.getCollection(backupCollection);
+    Properties properties = backupMgr.readBackupProperties(location, backupName);
+    String backupCollection = properties.getProperty(BackupManager.COLLECTION_NAME_PROP);
+    DocCollection backupCollectionState = backupMgr.readCollectionState(location, backupName, backupCollection);
 
     //Upload the configs
     String configName = (String) properties.get(COLL_CONF);
@@ -2316,11 +2313,11 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
       //TODO add overwrite option?
     } else {
       log.info("Uploading config {}", restoreConfigName);
-      zkStateReader.getConfigManager().uploadConfigDir(backupZkPath.resolve("configs").resolve(configName), restoreConfigName);
+      backupMgr.uploadConfigDir(location, backupName, configName, restoreConfigName);
     }
 
     log.info("Starting restore into collection={} with backup_name={} at location={}", restoreCollectionName, backupName,
-        backupPath);
+        location);
 
     //Create core-less collection
     {
@@ -2410,7 +2407,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
       ModifiableSolrParams params = new ModifiableSolrParams();
       params.set(CoreAdminParams.ACTION, CoreAdminAction.RESTORECORE.toString());
       params.set(NAME, "snapshot." + slice.getName());
-      params.set("location", backupPath.toString());
+      params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.getPath());
+      params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
+
       sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
     }
     processResponses(new NamedList(), shardHandler, true, "Could not restore core", asyncId, requestMap);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 422a761..a6d4066 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -44,6 +45,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.backup.repository.BackupRepository;
 import org.apache.solr.core.backup.repository.BackupRepositoryFactory;
 import org.apache.solr.handler.RequestHandlerBase;
 import org.apache.solr.handler.admin.CollectionsHandler;
@@ -149,8 +151,21 @@ public class CoreContainer {
 
   private BackupRepositoryFactory backupRepoFactory;
 
-  public BackupRepositoryFactory getBackupRepoFactory() {
-    return backupRepoFactory;
+  /**
+   * This method instantiates a new instance of {@linkplain BackupRepository}.
+   *
+   * @param repositoryName The name of the backup repository (Optional).
+   *                       If not specified, a default implementation is used.
+   * @return a new instance of {@linkplain BackupRepository}.
+   */
+  public BackupRepository newBackupRepository(Optional<String> repositoryName) {
+    BackupRepository repository;
+    if (repositoryName.isPresent()) {
+      repository = backupRepoFactory.newInstance(getResourceLoader(), repositoryName.get());
+    } else {
+      repository = backupRepoFactory.newInstance(getResourceLoader());
+    }
+    return repository;
   }
 
   public ExecutorService getCoreZkRegisterExecutorService() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
new file mode 100644
index 0000000..0575bff
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
@@ -0,0 +1,250 @@
+package org.apache.solr.core.backup;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import com.google.common.base.Preconditions;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.backup.repository.BackupRepository.PathType;
+import org.apache.solr.util.PropertiesInputStream;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This class implements functionality to create a backup with extension points provided to integrate with different
+ * types of file-systems.
+ */
+public class BackupManager {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final String COLLECTION_PROPS_FILE = "collection_state.json";
+  public static final String BACKUP_PROPS_FILE = "backup.properties";
+  public static final String ZK_STATE_DIR = "zk_backup";
+  public static final String CONFIG_STATE_DIR = "configs";
+
+  // Backup properties
+  public static final String COLLECTION_NAME_PROP = "collection";
+  public static final String BACKUP_NAME_PROP = "backupName";
+  public static final String INDEX_VERSION_PROP = "index.version";
+  public static final String START_TIME_PROP = "startTime";
+
+  protected final ZkStateReader zkStateReader;
+  protected final BackupRepository repository;
+
+  public BackupManager(BackupRepository repository, ZkStateReader zkStateReader, String collectionName) {
+    this.repository = Preconditions.checkNotNull(repository);
+    this.zkStateReader = Preconditions.checkNotNull(zkStateReader);
+  }
+
+  /**
+   * @return The version of this backup implementation.
+   */
+  public final String getVersion() {
+    return "1.0";
+  }
+
+  /**
+   * This method returns the configuration parameters for the specified backup.
+   *
+   * @param backupLoc The base path used to store the backup data.
+   * @param backupId  The unique name for the backup whose configuration params are required.
+   * @return the configuration parameters for the specified backup.
+   * @throws IOException In case of errors.
+   */
+  public Properties readBackupProperties(String backupLoc, String backupId) throws IOException {
+    Preconditions.checkNotNull(backupLoc);
+    Preconditions.checkNotNull(backupId);
+
+    // Backup location
+    URI backupPath = repository.createURI(backupLoc, backupId);
+    if (!repository.exists(backupPath)) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Couldn't restore since doesn't exist: " + backupPath);
+    }
+
+    Properties props = new Properties();
+    try (Reader is = new InputStreamReader(new PropertiesInputStream(
+        repository.openInput(backupPath, BACKUP_PROPS_FILE, IOContext.DEFAULT)), StandardCharsets.UTF_8)) {
+      props.load(is);
+      return props;
+    }
+  }
+
+  /**
+   * This method stores the backup properties at the specified location in the repository.
+   *
+   * @param backupLoc  The base path used to store the backup data.
+   * @param backupId  The unique name for the backup whose configuration params are required.
+   * @param props The backup properties
+   * @throws IOException in case of I/O error
+   */
+  public void writeBackupProperties(String backupLoc, String backupId, Properties props) throws IOException {
+    URI dest = repository.createURI(backupLoc, backupId, BACKUP_PROPS_FILE);
+    try (Writer propsWriter = new OutputStreamWriter(repository.createOutput(dest), StandardCharsets.UTF_8)) {
+      props.store(propsWriter, "Backup properties file");
+    }
+  }
+
+  /**
+   * This method reads the meta-data information for the backed-up collection.
+   *
+   * @param backupLoc The base path used to store the backup data.
+   * @param backupId The unique name for the backup.
+   * @return the meta-data information for the backed-up collection.
+   * @throws IOException in case of errors.
+   */
+  public DocCollection readCollectionState(String backupLoc, String backupId, String collectionName) throws IOException {
+    Preconditions.checkNotNull(collectionName);
+
+    URI zkStateDir = repository.createURI(backupLoc, backupId, ZK_STATE_DIR);
+    try (IndexInput is = repository.openInput(zkStateDir, COLLECTION_PROPS_FILE, IOContext.DEFAULT)) {
+      byte[] arr = new byte[(int) is.length()]; // probably ok since the json file should be small.
+      is.readBytes(arr, 0, (int) is.length());
+      ClusterState c_state = ClusterState.load(-1, arr, Collections.emptySet());
+      return c_state.getCollection(collectionName);
+    }
+  }
+
+  /**
+   * This method writes the collection meta-data to the specified location in the repository.
+   *
+   * @param backupLoc The base path used to store the backup data.
+   * @param backupId  The unique name for the backup.
+   * @param collectionName The name of the collection whose meta-data is being stored.
+   * @param collectionState The collection meta-data to be stored.
+   * @throws IOException in case of I/O errors.
+   */
+  public void writeCollectionState(String backupLoc, String backupId, String collectionName,
+                                   DocCollection collectionState) throws IOException {
+    URI dest = repository.createURI(backupLoc, backupId, ZK_STATE_DIR, COLLECTION_PROPS_FILE);
+    try (OutputStream collectionStateOs = repository.createOutput(dest)) {
+      collectionStateOs.write(Utils.toJSON(Collections.singletonMap(collectionName, collectionState)));
+    }
+  }
+
+  /**
+   * This method uploads the Solr configuration files to the desired location in Zookeeper.
+   *
+   * @param backupLoc  The base path used to store the backup data.
+   * @param backupId  The unique name for the backup.
+   * @param sourceConfigName The name of the config to be copied
+   * @param targetConfigName  The name of the config to be created.
+   * @throws IOException in case of I/O errors.
+   */
+  public void uploadConfigDir(String backupLoc, String backupId, String sourceConfigName, String targetConfigName)
+      throws IOException {
+    URI source = repository.createURI(backupLoc, backupId, ZK_STATE_DIR, CONFIG_STATE_DIR, sourceConfigName);
+    String zkPath = ZkConfigManager.CONFIGS_ZKNODE + "/" + targetConfigName;
+    uploadToZk(zkStateReader.getZkClient(), source, zkPath);
+  }
+
+  /**
+   * This method stores the contents of a specified Solr config at the specified location in repository.
+   *
+   * @param backupLoc  The base path used to store the backup data.
+   * @param backupId  The unique name for the backup.
+   * @param configName The name of the config to be saved.
+   * @throws IOException in case of I/O errors.
+   */
+  public void downloadConfigDir(String backupLoc, String backupId, String configName) throws IOException {
+    URI dest = repository.createURI(backupLoc, backupId, ZK_STATE_DIR, CONFIG_STATE_DIR, configName);
+    repository.createDirectory(repository.createURI(backupLoc, backupId, ZK_STATE_DIR));
+    repository.createDirectory(repository.createURI(backupLoc, backupId, ZK_STATE_DIR, CONFIG_STATE_DIR));
+    repository.createDirectory(dest);
+
+    downloadFromZK(zkStateReader.getZkClient(), ZkConfigManager.CONFIGS_ZKNODE + "/" + configName, dest);
+  }
+
+  private void downloadFromZK(SolrZkClient zkClient, String zkPath, URI dir) throws IOException {
+    try {
+      if (!repository.exists(dir)) {
+        repository.createDirectory(dir);
+      }
+      List<String> files = zkClient.getChildren(zkPath, null, true);
+      for (String file : files) {
+        List<String> children = zkClient.getChildren(zkPath + "/" + file, null, true);
+        if (children.size() == 0) {
+          log.info("Writing file {}", file);
+          byte[] data = zkClient.getData(zkPath + "/" + file, null, null, true);
+          try (OutputStream os = repository.createOutput(repository.createURI(dir.getPath(), file))) {
+            os.write(data);
+          }
+        } else {
+          downloadFromZK(zkClient, zkPath + "/" + file, repository.createURI(dir.getPath(), file));
+        }
+      }
+    } catch (KeeperException | InterruptedException e) {
+      throw new IOException("Error downloading files from zookeeper path " + zkPath + " to " + dir.toString(),
+          SolrZkClient.checkInterrupted(e));
+    }
+  }
+
+  private void uploadToZk(SolrZkClient zkClient, URI sourceDir, String destZkPath) throws IOException {
+    Preconditions.checkArgument(repository.exists(sourceDir), "Path {} does not exist", sourceDir);
+    Preconditions.checkArgument(repository.getPathType(sourceDir) == PathType.DIRECTORY,
+        "Path {} is not a directory", sourceDir);
+
+    for (String file : repository.listAll(sourceDir)) {
+      String zkNodePath = destZkPath + "/" + file;
+      URI path = repository.createURI(sourceDir.getPath(), file);
+      PathType t = repository.getPathType(path);
+      switch (t) {
+        case FILE: {
+          try (IndexInput is = repository.openInput(sourceDir, file, IOContext.DEFAULT)) {
+            byte[] arr = new byte[(int) is.length()]; // probably ok since the config file should be small.
+            is.readBytes(arr, 0, (int) is.length());
+            zkClient.makePath(zkNodePath, arr, true);
+          } catch (KeeperException | InterruptedException e) {
+            throw new IOException(e);
+          }
+          break;
+        }
+
+        case DIRECTORY: {
+          if (!file.startsWith(".")) {
+            uploadToZk(zkClient, path, zkNodePath);
+          }
+          break;
+        }
+        default:
+          throw new IllegalStateException("Unknown path type " + t);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/core/src/java/org/apache/solr/core/backup/package-info.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/backup/package-info.java b/solr/core/src/java/org/apache/solr/core/backup/package-info.java
new file mode 100644
index 0000000..defcad6
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/core/backup/package-info.java
@@ -0,0 +1,22 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+
+/**
+ * Core classes for Solr's Backup/Restore functionality
+ */
+package org.apache.solr.core.backup;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepository.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepository.java b/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepository.java
index f209b87..20d8628 100644
--- a/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepository.java
+++ b/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepository.java
@@ -21,20 +21,18 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
+import java.util.Optional;
+
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
+import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.util.plugin.NamedListInitializedPlugin;
 
 /**
  * This interface defines the functionality required to backup/restore Solr indexes to an arbitrary storage system.
  */
 public interface BackupRepository extends NamedListInitializedPlugin, Closeable {
-  /**
-   * A parameter to specify the name of the backup repository to be used.
-   */
-  String REPOSITORY_PROPERTY_NAME = "repository";
-
 
   /**
    * This enumeration defines the type of a given path.
@@ -44,6 +42,17 @@ public interface BackupRepository extends NamedListInitializedPlugin, Closeable
   }
 
   /**
+   * This method returns the location where the backup should be stored (or restored from).
+   *
+   * @param override The location parameter supplied by the user.
+   * @return If <code>override</code> is not null then return the same value
+   *         Otherwise return the default configuration value for the {@linkplain CoreAdminParams#BACKUP_LOCATION} parameter.
+   */
+  default String getBackupLocation(String override) {
+    return Optional.ofNullable(override).orElse(getConfigProperty(CoreAdminParams.BACKUP_LOCATION));
+  }
+
+  /**
    * This method returns the value of the specified configuration property.
    */
   <T> T getConfigProperty(String name);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index 1893a7d..6e1b3a0 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -37,6 +37,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -67,8 +68,8 @@ import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.RateLimiter;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
@@ -84,7 +85,6 @@ import org.apache.solr.core.IndexDeletionPolicyWrapper;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.SolrDeletionPolicy;
 import org.apache.solr.core.SolrEventListener;
-import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.core.backup.repository.BackupRepository;
 import org.apache.solr.core.backup.repository.LocalFileSystemRepository;
 import org.apache.solr.request.SolrQueryRequest;
@@ -331,7 +331,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
       throw new SolrException(ErrorCode.BAD_REQUEST, "Missing mandatory param: name");
     }
 
-    SnapShooter snapShooter = new SnapShooter(core, params.get(LOCATION), params.get(NAME));
+    SnapShooter snapShooter = new SnapShooter(core, params.get(CoreAdminParams.BACKUP_LOCATION), params.get(NAME));
     snapShooter.validateDeleteSnapshot();
     snapShooter.deleteSnapAsync(this);
   }
@@ -412,19 +412,16 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
           "for the same core");
     }
     String name = params.get(NAME);
-    String location = params.get(LOCATION);
+    String location = params.get(CoreAdminParams.BACKUP_LOCATION);
 
-    String repoName = params.get(BackupRepository.REPOSITORY_PROPERTY_NAME);
+    String repoName = params.get(CoreAdminParams.BACKUP_REPOSITORY);
     CoreContainer cc = core.getCoreDescriptor().getCoreContainer();
-    SolrResourceLoader rl = cc.getResourceLoader();
     BackupRepository repo = null;
-    if(repoName != null) {
-      repo = cc.getBackupRepoFactory().newInstance(rl, repoName);
+    if (repoName != null) {
+      repo = cc.newBackupRepository(Optional.of(repoName));
+      location = repo.getBackupLocation(location);
       if (location == null) {
-        location = repo.getConfigProperty(ZkStateReader.BACKUP_LOCATION);
-        if(location == null) {
-          throw new IllegalArgumentException("location is required");
-        }
+        throw new IllegalArgumentException("location is required");
       }
     } else {
       repo = new LocalFileSystemRepository();
@@ -520,18 +517,15 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
         indexCommit = req.getSearcher().getIndexReader().getIndexCommit();
       }
 
-      String location = params.get(ZkStateReader.BACKUP_LOCATION);
-      String repoName = params.get(BackupRepository.REPOSITORY_PROPERTY_NAME);
+      String location = params.get(CoreAdminParams.BACKUP_LOCATION);
+      String repoName = params.get(CoreAdminParams.BACKUP_REPOSITORY);
       CoreContainer cc = core.getCoreDescriptor().getCoreContainer();
-      SolrResourceLoader rl = cc.getResourceLoader();
       BackupRepository repo = null;
-      if(repoName != null) {
-        repo = cc.getBackupRepoFactory().newInstance(rl, repoName);
+      if (repoName != null) {
+        repo = cc.newBackupRepository(Optional.of(repoName));
+        location = repo.getBackupLocation(location);
         if (location == null) {
-          location = repo.getConfigProperty(ZkStateReader.BACKUP_LOCATION);
-          if(location == null) {
-            throw new IllegalArgumentException("location is required");
-          }
+          throw new IllegalArgumentException("location is required");
         }
       } else {
         repo = new LocalFileSystemRepository();
@@ -1645,8 +1639,6 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
     }
   }
 
-  private static final String LOCATION = "location";
-
   private static final String SUCCESS = "success";
 
   private static final String FAILED = "failed";

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 85c98c1..97fbd2d 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -18,6 +18,7 @@ package org.apache.solr.handler.admin;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -26,6 +27,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -75,6 +77,7 @@ import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CloudConfig;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.backup.repository.BackupRepository;
 import org.apache.solr.handler.RequestHandlerBase;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.request.SolrQueryRequest;
@@ -807,15 +810,32 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
         }
 
-        String location = req.getParams().get(ZkStateReader.BACKUP_LOCATION);
+        CoreContainer cc = h.coreContainer;
+        String repo = req.getParams().get(CoreAdminParams.BACKUP_REPOSITORY);
+        BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
+
+        String location = repository.getBackupLocation(req.getParams().get(CoreAdminParams.BACKUP_LOCATION));
         if (location == null) {
-          location = h.coreContainer.getZkController().getZkStateReader().getClusterProperty("location", (String) null);
+          // Check if the location is specified in the cluster property.
+          location = h.coreContainer.getZkController().getZkStateReader().getClusterProperty(CoreAdminParams.BACKUP_LOCATION, null);
+          if (location == null) {
+            throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query"
+                + " parameter or as a default repository property or as a cluster property.");
+          }
         }
-        if (location == null) {
-          throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query parameter or set as a cluster property");
+
+        // Check if the specified location is valid for this repository.
+        URI uri = repository.createURI(location);
+        try {
+          if (!repository.exists(uri)) {
+            throw new SolrException(ErrorCode.SERVER_ERROR, "specified location " + uri + " does not exist.");
+          }
+        } catch (IOException ex) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to check the existance of " + uri + ". Is it valid?", ex);
         }
+
         Map<String, Object> params = req.getParams().getAll(null, NAME, COLLECTION_PROP);
-        params.put("location", location);
+        params.put(CoreAdminParams.BACKUP_LOCATION, location);
         return params;
       }
     },
@@ -831,16 +851,32 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' exists, no action taken.");
         }
 
-        String location = req.getParams().get(ZkStateReader.BACKUP_LOCATION);
+        CoreContainer cc = h.coreContainer;
+        String repo = req.getParams().get(CoreAdminParams.BACKUP_REPOSITORY);
+        BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
+
+        String location = repository.getBackupLocation(req.getParams().get(CoreAdminParams.BACKUP_LOCATION));
         if (location == null) {
-          location = h.coreContainer.getZkController().getZkStateReader().getClusterProperty("location", (String) null);
+          // Check if the location is specified in the cluster property.
+          location = h.coreContainer.getZkController().getZkStateReader().getClusterProperty("location", null);
+          if (location == null) {
+            throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query"
+                + " parameter or as a default repository property or as a cluster property.");
+          }
         }
-        if (location == null) {
-          throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query parameter or set as a cluster property");
+
+        // Check if the specified location is valid for this repository.
+        URI uri = repository.createURI(location);
+        try {
+          if (!repository.exists(uri)) {
+            throw new SolrException(ErrorCode.SERVER_ERROR, "specified location " + uri + " does not exist.");
+          }
+        } catch (IOException ex) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to check the existance of " + uri + ". Is it valid?", ex);
         }
 
         Map<String, Object> params = req.getParams().getAll(null, NAME, COLLECTION_PROP);
-        params.put("location", location);
+        params.put(CoreAdminParams.BACKUP_LOCATION, location);
         // from CREATE_OP:
         req.getParams().getAll(params, COLL_CONF, REPLICATION_FACTOR, MAX_SHARDS_PER_NODE, STATE_FORMAT, AUTO_ADD_REPLICAS);
         copyPropertiesWithPrefix(req.getParams(), params, COLL_PROP_PREFIX);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
index 3c52bea..bf89227 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Future;
 
@@ -40,6 +41,7 @@ import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.SyncStrategy;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
@@ -858,21 +860,13 @@ enum CoreAdminOperation {
         throw new IllegalArgumentException(CoreAdminParams.NAME + " is required");
       }
 
-      SolrResourceLoader loader = callInfo.handler.coreContainer.getResourceLoader();
-      BackupRepository repository;
-      String repoName = params.get(BackupRepository.REPOSITORY_PROPERTY_NAME);
-      if(repoName != null) {
-        repository = callInfo.handler.coreContainer.getBackupRepoFactory().newInstance(loader, repoName);
-      } else { // Fetch the default.
-        repository = callInfo.handler.coreContainer.getBackupRepoFactory().newInstance(loader);
-      }
+      String repoName = params.get(CoreAdminParams.BACKUP_REPOSITORY);
+      BackupRepository repository = callInfo.handler.coreContainer.newBackupRepository(Optional.ofNullable(repoName));
 
-      String location = params.get(ZkStateReader.BACKUP_LOCATION);
-      if (location == null) {
-        location = repository.getConfigProperty(ZkStateReader.BACKUP_LOCATION);
-        if (location == null) {
-          throw new IllegalArgumentException("location is required");
-        }
+      String location = repository.getBackupLocation(params.get(CoreAdminParams.BACKUP_LOCATION));
+      if(location == null) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query"
+            + " parameter or as a default repository property");
       }
 
       try (SolrCore core = callInfo.handler.coreContainer.getCore(cname)) {
@@ -912,21 +906,13 @@ enum CoreAdminOperation {
         throw new IllegalArgumentException(CoreAdminParams.NAME + " is required");
       }
 
-      SolrResourceLoader loader = callInfo.handler.coreContainer.getResourceLoader();
-      BackupRepository repository;
-      String repoName = params.get(BackupRepository.REPOSITORY_PROPERTY_NAME);
-      if(repoName != null) {
-        repository = callInfo.handler.coreContainer.getBackupRepoFactory().newInstance(loader, repoName);
-      } else { // Fetch the default.
-        repository = callInfo.handler.coreContainer.getBackupRepoFactory().newInstance(loader);
-      }
+      String repoName = params.get(CoreAdminParams.BACKUP_REPOSITORY);
+      BackupRepository repository = callInfo.handler.coreContainer.newBackupRepository(Optional.ofNullable(repoName));
 
-      String location = params.get(ZkStateReader.BACKUP_LOCATION);
-      if (location == null) {
-        location = repository.getConfigProperty(ZkStateReader.BACKUP_LOCATION);
-        if (location == null) {
-          throw new IllegalArgumentException("location is required");
-        }
+      String location = repository.getBackupLocation(params.get(CoreAdminParams.BACKUP_LOCATION));
+      if(location == null) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query"
+            + " parameter or as a default repository property");
       }
 
       try (SolrCore core = callInfo.handler.coreContainer.getCore(cname)) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/core/src/test/org/apache/solr/cloud/AbstractCloudBackupRestoreTestCase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/AbstractCloudBackupRestoreTestCase.java b/solr/core/src/test/org/apache/solr/cloud/AbstractCloudBackupRestoreTestCase.java
new file mode 100644
index 0000000..96faf92
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/AbstractCloudBackupRestoreTestCase.java
@@ -0,0 +1,273 @@
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest.ClusterProp;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.ShardParams._ROUTE_;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This class implements the logic required to test Solr cloud backup/restore capability.
+ */
+public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  protected static final int NUM_SHARDS = 2;//granted we sometimes shard split to get more
+
+  private static long docsSeed; // see indexDocs()
+
+  @BeforeClass
+  public static void createCluster() throws Exception {
+    docsSeed = random().nextLong();
+  }
+
+  /**
+   * @return The name of the collection to use.
+   */
+  public abstract String getCollectionName();
+
+  /**
+   * @return The name of the backup repository to use.
+   */
+  public abstract String getBackupRepoName();
+
+  /**
+   * @return The absolute path for the backup location.
+   *         Could return null.
+   */
+  public abstract String getBackupLocation();
+
+  @Test
+  public void test() throws Exception {
+    boolean isImplicit = random().nextBoolean();
+    int replFactor = TestUtil.nextInt(random(), 1, 2);
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(getCollectionName(), "conf1", NUM_SHARDS, replFactor);
+    if (NUM_SHARDS * replFactor > cluster.getJettySolrRunners().size() || random().nextBoolean()) {
+      create.setMaxShardsPerNode(NUM_SHARDS);//just to assert it survives the restoration
+    }
+    if (random().nextBoolean()) {
+      create.setAutoAddReplicas(true);//just to assert it survives the restoration
+    }
+    Properties coreProps = new Properties();
+    coreProps.put("customKey", "customValue");//just to assert it survives the restoration
+    create.setProperties(coreProps);
+    if (isImplicit) { //implicit router
+      create.setRouterName(ImplicitDocRouter.NAME);
+      create.setNumShards(null);//erase it. TODO suggest a new createCollectionWithImplicitRouter method
+      create.setShards("shard1,shard2"); // however still same number as NUM_SHARDS; we assume this later
+      create.setRouterField("shard_s");
+    } else {//composite id router
+      if (random().nextBoolean()) {
+        create.setRouterField("shard_s");
+      }
+    }
+
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    create.process(solrClient);
+
+    indexDocs(getCollectionName());
+
+    if (!isImplicit && random().nextBoolean()) {
+      // shard split the first shard
+      int prevActiveSliceCount = getActiveSliceCount(getCollectionName());
+      CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(getCollectionName());
+      splitShard.setShardName("shard1");
+      splitShard.process(solrClient);
+      // wait until we see one more active slice...
+      for (int i = 0; getActiveSliceCount(getCollectionName()) != prevActiveSliceCount + 1; i++) {
+        assertTrue(i < 30);
+        Thread.sleep(500);
+      }
+      // issue a hard commit.  Split shard does a soft commit which isn't good enough for the backup/snapshooter to see
+      solrClient.commit(getCollectionName());
+    }
+
+    testBackupAndRestore(getCollectionName());
+    testInvalidPath(getCollectionName());
+  }
+
+  // This test verifies the system behavior when the backup location cluster property is configured with an invalid
+  // value for the specified repository (and the default backup location is not configured in solr.xml).
+  private void testInvalidPath(String collectionName) throws Exception {
+    // Execute this test only if the default backup location is NOT configured in solr.xml
+    if (getBackupLocation() == null) {
+      return;
+    }
+
+    String backupName = "invalidbackuprequest";
+    CloudSolrClient solrClient = cluster.getSolrClient();
+
+    ClusterProp req = CollectionAdminRequest.setClusterProperty(CoreAdminParams.BACKUP_LOCATION, "/location/does/not/exist");
+    assertEquals(0, req.process(solrClient).getStatus());
+
+    // Do not specify the backup location.
+    CollectionAdminRequest.Backup backup = CollectionAdminRequest.backupCollection(collectionName, backupName)
+        .setRepositoryName(getBackupRepoName());
+    try {
+      backup.process(solrClient);
+      fail("This request should have failed since the cluster property value for backup location property is invalid.");
+    } catch (SolrServerException ex) {
+      assertTrue(ex.getCause() instanceof RemoteSolrException);
+      assertEquals(ErrorCode.SERVER_ERROR.code, ((RemoteSolrException)ex.getCause()).code());
+    }
+
+    String restoreCollectionName = collectionName + "_invalidrequest";
+    CollectionAdminRequest.Restore restore = CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
+        .setRepositoryName(getBackupRepoName());
+    try {
+      restore.process(solrClient);
+      fail("This request should have failed since the cluster property value for backup location property is invalid.");
+    } catch (SolrServerException ex) {
+      assertTrue(ex.getCause() instanceof RemoteSolrException);
+      assertEquals(ErrorCode.SERVER_ERROR.code, ((RemoteSolrException)ex.getCause()).code());
+    }
+  }
+
+  private int getActiveSliceCount(String collectionName) {
+    return cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(collectionName).getActiveSlices().size();
+  }
+
+  private void indexDocs(String collectionName) throws Exception {
+    Random random = new Random(docsSeed);// use a constant seed for the whole test run so that we can easily re-index.
+    int numDocs = random.nextInt(100);
+    if (numDocs == 0) {
+      log.info("Indexing ZERO test docs");
+      return;
+    }
+    List<SolrInputDocument> docs = new ArrayList<>(numDocs);
+    for (int i=0; i<numDocs; i++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", i);
+      doc.addField("shard_s", "shard" + (1 + random.nextInt(NUM_SHARDS))); // for implicit router
+      docs.add(doc);
+    }
+    CloudSolrClient client = cluster.getSolrClient();
+    client.add(collectionName, docs);// batch
+    client.commit(collectionName);
+  }
+
+  private void testBackupAndRestore(String collectionName) throws Exception {
+    String backupLocation = getBackupLocation();
+    String backupName = "mytestbackup";
+
+    CloudSolrClient client = cluster.getSolrClient();
+    DocCollection backupCollection = client.getZkStateReader().getClusterState().getCollection(collectionName);
+
+    Map<String, Integer> origShardToDocCount = getShardToDocCountMap(client, backupCollection);
+    assert origShardToDocCount.isEmpty() == false;
+
+    log.info("Triggering Backup command");
+
+    {
+      CollectionAdminRequest.Backup backup = CollectionAdminRequest.backupCollection(collectionName, backupName)
+          .setLocation(backupLocation).setRepositoryName(getBackupRepoName());
+      if (random().nextBoolean()) {
+        assertEquals(0, backup.process(client).getStatus());
+      } else {
+        assertEquals(RequestStatusState.COMPLETED, backup.processAndWait(client, 30));//async
+      }
+    }
+
+    log.info("Triggering Restore command");
+
+    String restoreCollectionName = collectionName + "_restored";
+    boolean sameConfig = random().nextBoolean();
+
+    {
+      CollectionAdminRequest.Restore restore = CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
+          .setLocation(backupLocation).setRepositoryName(getBackupRepoName());
+
+      if (origShardToDocCount.size() > cluster.getJettySolrRunners().size()) {
+        // may need to increase maxShardsPerNode (e.g. if it was shard split, then now we need more)
+        restore.setMaxShardsPerNode(origShardToDocCount.size());
+      }
+      Properties props = new Properties();
+      props.setProperty("customKey", "customVal");
+      restore.setProperties(props);
+      if (sameConfig==false) {
+        restore.setConfigName("customConfigName");
+      }
+      if (random().nextBoolean()) {
+        assertEquals(0, restore.process(client).getStatus());
+      } else {
+        assertEquals(RequestStatusState.COMPLETED, restore.processAndWait(client, 30));//async
+      }
+      AbstractDistribZkTestBase.waitForRecoveriesToFinish(
+          restoreCollectionName, cluster.getSolrClient().getZkStateReader(), log.isDebugEnabled(), true, 30);
+    }
+
+    //Check the number of results are the same
+    DocCollection restoreCollection = client.getZkStateReader().getClusterState().getCollection(restoreCollectionName);
+    assertEquals(origShardToDocCount, getShardToDocCountMap(client, restoreCollection));
+    //Re-index same docs (should be identical docs given same random seed) and test we have the same result.  Helps
+    //  test we reconstituted the hash ranges / doc router.
+    if (!(restoreCollection.getRouter() instanceof ImplicitDocRouter) && random().nextBoolean()) {
+      indexDocs(restoreCollectionName);
+      assertEquals(origShardToDocCount, getShardToDocCountMap(client, restoreCollection));
+    }
+
+    assertEquals(backupCollection.getReplicationFactor(), restoreCollection.getReplicationFactor());
+    assertEquals(backupCollection.getAutoAddReplicas(), restoreCollection.getAutoAddReplicas());
+    assertEquals(backupCollection.getActiveSlices().iterator().next().getReplicas().size(),
+        restoreCollection.getActiveSlices().iterator().next().getReplicas().size());
+    assertEquals(sameConfig ? "conf1" : "customConfigName",
+        cluster.getSolrClient().getZkStateReader().readConfigName(restoreCollectionName));
+
+    // assert added core properties:
+    // DWS: did via manual inspection.
+    // TODO Find the applicable core.properties on the file system but how?
+  }
+
+  private Map<String, Integer> getShardToDocCountMap(CloudSolrClient client, DocCollection docCollection) throws SolrServerException, IOException {
+    Map<String,Integer> shardToDocCount = new TreeMap<>();
+    for (Slice slice : docCollection.getActiveSlices()) {
+      String shardName = slice.getName();
+      long docsInShard = client.query(docCollection.getName(), new SolrQuery("*:*").setParam(_ROUTE_, shardName))
+          .getResults().getNumFound();
+      shardToDocCount.put(shardName, (int) docsInShard);
+    }
+    return shardToDocCount;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/core/src/test/org/apache/solr/cloud/TestCloudBackupRestore.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudBackupRestore.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudBackupRestore.java
deleted file mode 100644
index 5e35616..0000000
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudBackupRestore.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.TreeMap;
-
-import org.apache.lucene.util.TestUtil;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.response.RequestStatusState;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.ImplicitDocRouter;
-import org.apache.solr.common.cloud.Slice;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.params.ShardParams._ROUTE_;
-
-public class TestCloudBackupRestore extends SolrCloudTestCase {
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private static final int NUM_SHARDS = 2;//granted we sometimes shard split to get more
-
-  private static long docsSeed; // see indexDocs()
-
-  @BeforeClass
-  public static void createCluster() throws Exception {
-    configureCluster(2)// nodes
-        .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
-        .configure();
-
-    docsSeed = random().nextLong();
-  }
-
-  @Test
-  public void test() throws Exception {
-    String collectionName = "backuprestore";
-    boolean isImplicit = random().nextBoolean();
-    int replFactor = TestUtil.nextInt(random(), 1, 2);
-    CollectionAdminRequest.Create create =
-        CollectionAdminRequest.createCollection(collectionName, "conf1", NUM_SHARDS, replFactor);
-    if (NUM_SHARDS * replFactor > cluster.getJettySolrRunners().size() || random().nextBoolean()) {
-      create.setMaxShardsPerNode(NUM_SHARDS);//just to assert it survives the restoration
-    }
-    if (random().nextBoolean()) {
-      create.setAutoAddReplicas(true);//just to assert it survives the restoration
-    }
-    Properties coreProps = new Properties();
-    coreProps.put("customKey", "customValue");//just to assert it survives the restoration
-    create.setProperties(coreProps);
-    if (isImplicit) { //implicit router
-      create.setRouterName(ImplicitDocRouter.NAME);
-      create.setNumShards(null);//erase it. TODO suggest a new createCollectionWithImplicitRouter method
-      create.setShards("shard1,shard2"); // however still same number as NUM_SHARDS; we assume this later
-      create.setRouterField("shard_s");
-    } else {//composite id router
-      if (random().nextBoolean()) {
-        create.setRouterField("shard_s");
-      }
-    }
-
-    CloudSolrClient solrClient = cluster.getSolrClient();
-    create.process(solrClient);
-
-    indexDocs(collectionName);
-
-    if (!isImplicit && random().nextBoolean()) {
-      // shard split the first shard
-      int prevActiveSliceCount = getActiveSliceCount(collectionName);
-      CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
-      splitShard.setShardName("shard1");
-      splitShard.process(solrClient);
-      // wait until we see one more active slice...
-      for (int i = 0; getActiveSliceCount(collectionName) != prevActiveSliceCount + 1; i++) {
-        assertTrue(i < 30);
-        Thread.sleep(500);
-      }
-      // issue a hard commit.  Split shard does a soft commit which isn't good enough for the backup/snapshooter to see
-      solrClient.commit(collectionName);
-    }
-
-    testBackupAndRestore(collectionName);
-  }
-
-  private int getActiveSliceCount(String collectionName) {
-    return cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(collectionName).getActiveSlices().size();
-  }
-
-  private void indexDocs(String collectionName) throws Exception {
-    Random random = new Random(docsSeed);// use a constant seed for the whole test run so that we can easily re-index.
-    int numDocs = random.nextInt(100);
-    if (numDocs == 0) {
-      log.info("Indexing ZERO test docs");
-      return;
-    }
-    List<SolrInputDocument> docs = new ArrayList<>(numDocs);
-    for (int i=0; i<numDocs; i++) {
-      SolrInputDocument doc = new SolrInputDocument();
-      doc.addField("id", i);
-      doc.addField("shard_s", "shard" + (1 + random.nextInt(NUM_SHARDS))); // for implicit router
-      docs.add(doc);
-    }
-    CloudSolrClient client = cluster.getSolrClient();
-    client.add(collectionName, docs);// batch
-    client.commit(collectionName);
-  }
-
-  private void testBackupAndRestore(String collectionName) throws Exception {
-    String backupName = "mytestbackup";
-
-    CloudSolrClient client = cluster.getSolrClient();
-    DocCollection backupCollection = client.getZkStateReader().getClusterState().getCollection(collectionName);
-
-    Map<String, Integer> origShardToDocCount = getShardToDocCountMap(client, backupCollection);
-    assert origShardToDocCount.isEmpty() == false;
-
-    String location = createTempDir().toFile().getAbsolutePath();
-
-    log.info("Triggering Backup command");
-
-    {
-      CollectionAdminRequest.Backup backup = CollectionAdminRequest.backupCollection(collectionName, backupName)
-          .setLocation(location);
-      if (random().nextBoolean()) {
-        assertEquals(0, backup.process(client).getStatus());
-      } else {
-        assertEquals(RequestStatusState.COMPLETED, backup.processAndWait(client, 30));//async
-      }
-    }
-
-    log.info("Triggering Restore command");
-
-    String restoreCollectionName = collectionName + "_restored";
-    boolean sameConfig = random().nextBoolean();
-
-    {
-      CollectionAdminRequest.Restore restore = CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
-              .setLocation(location);
-      if (origShardToDocCount.size() > cluster.getJettySolrRunners().size()) {
-        // may need to increase maxShardsPerNode (e.g. if it was shard split, then now we need more)
-        restore.setMaxShardsPerNode(origShardToDocCount.size());
-      }
-      Properties props = new Properties();
-      props.setProperty("customKey", "customVal");
-      restore.setProperties(props);
-      if (sameConfig==false) {
-        restore.setConfigName("customConfigName");
-      }
-      if (random().nextBoolean()) {
-        assertEquals(0, restore.process(client).getStatus());
-      } else {
-        assertEquals(RequestStatusState.COMPLETED, restore.processAndWait(client, 30));//async
-      }
-      AbstractDistribZkTestBase.waitForRecoveriesToFinish(
-          restoreCollectionName, cluster.getSolrClient().getZkStateReader(), log.isDebugEnabled(), true, 30);
-    }
-
-    //Check the number of results are the same
-    DocCollection restoreCollection = client.getZkStateReader().getClusterState().getCollection(restoreCollectionName);
-    assertEquals(origShardToDocCount, getShardToDocCountMap(client, restoreCollection));
-    //Re-index same docs (should be identical docs given same random seed) and test we have the same result.  Helps
-    //  test we reconstituted the hash ranges / doc router.
-    if (!(restoreCollection.getRouter() instanceof ImplicitDocRouter) && random().nextBoolean()) {
-      indexDocs(restoreCollectionName);
-      assertEquals(origShardToDocCount, getShardToDocCountMap(client, restoreCollection));
-    }
-
-    assertEquals(backupCollection.getReplicationFactor(), restoreCollection.getReplicationFactor());
-    assertEquals(backupCollection.getAutoAddReplicas(), restoreCollection.getAutoAddReplicas());
-    assertEquals(backupCollection.getActiveSlices().iterator().next().getReplicas().size(),
-        restoreCollection.getActiveSlices().iterator().next().getReplicas().size());
-    assertEquals(sameConfig ? "conf1" : "customConfigName",
-        cluster.getSolrClient().getZkStateReader().readConfigName(restoreCollectionName));
-
-    // assert added core properties:
-    // DWS: did via manual inspection.
-    // TODO Find the applicable core.properties on the file system but how?
-  }
-
-  private Map<String, Integer> getShardToDocCountMap(CloudSolrClient client, DocCollection docCollection) throws SolrServerException, IOException {
-    Map<String,Integer> shardToDocCount = new TreeMap<>();
-    for (Slice slice : docCollection.getActiveSlices()) {
-      String shardName = slice.getName();
-      long docsInShard = client.query(docCollection.getName(), new SolrQuery("*:*").setParam(_ROUTE_, shardName))
-          .getResults().getNumFound();
-      shardToDocCount.put(shardName, (int) docsInShard);
-    }
-    return shardToDocCount;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/core/src/test/org/apache/solr/cloud/TestHdfsCloudBackupRestore.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestHdfsCloudBackupRestore.java b/solr/core/src/test/org/apache/solr/cloud/TestHdfsCloudBackupRestore.java
new file mode 100644
index 0000000..a09fc2f
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestHdfsCloudBackupRestore.java
@@ -0,0 +1,148 @@
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.solr.cloud.hdfs.HdfsTestUtil;
+import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This class implements the tests for HDFS integration for Solr backup/restore capability.
+ */
+@ThreadLeakFilters(defaultFilters = true, filters = {
+    BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
+})
+public class TestHdfsCloudBackupRestore extends AbstractCloudBackupRestoreTestCase {
+  public static final String SOLR_XML = "<solr>\n" +
+      "\n" +
+      "  <str name=\"shareSchema\">${shareSchema:false}</str>\n" +
+      "  <str name=\"configSetBaseDir\">${configSetBaseDir:configsets}</str>\n" +
+      "  <str name=\"coreRootDirectory\">${coreRootDirectory:.}</str>\n" +
+      "\n" +
+      "  <shardHandlerFactory name=\"shardHandlerFactory\" class=\"HttpShardHandlerFactory\">\n" +
+      "    <str name=\"urlScheme\">${urlScheme:}</str>\n" +
+      "    <int name=\"socketTimeout\">${socketTimeout:90000}</int>\n" +
+      "    <int name=\"connTimeout\">${connTimeout:15000}</int>\n" +
+      "  </shardHandlerFactory>\n" +
+      "\n" +
+      "  <solrcloud>\n" +
+      "    <str name=\"host\">127.0.0.1</str>\n" +
+      "    <int name=\"hostPort\">${hostPort:8983}</int>\n" +
+      "    <str name=\"hostContext\">${hostContext:solr}</str>\n" +
+      "    <int name=\"zkClientTimeout\">${solr.zkclienttimeout:30000}</int>\n" +
+      "    <bool name=\"genericCoreNodeNames\">${genericCoreNodeNames:true}</bool>\n" +
+      "    <int name=\"leaderVoteWait\">10000</int>\n" +
+      "    <int name=\"distribUpdateConnTimeout\">${distribUpdateConnTimeout:45000}</int>\n" +
+      "    <int name=\"distribUpdateSoTimeout\">${distribUpdateSoTimeout:340000}</int>\n" +
+      "  </solrcloud>\n" +
+      "  \n" +
+      "  <backup>\n" +
+      "    <repository  name=\"hdfs\" class=\"org.apache.solr.core.backup.repository.HdfsBackupRepository\"> \n" +
+      "      <str name=\"location\">${solr.hdfs.default.backup.path}</str>\n" +
+      "      <str name=\"solr.hdfs.home\">${solr.hdfs.home:}</str>\n" +
+      "      <str name=\"solr.hdfs.confdir\">${solr.hdfs.confdir:}</str>\n" +
+      "    </repository>\n" +
+      "  </backup>\n" +
+      "  \n" +
+      "</solr>\n";
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static MiniDFSCluster dfsCluster;
+  private static String hdfsUri;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
+    hdfsUri = HdfsTestUtil.getURI(dfsCluster);
+    try {
+      URI uri = new URI(hdfsUri);
+      Configuration conf = HdfsTestUtil.getClientConfiguration(dfsCluster);
+      conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+      fs = FileSystem.get(uri, conf);
+
+      if (fs instanceof DistributedFileSystem) {
+        // Make sure dfs is not in safe mode
+        while (((DistributedFileSystem) fs).setSafeMode(SafeModeAction.SAFEMODE_GET, true)) {
+          log.warn("The NameNode is in SafeMode - Solr will wait 5 seconds and try again.");
+          try {
+            Thread.sleep(5000);
+          } catch (InterruptedException e) {
+            Thread.interrupted();
+            // continue
+          }
+        }
+      }
+
+      fs.mkdirs(new org.apache.hadoop.fs.Path("/backup"));
+    } catch (IOException | URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+
+    System.setProperty("solr.hdfs.default.backup.path", "/backup");
+    System.setProperty("solr.hdfs.home", hdfsUri + "/solr");
+    useFactory("solr.StandardDirectoryFactory");
+
+    configureCluster(NUM_SHARDS)// nodes
+    .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+    .withSolrXml(SOLR_XML)
+    .configure();
+  }
+
+  @AfterClass
+  public static void teardownClass() throws Exception {
+    System.clearProperty("solr.hdfs.home");
+    System.clearProperty("solr.hdfs.default.backup.path");
+    System.clearProperty("test.build.data");
+    System.clearProperty("test.cache.data");
+    IOUtils.closeQuietly(fs);
+    fs = null;
+    HdfsTestUtil.teardownClass(dfsCluster);
+    dfsCluster = null;
+  }
+
+  @Override
+  public String getCollectionName() {
+    return "hdfsbackuprestore";
+  }
+
+  @Override
+  public String getBackupRepoName() {
+    return "hdfs";
+  }
+
+  @Override
+  public String getBackupLocation() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/core/src/test/org/apache/solr/cloud/TestLocalFSCloudBackupRestore.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLocalFSCloudBackupRestore.java b/solr/core/src/test/org/apache/solr/cloud/TestLocalFSCloudBackupRestore.java
new file mode 100644
index 0000000..6f3e2bc
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestLocalFSCloudBackupRestore.java
@@ -0,0 +1,50 @@
+package org.apache.solr.cloud;
+
+import org.junit.BeforeClass;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This class implements the tests for local file-system integration for Solr backup/restore capability.
+ * Note that the Solr backup/restore still requires a "shared" file-system. Its just that in this case
+ * such file-system would be exposed via local file-system API.
+ */
+public class TestLocalFSCloudBackupRestore extends AbstractCloudBackupRestoreTestCase {
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    configureCluster(NUM_SHARDS)// nodes
+        .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+        .configure();
+  }
+
+  @Override
+  public String getCollectionName() {
+    return "backuprestore";
+  }
+
+  @Override
+  public String getBackupRepoName() {
+    return null;
+  }
+
+  @Override
+  public String getBackupLocation() {
+    return createTempDir().toFile().getAbsolutePath();
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/core/src/test/org/apache/solr/core/TestBackupRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/TestBackupRepositoryFactory.java b/solr/core/src/test/org/apache/solr/core/TestBackupRepositoryFactory.java
index 81d3c40..a03d4c4 100644
--- a/solr/core/src/test/org/apache/solr/core/TestBackupRepositoryFactory.java
+++ b/solr/core/src/test/org/apache/solr/core/TestBackupRepositoryFactory.java
@@ -21,9 +21,9 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.carrotsearch.randomizedtesting.rules.SystemPropertiesRestoreRule;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.core.backup.repository.BackupRepository;
 import org.apache.solr.core.backup.repository.BackupRepositoryFactory;
@@ -37,8 +37,6 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.RuleChain;
 import org.junit.rules.TestRule;
 
-import com.carrotsearch.randomizedtesting.rules.SystemPropertiesRestoreRule;
-
 public class TestBackupRepositoryFactory extends SolrTestCaseJ4 {
   @Rule
   public TestRule solrTestRules = RuleChain.outerRule(new SystemPropertiesRestoreRule());
@@ -129,7 +127,7 @@ public class TestBackupRepositoryFactory extends SolrTestCaseJ4 {
       attrs.put(CoreAdminParams.NAME, "repo1");
       attrs.put(FieldType.CLASS_NAME, LocalFileSystemRepository.class.getName());
       attrs.put("default" , "true");
-      attrs.put(ZkStateReader.BACKUP_LOCATION, "/tmp");
+      attrs.put("location", "/tmp");
       plugins[0] = new PluginInfo("repository", attrs);
     }
 
@@ -139,14 +137,14 @@ public class TestBackupRepositoryFactory extends SolrTestCaseJ4 {
       BackupRepository repo = f.newInstance(loader);
 
       assertTrue(repo instanceof LocalFileSystemRepository);
-      assertEquals("/tmp", repo.getConfigProperty(ZkStateReader.BACKUP_LOCATION));
+      assertEquals("/tmp", repo.getConfigProperty("location"));
     }
 
     {
       BackupRepository repo = f.newInstance(loader, "repo1");
 
       assertTrue(repo instanceof LocalFileSystemRepository);
-      assertEquals("/tmp", repo.getConfigProperty(ZkStateReader.BACKUP_LOCATION));
+      assertEquals("/tmp", repo.getConfigProperty("location"));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 452c7a1..7bc9e4f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.request;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -595,6 +596,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
   // BACKUP request
   public static class Backup extends AsyncCollectionSpecificAdminRequest {
     protected final String name;
+    protected Optional<String> repositoryName;
     protected String location;
 
     public Backup(String collection, String name) {
@@ -625,12 +627,24 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       return this;
     }
 
+    public Optional<String> getRepositoryName() {
+      return repositoryName;
+    }
+
+    public Backup setRepositoryName(String repositoryName) {
+      this.repositoryName = Optional.ofNullable(repositoryName);
+      return this;
+    }
+
     @Override
     public SolrParams getParams() {
       ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
       params.set(CoreAdminParams.COLLECTION, collection);
       params.set(CoreAdminParams.NAME, name);
-      params.set("location", location); //note: optional
+      params.set(CoreAdminParams.BACKUP_LOCATION, location); //note: optional
+      if (repositoryName.isPresent()) {
+        params.set(CoreAdminParams.BACKUP_REPOSITORY, repositoryName.get());
+      }
       return params;
     }
 
@@ -643,6 +657,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
   // RESTORE request
   public static class Restore extends AsyncCollectionSpecificAdminRequest {
     protected final String backupName;
+    protected Optional<String> repositoryName;
     protected String location;
 
     // in common with collection creation:
@@ -678,6 +693,15 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       return this;
     }
 
+    public Optional<String> getRepositoryName() {
+      return repositoryName;
+    }
+
+    public Restore setRepositoryName(String repositoryName) {
+      this.repositoryName = Optional.ofNullable(repositoryName);
+      return this;
+    }
+
     // Collection creation params in common:
     public Restore setConfigName(String config) { this.configName = config; return this; }
     public String getConfigName()  { return configName; }
@@ -703,7 +727,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
       params.set(CoreAdminParams.COLLECTION, collection);
       params.set(CoreAdminParams.NAME, backupName);
-      params.set("location", location); //note: optional
+      params.set(CoreAdminParams.BACKUP_LOCATION, location); //note: optional
       params.set("collection.configName", configName); //note: optional
       if (maxShardsPerNode != null) {
         params.set( "maxShardsPerNode", maxShardsPerNode);
@@ -717,6 +741,10 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       if (properties != null) {
         addProperties(params, properties);
       }
+      if (repositoryName.isPresent()) {
+        params.set(CoreAdminParams.BACKUP_REPOSITORY, repositoryName.get());
+      }
+
       return params;
     }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 709eebe..f106b9c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.solr.common.Callable;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
@@ -106,7 +107,6 @@ public class ZkStateReader implements Closeable {
 
   public static final String URL_SCHEME = "urlScheme";
 
-  public static final String BACKUP_LOCATION = "location";
 
   /** A view of the current state of all collections; combines all the different state sources into a single view. */
   protected volatile ClusterState clusterState;
@@ -160,7 +160,7 @@ public class ZkStateReader implements Closeable {
       LEGACY_CLOUD,
       URL_SCHEME,
       AUTO_ADD_REPLICAS,
-      BACKUP_LOCATION,
+      CoreAdminParams.BACKUP_LOCATION,
       MAX_CORES_PER_NODE)));
 
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/413ea476/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
index 716dfee..7455cbf 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
@@ -108,6 +108,16 @@ public abstract class CoreAdminParams
   // Node to create a replica on for ADDREPLICA at least.
   public static final String NODE = "node";
 
+  /**
+   * A parameter to specify the name of the backup repository to be used.
+   */
+  public static final String BACKUP_REPOSITORY = "repository";
+
+  /**
+   * A parameter to specify the location where the backup should be stored.
+   */
+  public static final String BACKUP_LOCATION = "location";
+
   public enum CoreAdminAction {
     STATUS(true),
     UNLOAD,