You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2016/11/30 06:56:22 UTC

[4/8] hbase git commit: HBASE-16904 Snapshot related changes for FS redo work

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
index 75a1a17..a56744b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
@@ -29,12 +29,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Stoppable;
@@ -44,7 +39,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.executor.ExecutorService;
-import org.apache.hadoop.hbase.fs.legacy.LegacyPathIdentifier;
+import org.apache.hadoop.hbase.fs.StorageContext;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.fs.MasterStorage;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -74,12 +69,10 @@ import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
 import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
 import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
 import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.zookeeper.KeeperException;
 
@@ -140,7 +133,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
 
   // Snapshot handlers map, with table name as key.
   // The map is always accessed and modified under the object lock using synchronized.
-  // snapshotTable() will insert an Handler in the table.
+  // initiateSnapshot() will insert an Handler in the table.
   // isSnapshotDone() will remove the handler requested if the operation is finished.
   private Map<TableName, SnapshotSentinel> snapshotHandlers =
       new HashMap<TableName, SnapshotSentinel>();
@@ -154,7 +147,6 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
   // snapshot using Procedure-V2.
   private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<TableName, Long>();
 
-  private Path rootDir;
   private ExecutorService executorService;
 
   /**
@@ -179,131 +171,65 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
       ProcedureCoordinator coordinator, ExecutorService pool)
       throws IOException, UnsupportedOperationException {
     this.master = master;
-
-    this.rootDir = ((LegacyPathIdentifier) master.getMasterStorage().getRootContainer()).path;
-    checkSnapshotSupport(master.getConfiguration(), master.getMasterStorage());
+    checkSnapshotSupport(master.getConfiguration());
 
     this.coordinator = coordinator;
     this.executorService = pool;
-    resetTempDir();
+    this.master.getMasterStorage().deleteAllSnapshots(StorageContext.TEMP);
   }
 
   /**
    * Gets the list of all completed snapshots.
    * @return list of SnapshotDescriptions
-   * @throws IOException File system exception
+   * @throws IOException Storage exception
    */
   public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
-    return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir));
-  }
+    List<SnapshotDescription> snapshotDescs = new ArrayList<>();
 
-  /**
-   * Gets the list of all completed snapshots.
-   * @param snapshotDir snapshot directory
-   * @return list of SnapshotDescriptions
-   * @throws IOException File system exception
-   */
-  private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir) throws IOException {
-    List<SnapshotDescription> snapshotDescs = new ArrayList<SnapshotDescription>();
-    // first create the snapshot root path and check to see if it exists
-    FileSystem fs = master.getMasterStorage().getFileSystem();
-    if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
-
-    // if there are no snapshots, return an empty list
-    if (!fs.exists(snapshotDir)) {
-      return snapshotDescs;
-    }
-
-    // ignore all the snapshots in progress
-    FileStatus[] snapshots = fs.listStatus(snapshotDir,
-      new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
-    // loop through all the completed snapshots
-    for (FileStatus snapshot : snapshots) {
-      Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
-      // if the snapshot is bad
-      if (!fs.exists(info)) {
-        LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
-        continue;
-      }
-      FSDataInputStream in = null;
-      try {
-        in = fs.open(info);
-        SnapshotDescription desc = SnapshotDescription.parseFrom(in);
-        if (cpHost != null) {
-          try {
-            cpHost.preListSnapshot(desc);
-          } catch (AccessDeniedException e) {
-            LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
-                + "Either you should be owner of this snapshot or admin user.");
-            // Skip this and try for next snapshot
-            continue;
+    master.getMasterStorage().visitSnapshots(new MasterStorage.SnapshotVisitor() {
+      @Override
+      public void visitSnapshot(String snapshotName, SnapshotDescription snapshot,
+                                StorageContext ctx) {
+        try {
+          if (cpHost != null) {
+            cpHost.preListSnapshot(snapshot);
           }
-        }
-        snapshotDescs.add(desc);
-
-        // call coproc post hook
-        if (cpHost != null) {
-          cpHost.postListSnapshot(desc);
-        }
-      } catch (IOException e) {
-        LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
-      } finally {
-        if (in != null) {
-          in.close();
+          snapshotDescs.add(snapshot);
+        } catch (AccessDeniedException e) {
+          LOG.warn("Current user does not have access to snapshot '" + snapshot.getName() + "'." +
+              " Either you should be owner of this snapshot or admin user.");
+          // Skip this and try for next snapshot
+        } catch (IOException e) {
+          LOG.warn("Error while checking access permissions for snapshot '" + snapshot.getName()
+              +  "'.", e);
         }
       }
-    }
-    return snapshotDescs;
-  }
+    });
 
-  /**
-   * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
-   * snapshot attempts.
-   *
-   * @throws IOException if we can't reach the filesystem
-   */
-  void resetTempDir() throws IOException {
-    // cleanup any existing snapshots.
-    Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
-    if (master.getMasterStorage().getFileSystem().exists(tmpdir)) {
-      if (!master.getMasterStorage().getFileSystem().delete(tmpdir, true)) {
-        LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
-      }
-    }
+    return snapshotDescs;
   }
 
   /**
    * Delete the specified snapshot
-   * @param snapshot
+   * @param snapshot {@link SnapshotDescription}
    * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
-   * @throws IOException For filesystem IOExceptions
+   * @throws IOException For storage IOExceptions
    */
-  public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
-    // check to see if it is completed
-    if (!isSnapshotCompleted(snapshot)) {
+  public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException,
+      IOException {
+    if (!master.getMasterStorage().snapshotExists(snapshot)) {
       throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
     }
 
-    String snapshotName = snapshot.getName();
-    // first create the snapshot description and check to see if it exists
-    FileSystem fs = master.getMasterStorage().getFileSystem();
-    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
-    // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
-    // just the "name" and it does not contains the "real" snapshot information
-    snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-
     // call coproc pre hook
     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.preDeleteSnapshot(snapshot);
     }
 
-    LOG.debug("Deleting snapshot: " + snapshotName);
-    // delete the existing snapshot
-    if (!fs.delete(snapshotDir, true)) {
-      throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
-    }
+    // delete snapshot from storage
+    master.getMasterStorage().deleteSnapshot(snapshot);
 
     // call coproc post hook
     if (cpHost != null) {
@@ -343,7 +269,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
       //   - the snapshot was never requested
       // In those cases returns to the user the "done state" if the snapshots exists on disk,
       // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
-      if (!isSnapshotCompleted(expected)) {
+      if (!master.getMasterStorage().snapshotExists(expected)) {
         throw new UnknownSnapshotException("Snapshot " + ssString
             + " is not currently running or one of the known completed snapshots.");
       }
@@ -416,12 +342,10 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
    * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
    * aren't already running a snapshot or restore on the requested table.
    * @param snapshot description of the snapshot we want to start
-   * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
+   * @throws HBaseSnapshotException if the storage could not be prepared to start the snapshot
    */
   private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
       throws HBaseSnapshotException {
-    FileSystem fs = master.getMasterStorage().getFileSystem();
-    Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
     TableName snapshotTable =
         TableName.valueOf(snapshot.getTable());
 
@@ -444,22 +368,12 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
     }
 
     try {
-      // delete the working directory, since we aren't running the snapshot. Likely leftovers
-      // from a failed attempt.
-      fs.delete(workingDir, true);
-
-      // recreate the working directory for the snapshot
-      if (!fs.mkdirs(workingDir)) {
-        throw new SnapshotCreationException(
-            "Couldn't create working directory (" + workingDir + ") for snapshot",
-            ProtobufUtil.createSnapshotDesc(snapshot));
-      }
+      master.getMasterStorage().prepareSnapshot(snapshot);
     } catch (HBaseSnapshotException e) {
       throw e;
     } catch (IOException e) {
-      throw new SnapshotCreationException(
-          "Exception while checking to see if snapshot could be started.", e,
-          ProtobufUtil.createSnapshotDesc(snapshot));
+      throw new SnapshotCreationException("Exception while checking to see if snapshot could be " +
+          "started.", e, ProtobufUtil.createSnapshotDesc(snapshot));
     }
   }
 
@@ -513,15 +427,11 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
       this.executorService.submit(handler);
       this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
     } catch (Exception e) {
-      // cleanup the working directory by trying to delete it from the fs.
-      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
+      // cleanup the storage by trying to delete it.
       try {
-        if (!this.master.getMasterStorage().getFileSystem().delete(workingDir, true)) {
-          LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
-              ClientSnapshotDescriptionUtils.toString(snapshot));
-        }
+        master.getMasterStorage().deleteSnapshot(snapshot);
       } catch (IOException e1) {
-        LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
+        LOG.error("Couldn't delete in-progress snapshot:" +
             ClientSnapshotDescriptionUtils.toString(snapshot));
       }
       // fail the snapshot
@@ -539,10 +449,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
    */
   public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
     // check to see if we already completed the snapshot
-    if (isSnapshotCompleted(snapshot)) {
-      throw new SnapshotExistsException(
-          "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.",
-          ProtobufUtil.createSnapshotDesc(snapshot));
+    if (master.getMasterStorage().snapshotExists(snapshot)) {
+      throw new SnapshotExistsException("Snapshot '" + snapshot.getName() +
+          "' already stored on the storage.", ProtobufUtil.createSnapshotDesc(snapshot));
     }
 
     LOG.debug("No existing snapshot, attempting snapshot...");
@@ -642,27 +551,6 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
   }
 
   /**
-   * Check to see if the snapshot is one of the currently completed snapshots
-   * Returns true if the snapshot exists in the "completed snapshots folder".
-   *
-   * @param snapshot expected snapshot to check
-   * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
-   *         not stored
-   * @throws IOException if the filesystem throws an unexpected exception,
-   * @throws IllegalArgumentException if snapshot name is invalid.
-   */
-  private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
-    try {
-      final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
-      FileSystem fs = master.getMasterStorage().getFileSystem();
-      // check to see if the snapshot already exists
-      return fs.exists(snapshotDir);
-    } catch (IllegalArgumentException iae) {
-      throw new UnknownSnapshotException("Unexpected exception thrown", iae);
-    }
-  }
-
-  /**
    * Clone the specified snapshot.
    * The clone will fail if the destination table has a snapshot or restore in progress.
    *
@@ -753,34 +641,29 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
    * @param nonce unique value to prevent duplicated RPC
    * @throws IOException
    */
-  public long restoreOrCloneSnapshot(
-      SnapshotDescription reqSnapshot,
-      final long nonceGroup,
+  public long restoreOrCloneSnapshot(SnapshotDescription reqSnapshot, final long nonceGroup,
       final long nonce) throws IOException {
-    FileSystem fs = master.getMasterStorage().getFileSystem();
-    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
-
     // check if the snapshot exists
-    if (!fs.exists(snapshotDir)) {
-      LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
-      throw new SnapshotDoesNotExistException(
-        ProtobufUtil.createSnapshotDesc(reqSnapshot));
+    if (!master.getMasterStorage().snapshotExists(reqSnapshot)) {
+      LOG.error("A Snapshot '" + reqSnapshot.getName() + "' does not exist.");
+      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(reqSnapshot));
     }
 
-    // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
+    // Get snapshot info from storage. The reqSnapshot is a "fake" snapshotInfo with
     // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
     // information.
-    SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-    SnapshotManifest manifest = SnapshotManifest.open(master.getConfiguration(), fs,
-        snapshotDir, snapshot);
-    HTableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
+    SnapshotDescription snapshot =
+        master.getMasterStorage().getSnapshot(reqSnapshot.getName());
+
+    HTableDescriptor snapshotTableDesc =
+        master.getMasterStorage().getTableDescriptorForSnapshot(snapshot);
     TableName tableName = TableName.valueOf(reqSnapshot.getTable());
 
     // stop tracking "abandoned" handlers
     cleanupSentinels();
 
     // Verify snapshot validity
-    SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
+    SnapshotReferenceUtil.verifySnapshot(master.getMasterStorage(), snapshot, StorageContext.DATA);
 
     // Execute the restore/clone operation
     long procId;
@@ -1037,36 +920,25 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
    * starting the master if there're snapshots present but the cleaners needed are missing.
    * Otherwise we can end up with snapshot data loss.
    * @param conf The {@link Configuration} object to use
-   * @param ms The MasterFileSystem to use
-   * @throws IOException in case of file-system operation failure
+   * @throws IOException in case of storage operation failure
    * @throws UnsupportedOperationException in case cleaners are missing and
    *         there're snapshot in the system
    */
-  private void checkSnapshotSupport(final Configuration conf, final MasterStorage ms)
+  private void checkSnapshotSupport(final Configuration conf)
       throws IOException, UnsupportedOperationException {
     // Verify if snapshot is disabled by the user
     String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
     boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
     boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
 
-    // check if an older version of snapshot directory was present
-    Path oldSnapshotDir = new Path(((LegacyPathIdentifier) ms.getRootContainer()).path, HConstants
-        .OLD_SNAPSHOT_DIR_NAME);
-    FileSystem fs = ms.getFileSystem();
-    List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir));
-    if (ss != null && !ss.isEmpty()) {
-      LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
-      LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
-    }
-
     // If the user has enabled the snapshot, we force the cleaners to be present
     // otherwise we still need to check if cleaners are enabled or not and verify
     // that there're no snapshot in the .snapshot folder.
     if (snapshotEnabled) {
-      ms.enableSnapshots();
+      master.getMasterStorage().enableSnapshots();
     } else {
       // Verify if cleaners are present
-      snapshotEnabled = ms.isSnapshotsEnabled();
+      snapshotEnabled = master.getMasterStorage().isSnapshotsEnabled();
 
       // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
       if (snapshotEnabled) {
@@ -1083,15 +955,10 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
     // otherwise we end up with snapshot data loss.
     if (!snapshotEnabled) {
       LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
-      Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(((LegacyPathIdentifier) ms
-          .getRootContainer()).path);
-      if (fs.exists(snapshotDir)) {
-        FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir,
-          new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
-        if (snapshots != null) {
+      List<SnapshotDescription> snapshots = master.getMasterStorage().getSnapshots();
+      if (snapshots != null && !snapshots.isEmpty()) {
           LOG.error("Snapshots are present, but cleaners are not enabled.");
           checkSnapshotSupport();
-        }
       }
     }
   }
@@ -1100,9 +967,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
   public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
       IOException, UnsupportedOperationException {
     this.master = master;
-
-    this.rootDir = ((LegacyPathIdentifier) master.getMasterStorage().getRootContainer()).path;
-    checkSnapshotSupport(master.getConfiguration(), master.getMasterStorage());
+    checkSnapshotSupport(master.getConfiguration());
 
     // get the configuration for the coordinator
     Configuration conf = master.getConfiguration();
@@ -1121,7 +986,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
 
     this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
     this.executorService = master.getExecutorService();
-    resetTempDir();
+    this.master.getMasterStorage().deleteAllSnapshots(StorageContext.TEMP);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
index 503f346..e166061 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
@@ -29,8 +29,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -41,7 +39,9 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.fs.legacy.LegacyPathIdentifier;
+import org.apache.hadoop.hbase.fs.MasterStorage;
+import org.apache.hadoop.hbase.fs.StorageContext;
+import org.apache.hadoop.hbase.fs.StorageIdentifier;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.MetricsSnapshot;
 import org.apache.hadoop.hbase.master.SnapshotSentinel;
@@ -52,9 +52,6 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.zookeeper.KeeperException;
@@ -78,17 +75,13 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
   protected final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();
   protected final SnapshotDescription snapshot;
   protected final Configuration conf;
-  protected final FileSystem fs;
-  protected final Path rootDir;
-  private final Path snapshotDir;
-  protected final Path workingDir;
+  protected final MasterStorage<? extends StorageIdentifier> masterStorage;
   private final MasterSnapshotVerifier verifier;
   protected final ForeignExceptionDispatcher monitor;
   protected final TableLockManager tableLockManager;
   protected final TableLock tableLock;
   protected final MonitoredTask status;
   protected final TableName snapshotTable;
-  protected final SnapshotManifest snapshotManifest;
   protected final SnapshotManager snapshotManager;
 
   protected HTableDescriptor htd;
@@ -108,12 +101,8 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
     this.snapshotManager = snapshotManager;
     this.snapshotTable = TableName.valueOf(snapshot.getTable());
     this.conf = this.master.getConfiguration();
-    this.fs = this.master.getMasterStorage().getFileSystem();
-    this.rootDir = ((LegacyPathIdentifier) this.master.getMasterStorage().getRootContainer()).path;
-    this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
-    this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
+    this.masterStorage = masterServices.getMasterStorage();
     this.monitor = new ForeignExceptionDispatcher(snapshot.getName());
-    this.snapshotManifest = SnapshotManifest.create(conf, fs, workingDir, snapshot, monitor);
 
     this.tableLockManager = master.getTableLockManager();
     this.tableLock = this.tableLockManager.writeLock(
@@ -121,7 +110,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
         EventType.C_M_SNAPSHOT_TABLE.toString());
 
     // prepare the verify
-    this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, rootDir);
+    this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, StorageContext.TEMP);
     // update the running tasks
     this.status = TaskMonitor.get().createStatus(
       "Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable);
@@ -129,8 +118,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
 
   private HTableDescriptor loadTableDescriptor()
       throws FileNotFoundException, IOException {
-    HTableDescriptor htd =
-      this.master.getTableDescriptors().get(snapshotTable);
+    HTableDescriptor htd = this.master.getTableDescriptors().get(snapshotTable);
     if (htd == null) {
       throw new IOException("HTableDescriptor missing for " + snapshotTable);
     }
@@ -171,9 +159,8 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
       // If regions move after this meta scan, the region specific snapshot should fail, triggering
       // an external exception that gets captured here.
 
-      // write down the snapshot info in the working directory
-      SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, fs);
-      snapshotManifest.addTableDescriptor(this.htd);
+      // initiate snapshot on storage
+      masterStorage.initiateSnapshot(htd, snapshot, monitor, StorageContext.TEMP);
       monitor.rethrowException();
 
       List<Pair<HRegionInfo, ServerName>> regionsAndLocations;
@@ -201,14 +188,14 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
 
       // flush the in-memory state, and write the single manifest
       status.setStatus("Consolidate snapshot: " + snapshot.getName());
-      snapshotManifest.consolidate();
+      masterStorage.consolidateSnapshot(snapshot, StorageContext.TEMP);
 
       // verify the snapshot is valid
       status.setStatus("Verifying snapshot: " + snapshot.getName());
-      verifier.verifySnapshot(this.workingDir, serverNames);
+      verifier.verifySnapshot(StorageContext.TEMP);
 
-      // complete the snapshot, atomically moving from tmp to .snapshot dir.
-      completeSnapshot(this.snapshotDir, this.workingDir, this.fs);
+      // complete the snapshot, atomically moving from TEMP storage context to DATA context.
+      completeSnapshot(snapshot);
       msg = "Snapshot " + snapshot.getName() + " of table " + snapshotTable + " completed";
       status.markComplete(msg);
       LOG.info(msg);
@@ -224,15 +211,19 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
       // need to mark this completed to close off and allow cleanup to happen.
       cancel(reason);
     } finally {
-      LOG.debug("Launching cleanup of working dir:" + workingDir);
+      LOG.debug("Launching cleanup of artifacts in TEMP storage context for a snapshot '" +
+          snapshot.getName() + "'.");
       try {
-        // if the working dir is still present, the snapshot has failed.  it is present we delete
-        // it.
-        if (fs.exists(workingDir) && !this.fs.delete(workingDir, true)) {
-          LOG.error("Couldn't delete snapshot working directory:" + workingDir);
+        // If the snapshot is still present in TEMP storage context, the snapshot has failed.
+        // Delete it.
+        if (masterStorage.snapshotExists(snapshot, StorageContext.TEMP) &&
+            !masterStorage.deleteSnapshot(snapshot, StorageContext.TEMP)) {
+          LOG.error("Couldn't delete snapshot '" + snapshot.getName() +
+              "' from TEMP storage context.");
         }
       } catch (IOException e) {
-        LOG.error("Couldn't delete snapshot working directory:" + workingDir);
+        LOG.error("Couldn't delete snapshot '" + snapshot.getName() +
+            "' from TEMP storage context.");
       }
       lock.unlock();
       releaseTableLock();
@@ -252,19 +243,17 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
   /**
    * Reset the manager to allow another snapshot to proceed
    *
-   * @param snapshotDir final path of the snapshot
-   * @param workingDir directory where the in progress snapshot was built
-   * @param fs {@link FileSystem} where the snapshot was built
+   * @param snapshot
    * @throws SnapshotCreationException if the snapshot could not be moved
-   * @throws IOException the filesystem could not be reached
+   * @throws IOException the storage could not be reached
    */
-  public void completeSnapshot(Path snapshotDir, Path workingDir, FileSystem fs)
-      throws SnapshotCreationException, IOException {
-    LOG.debug("Sentinel is done, just moving the snapshot from " + workingDir + " to "
-        + snapshotDir);
-    if (!fs.rename(workingDir, snapshotDir)) {
-      throw new SnapshotCreationException("Failed to move working directory(" + workingDir
-          + ") to completed directory(" + snapshotDir + ").");
+  public void completeSnapshot(SnapshotDescription snapshot) throws SnapshotCreationException,
+      IOException {
+    LOG.debug("Sentinel is done, just moving the snapshot '" + snapshot.getName() +
+        "' from TEMP storae context to DATA context.");
+    if (!masterStorage.changeSnapshotContext(snapshot, StorageContext.TEMP, StorageContext.DATA)) {
+      throw new SnapshotCreationException("Failed to move snapshot '" + snapshot.getName() +
+          "' from TEMP storage context to DATA context.");
     }
     finished = true;
   }
@@ -278,9 +267,8 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
   /**
    * Take a snapshot of the specified disabled region
    */
-  protected void snapshotDisabledRegion(final HRegionInfo regionInfo)
-      throws IOException {
-    snapshotManifest.addRegion(FSUtils.getTableDir(rootDir, snapshotTable), regionInfo);
+  protected void snapshotDisabledRegion(final HRegionInfo regionInfo) throws IOException {
+    masterStorage.addRegionToSnapshot(snapshot, regionInfo, StorageContext.TEMP);
     monitor.rethrowException();
     status.setStatus("Completed referencing HFiles for offline region " + regionInfo.toString() +
         " of table: " + snapshotTable);

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 1c06186..1964841 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -133,8 +133,10 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FilterWrapper;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.fs.MasterStorage;
 import org.apache.hadoop.hbase.fs.RegionStorage;
 import org.apache.hadoop.hbase.fs.StorageIdentifier;
+import org.apache.hadoop.hbase.fs.legacy.LegacyLayout;
 import org.apache.hadoop.hbase.fs.legacy.LegacyPathIdentifier;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.TimeRange;
@@ -174,8 +176,7 @@ import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.fs.legacy.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -3664,7 +3665,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public void addRegionToSnapshot(SnapshotDescription desc,
       ForeignExceptionSnare exnSnare) throws IOException {
     Path rootDir = FSUtils.getRootDir(conf);
-    Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
+    Path snapshotDir = LegacyLayout.getWorkingSnapshotDir(rootDir, desc);
 
     SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
             snapshotDir, desc, exnSnare);
@@ -6426,17 +6427,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return r.openHRegion(reporter);
   }
 
-  /**
-   * TODO remove after refactoring TableSnapshotScanner and TableSnapshotInputFormatImpl to use a RegionStorage impl instead of specifying a different root dir manually.
-   */
-  public static HRegion openHRegion(final FileSystem fs, final Path rootDir, final HRegionInfo info,
-      HTableDescriptor htd, Configuration conf) throws IOException {
+  public static HRegion openHRegion(final MasterStorage<? extends StorageIdentifier> masterStorage,
+      final HRegionInfo info, HTableDescriptor htd) throws IOException {
     if (info == null) throw new IllegalArgumentException("Passed region info is null");
     if (LOG.isDebugEnabled()) {
       LOG.debug("Opening region: " + info);
     }
-    RegionStorage rfs = RegionStorage.open(conf, fs, new LegacyPathIdentifier(rootDir), info, false);
-    HRegion r = newHRegion(rfs, htd, null, null);
+    RegionStorage<? extends StorageIdentifier> regionStorage = masterStorage.getRegionStorage(info);
+    HRegion r = newHRegion(regionStorage, htd, null, null);
     return r.openHRegion(null);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 5305149..b8750a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -32,13 +31,11 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.io.WALLink;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.replication.*;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.KeeperException;
-import org.mortbay.util.IO;
 
 import java.io.IOException;
 import java.util.*;

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
deleted file mode 100644
index 01548cc..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ /dev/null
@@ -1,1084 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.snapshot;
-
-import java.io.BufferedInputStream;
-import java.io.FileNotFoundException;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileChecksum;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.fs.legacy.io.FileLink;
-import org.apache.hadoop.hbase.fs.legacy.io.HFileLink;
-import org.apache.hadoop.hbase.io.WALLink;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
-import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * Export the specified snapshot to a given FileSystem.
- *
- * The .snapshot/name folder is copied to the destination cluster
- * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location.
- * When everything is done, the second cluster can restore the snapshot.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class ExportSnapshot extends Configured implements Tool {
-  public static final String NAME = "exportsnapshot";
-  /** Configuration prefix for overrides for the source filesystem */
-  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
-  /** Configuration prefix for overrides for the destination filesystem */
-  public static final String CONF_DEST_PREFIX = NAME + ".to.";
-
-  private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
-
-  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
-  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
-  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
-  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
-  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
-  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
-  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
-  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
-  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
-  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
-  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
-  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
-  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
-  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
-
-  static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
-  static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
-
-  // Export Map-Reduce Counters, to keep track of the progress
-  public enum Counter {
-    MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
-    BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
-  }
-
-  private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
-                                                   NullWritable, NullWritable> {
-    final static int REPORT_SIZE = 1 * 1024 * 1024;
-    final static int BUFFER_SIZE = 64 * 1024;
-
-    private boolean testFailures;
-    private Random random;
-
-    private boolean verifyChecksum;
-    private String filesGroup;
-    private String filesUser;
-    private short filesMode;
-    private int bufferSize;
-
-    private FileSystem outputFs;
-    private Path outputArchive;
-    private Path outputRoot;
-
-    private FileSystem inputFs;
-    private Path inputArchive;
-    private Path inputRoot;
-
-    @Override
-    public void setup(Context context) throws IOException {
-      Configuration conf = context.getConfiguration();
-      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
-      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
-
-      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
-
-      filesGroup = conf.get(CONF_FILES_GROUP);
-      filesUser = conf.get(CONF_FILES_USER);
-      filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
-      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
-      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
-
-      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
-      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
-
-      testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
-
-      try {
-        srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
-        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
-      } catch (IOException e) {
-        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
-      }
-
-      try {
-        destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
-        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
-      } catch (IOException e) {
-        throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
-      }
-
-      // Use the default block size of the outputFs if bigger
-      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
-      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
-      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
-
-      for (Counter c : Counter.values()) {
-        context.getCounter(c).increment(0);
-      }
-    }
-
-    @Override
-    protected void cleanup(Context context) {
-      IOUtils.closeStream(inputFs);
-      IOUtils.closeStream(outputFs);
-    }
-
-    @Override
-    public void map(BytesWritable key, NullWritable value, Context context)
-        throws InterruptedException, IOException {
-      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
-      Path outputPath = getOutputPath(inputInfo);
-
-      copyFile(context, inputInfo, outputPath);
-    }
-
-    /**
-     * Returns the location where the inputPath will be copied.
-     */
-    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
-      Path path = null;
-      switch (inputInfo.getType()) {
-        case HFILE:
-          Path inputPath = new Path(inputInfo.getHfile());
-          String family = inputPath.getParent().getName();
-          TableName table =HFileLink.getReferencedTableName(inputPath.getName());
-          String region = HFileLink.getReferencedRegionName(inputPath.getName());
-          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
-          path = new Path(FSUtils.getTableDir(new Path("./"), table),
-              new Path(region, new Path(family, hfile)));
-          break;
-        case WAL:
-          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
-          break;
-        default:
-          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
-      }
-      return new Path(outputArchive, path);
-    }
-
-    /*
-     * Used by TestExportSnapshot to simulate a failure
-     */
-    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
-        throws IOException {
-      if (testFailures) {
-        if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
-          if (random == null) {
-            random = new Random();
-          }
-
-          // FLAKY-TEST-WARN: lower is better, we can get some runs without the
-          // retry, but at least we reduce the number of test failures due to
-          // this test exception from the same map task.
-          if (random.nextFloat() < 0.03) {
-            throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
-                                  + " time=" + System.currentTimeMillis());
-          }
-        } else {
-          context.getCounter(Counter.COPY_FAILED).increment(1);
-          throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
-        }
-      }
-    }
-
-    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
-        final Path outputPath) throws IOException {
-      injectTestFailure(context, inputInfo);
-
-      // Get the file information
-      FileStatus inputStat = getSourceFileStatus(context, inputInfo);
-
-      // Verify if the output file exists and is the same that we want to copy
-      if (outputFs.exists(outputPath)) {
-        FileStatus outputStat = outputFs.getFileStatus(outputPath);
-        if (outputStat != null && sameFile(inputStat, outputStat)) {
-          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
-          context.getCounter(Counter.FILES_SKIPPED).increment(1);
-          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
-          return;
-        }
-      }
-
-      InputStream in = openSourceFile(context, inputInfo);
-      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
-      if (Integer.MAX_VALUE != bandwidthMB) {
-        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
-      }
-
-      try {
-        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
-
-        // Ensure that the output folder is there and copy the file
-        createOutputPath(outputPath.getParent());
-        FSDataOutputStream out = outputFs.create(outputPath, true);
-        try {
-          copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
-        } finally {
-          out.close();
-        }
-
-        // Try to Preserve attributes
-        if (!preserveAttributes(outputPath, inputStat)) {
-          LOG.warn("You may have to run manually chown on: " + outputPath);
-        }
-      } finally {
-        in.close();
-      }
-    }
-
-    /**
-     * Create the output folder and optionally set ownership.
-     */
-    private void createOutputPath(final Path path) throws IOException {
-      if (filesUser == null && filesGroup == null) {
-        outputFs.mkdirs(path);
-      } else {
-        Path parent = path.getParent();
-        if (!outputFs.exists(parent) && !parent.isRoot()) {
-          createOutputPath(parent);
-        }
-        outputFs.mkdirs(path);
-        if (filesUser != null || filesGroup != null) {
-          // override the owner when non-null user/group is specified
-          outputFs.setOwner(path, filesUser, filesGroup);
-        }
-        if (filesMode > 0) {
-          outputFs.setPermission(path, new FsPermission(filesMode));
-        }
-      }
-    }
-
-    /**
-     * Try to Preserve the files attribute selected by the user copying them from the source file
-     * This is only required when you are exporting as a different user than "hbase" or on a system
-     * that doesn't have the "hbase" user.
-     *
-     * This is not considered a blocking failure since the user can force a chmod with the user
-     * that knows is available on the system.
-     */
-    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
-      FileStatus stat;
-      try {
-        stat = outputFs.getFileStatus(path);
-      } catch (IOException e) {
-        LOG.warn("Unable to get the status for file=" + path);
-        return false;
-      }
-
-      try {
-        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
-          outputFs.setPermission(path, new FsPermission(filesMode));
-        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
-          outputFs.setPermission(path, refStat.getPermission());
-        }
-      } catch (IOException e) {
-        LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
-        return false;
-      }
-
-      boolean hasRefStat = (refStat != null);
-      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
-      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
-      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
-        try {
-          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
-            outputFs.setOwner(path, user, group);
-          }
-        } catch (IOException e) {
-          LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
-          LOG.warn("The user/group may not exist on the destination cluster: user=" +
-                   user + " group=" + group);
-          return false;
-        }
-      }
-
-      return true;
-    }
-
-    private boolean stringIsNotEmpty(final String str) {
-      return str != null && str.length() > 0;
-    }
-
-    private void copyData(final Context context,
-        final Path inputPath, final InputStream in,
-        final Path outputPath, final FSDataOutputStream out,
-        final long inputFileSize)
-        throws IOException {
-      final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
-                                   " (%.1f%%)";
-
-      try {
-        byte[] buffer = new byte[bufferSize];
-        long totalBytesWritten = 0;
-        int reportBytes = 0;
-        int bytesRead;
-
-        long stime = System.currentTimeMillis();
-        while ((bytesRead = in.read(buffer)) > 0) {
-          out.write(buffer, 0, bytesRead);
-          totalBytesWritten += bytesRead;
-          reportBytes += bytesRead;
-
-          if (reportBytes >= REPORT_SIZE) {
-            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
-            context.setStatus(String.format(statusMessage,
-                              StringUtils.humanReadableInt(totalBytesWritten),
-                              (totalBytesWritten/(float)inputFileSize) * 100.0f) +
-                              " from " + inputPath + " to " + outputPath);
-            reportBytes = 0;
-          }
-        }
-        long etime = System.currentTimeMillis();
-
-        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
-        context.setStatus(String.format(statusMessage,
-                          StringUtils.humanReadableInt(totalBytesWritten),
-                          (totalBytesWritten/(float)inputFileSize) * 100.0f) +
-                          " from " + inputPath + " to " + outputPath);
-
-        // Verify that the written size match
-        if (totalBytesWritten != inputFileSize) {
-          String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
-                       " expected=" + inputFileSize + " for file=" + inputPath;
-          throw new IOException(msg);
-        }
-
-        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
-        LOG.info("size=" + totalBytesWritten +
-            " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
-            " time=" + StringUtils.formatTimeDiff(etime, stime) +
-            String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
-        context.getCounter(Counter.FILES_COPIED).increment(1);
-      } catch (IOException e) {
-        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
-        context.getCounter(Counter.COPY_FAILED).increment(1);
-        throw e;
-      }
-    }
-
-    /**
-     * Try to open the "source" file.
-     * Throws an IOException if the communication with the inputFs fail or
-     * if the file is not found.
-     */
-    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
-            throws IOException {
-      try {
-        Configuration conf = context.getConfiguration();
-        FileLink link = null;
-        switch (fileInfo.getType()) {
-          case HFILE:
-            Path inputPath = new Path(fileInfo.getHfile());
-            link = getFileLink(inputPath, conf);
-            break;
-          case WAL:
-            String serverName = fileInfo.getWalServer();
-            String logName = fileInfo.getWalName();
-            link = new WALLink(inputRoot, serverName, logName);
-            break;
-          default:
-            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
-        }
-        return link.open(inputFs);
-      } catch (IOException e) {
-        context.getCounter(Counter.MISSING_FILES).increment(1);
-        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
-        throw e;
-      }
-    }
-
-    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
-        throws IOException {
-      try {
-        Configuration conf = context.getConfiguration();
-        FileLink link = null;
-        switch (fileInfo.getType()) {
-          case HFILE:
-            Path inputPath = new Path(fileInfo.getHfile());
-            link = getFileLink(inputPath, conf);
-            break;
-          case WAL:
-            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
-            break;
-          default:
-            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
-        }
-        return link.getFileStatus(inputFs);
-      } catch (FileNotFoundException e) {
-        context.getCounter(Counter.MISSING_FILES).increment(1);
-        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
-        throw e;
-      } catch (IOException e) {
-        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
-        throw e;
-      }
-    }
-
-    private FileLink getFileLink(Path path, Configuration conf) throws IOException{
-      String regionName = HFileLink.getReferencedRegionName(path.getName());
-      TableName tableName = HFileLink.getReferencedTableName(path.getName());
-      if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
-        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
-                HFileArchiveUtil.getArchivePath(conf), path);
-      }
-      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
-    }
-
-    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
-      try {
-        return fs.getFileChecksum(path);
-      } catch (IOException e) {
-        LOG.warn("Unable to get checksum for file=" + path, e);
-        return null;
-      }
-    }
-
-    /**
-     * Check if the two files are equal by looking at the file length,
-     * and at the checksum (if user has specified the verifyChecksum flag).
-     */
-    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
-      // Not matching length
-      if (inputStat.getLen() != outputStat.getLen()) return false;
-
-      // Mark files as equals, since user asked for no checksum verification
-      if (!verifyChecksum) return true;
-
-      // If checksums are not available, files are not the same.
-      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
-      if (inChecksum == null) return false;
-
-      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
-      if (outChecksum == null) return false;
-
-      return inChecksum.equals(outChecksum);
-    }
-  }
-
-  // ==========================================================================
-  //  Input Format
-  // ==========================================================================
-
-  /**
-   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
-   * @return list of files referenced by the snapshot (pair of path and size)
-   */
-  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
-      final FileSystem fs, final Path snapshotDir) throws IOException {
-    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-
-    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
-    final TableName table = TableName.valueOf(snapshotDesc.getTable());
-
-    // Get snapshot files
-    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
-    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
-      new SnapshotReferenceUtil.SnapshotVisitor() {
-        @Override
-        public void storeFile(final HRegionInfo regionInfo, final String family,
-            final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
-          // for storeFile.hasReference() case, copied as part of the manifest
-          if (!storeFile.hasReference()) {
-            String region = regionInfo.getEncodedName();
-            String hfile = storeFile.getName();
-            Path path = HFileLink.createPath(table, region, family, hfile);
-
-            SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
-              .setType(SnapshotFileInfo.Type.HFILE)
-              .setHfile(path.toString())
-              .build();
-
-            long size;
-            if (storeFile.hasFileSize()) {
-              size = storeFile.getFileSize();
-            } else {
-              size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
-            }
-            files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
-          }
-        }
-    });
-
-    return files;
-  }
-
-  /**
-   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
-   * The groups created will have similar amounts of bytes.
-   * <p>
-   * The algorithm used is pretty straightforward; the file list is sorted by size,
-   * and then each group fetch the bigger file available, iterating through groups
-   * alternating the direction.
-   */
-  static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
-      final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
-    // Sort files by size, from small to big
-    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
-      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
-        long r = a.getSecond() - b.getSecond();
-        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
-      }
-    });
-
-    // create balanced groups
-    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
-      new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
-    long[] sizeGroups = new long[ngroups];
-    int hi = files.size() - 1;
-    int lo = 0;
-
-    List<Pair<SnapshotFileInfo, Long>> group;
-    int dir = 1;
-    int g = 0;
-
-    while (hi >= lo) {
-      if (g == fileGroups.size()) {
-        group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
-        fileGroups.add(group);
-      } else {
-        group = fileGroups.get(g);
-      }
-
-      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
-
-      // add the hi one
-      sizeGroups[g] += fileInfo.getSecond();
-      group.add(fileInfo);
-
-      // change direction when at the end or the beginning
-      g += dir;
-      if (g == ngroups) {
-        dir = -1;
-        g = ngroups - 1;
-      } else if (g < 0) {
-        dir = 1;
-        g = 0;
-      }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      for (int i = 0; i < sizeGroups.length; ++i) {
-        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
-      }
-    }
-
-    return fileGroups;
-  }
-
-  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
-    @Override
-    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
-        TaskAttemptContext tac) throws IOException, InterruptedException {
-      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
-    }
-
-    @Override
-    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
-      Configuration conf = context.getConfiguration();
-      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
-      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
-
-      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
-      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
-      if (mappers == 0 && snapshotFiles.size() > 0) {
-        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
-        mappers = Math.min(mappers, snapshotFiles.size());
-        conf.setInt(CONF_NUM_SPLITS, mappers);
-        conf.setInt(MR_NUM_MAPS, mappers);
-      }
-
-      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
-      List<InputSplit> splits = new ArrayList(groups.size());
-      for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
-        splits.add(new ExportSnapshotInputSplit(files));
-      }
-      return splits;
-    }
-
-    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
-      private List<Pair<BytesWritable, Long>> files;
-      private long length;
-
-      public ExportSnapshotInputSplit() {
-        this.files = null;
-      }
-
-      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
-        this.files = new ArrayList(snapshotFiles.size());
-        for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
-          this.files.add(new Pair<BytesWritable, Long>(
-            new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
-          this.length += fileInfo.getSecond();
-        }
-      }
-
-      private List<Pair<BytesWritable, Long>> getSplitKeys() {
-        return files;
-      }
-
-      @Override
-      public long getLength() throws IOException, InterruptedException {
-        return length;
-      }
-
-      @Override
-      public String[] getLocations() throws IOException, InterruptedException {
-        return new String[] {};
-      }
-
-      @Override
-      public void readFields(DataInput in) throws IOException {
-        int count = in.readInt();
-        files = new ArrayList<Pair<BytesWritable, Long>>(count);
-        length = 0;
-        for (int i = 0; i < count; ++i) {
-          BytesWritable fileInfo = new BytesWritable();
-          fileInfo.readFields(in);
-          long size = in.readLong();
-          files.add(new Pair<BytesWritable, Long>(fileInfo, size));
-          length += size;
-        }
-      }
-
-      @Override
-      public void write(DataOutput out) throws IOException {
-        out.writeInt(files.size());
-        for (final Pair<BytesWritable, Long> fileInfo: files) {
-          fileInfo.getFirst().write(out);
-          out.writeLong(fileInfo.getSecond());
-        }
-      }
-    }
-
-    private static class ExportSnapshotRecordReader
-        extends RecordReader<BytesWritable, NullWritable> {
-      private final List<Pair<BytesWritable, Long>> files;
-      private long totalSize = 0;
-      private long procSize = 0;
-      private int index = -1;
-
-      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
-        this.files = files;
-        for (Pair<BytesWritable, Long> fileInfo: files) {
-          totalSize += fileInfo.getSecond();
-        }
-      }
-
-      @Override
-      public void close() { }
-
-      @Override
-      public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
-
-      @Override
-      public NullWritable getCurrentValue() { return NullWritable.get(); }
-
-      @Override
-      public float getProgress() { return (float)procSize / totalSize; }
-
-      @Override
-      public void initialize(InputSplit split, TaskAttemptContext tac) { }
-
-      @Override
-      public boolean nextKeyValue() {
-        if (index >= 0) {
-          procSize += files.get(index).getSecond();
-        }
-        return(++index < files.size());
-      }
-    }
-  }
-
-  // ==========================================================================
-  //  Tool
-  // ==========================================================================
-
-  /**
-   * Run Map-Reduce Job to perform the files copy.
-   */
-  private void runCopyJob(final Path inputRoot, final Path outputRoot,
-      final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
-      final String filesUser, final String filesGroup, final int filesMode,
-      final int mappers, final int bandwidthMB)
-          throws IOException, InterruptedException, ClassNotFoundException {
-    Configuration conf = getConf();
-    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
-    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
-    if (mappers > 0) {
-      conf.setInt(CONF_NUM_SPLITS, mappers);
-      conf.setInt(MR_NUM_MAPS, mappers);
-    }
-    conf.setInt(CONF_FILES_MODE, filesMode);
-    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
-    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
-    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
-    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
-    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
-    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
-
-    Job job = new Job(conf);
-    job.setJobName("ExportSnapshot-" + snapshotName);
-    job.setJarByClass(ExportSnapshot.class);
-    TableMapReduceUtil.addDependencyJars(job);
-    job.setMapperClass(ExportMapper.class);
-    job.setInputFormatClass(ExportSnapshotInputFormat.class);
-    job.setOutputFormatClass(NullOutputFormat.class);
-    job.setMapSpeculativeExecution(false);
-    job.setNumReduceTasks(0);
-
-    // Acquire the delegation Tokens
-    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
-    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
-      new Path[] { inputRoot }, srcConf);
-    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
-    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
-        new Path[] { outputRoot }, destConf);
-
-    // Run the MR Job
-    if (!job.waitForCompletion(true)) {
-      // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
-      // when it will be available on all the supported versions.
-      throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
-    }
-  }
-
-  private void verifySnapshot(final Configuration baseConf,
-      final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
-    // Update the conf with the current root dir, since may be a different cluster
-    Configuration conf = new Configuration(baseConf);
-    FSUtils.setRootDir(conf, rootDir);
-    FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
-    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
-  }
-
-  /**
-   * Set path ownership.
-   */
-  private void setOwner(final FileSystem fs, final Path path, final String user,
-      final String group, final boolean recursive) throws IOException {
-    if (user != null || group != null) {
-      if (recursive && fs.isDirectory(path)) {
-        for (FileStatus child : fs.listStatus(path)) {
-          setOwner(fs, child.getPath(), user, group, recursive);
-        }
-      }
-      fs.setOwner(path, user, group);
-    }
-  }
-
-  /**
-   * Set path permission.
-   */
-  private void setPermission(final FileSystem fs, final Path path, final short filesMode,
-      final boolean recursive) throws IOException {
-    if (filesMode > 0) {
-      FsPermission perm = new FsPermission(filesMode);
-      if (recursive && fs.isDirectory(path)) {
-        for (FileStatus child : fs.listStatus(path)) {
-          setPermission(fs, child.getPath(), filesMode, recursive);
-        }
-      }
-      fs.setPermission(path, perm);
-    }
-  }
-
-  /**
-   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
-   * @return 0 on success, and != 0 upon failure.
-   */
-  @Override
-  public int run(String[] args) throws IOException {
-    boolean verifyTarget = true;
-    boolean verifyChecksum = true;
-    String snapshotName = null;
-    String targetName = null;
-    boolean overwrite = false;
-    String filesGroup = null;
-    String filesUser = null;
-    Path outputRoot = null;
-    int bandwidthMB = Integer.MAX_VALUE;
-    int filesMode = 0;
-    int mappers = 0;
-
-    Configuration conf = getConf();
-    Path inputRoot = FSUtils.getRootDir(conf);
-
-    // Process command line args
-    for (int i = 0; i < args.length; i++) {
-      String cmd = args[i];
-      if (cmd.equals("-snapshot")) {
-        snapshotName = args[++i];
-      } else if (cmd.equals("-target")) {
-        targetName = args[++i];
-      } else if (cmd.equals("-copy-to")) {
-        outputRoot = new Path(args[++i]);
-      } else if (cmd.equals("-copy-from")) {
-        inputRoot = new Path(args[++i]);
-        FSUtils.setRootDir(conf, inputRoot);
-      } else if (cmd.equals("-no-checksum-verify")) {
-        verifyChecksum = false;
-      } else if (cmd.equals("-no-target-verify")) {
-        verifyTarget = false;
-      } else if (cmd.equals("-mappers")) {
-        mappers = Integer.parseInt(args[++i]);
-      } else if (cmd.equals("-chuser")) {
-        filesUser = args[++i];
-      } else if (cmd.equals("-chgroup")) {
-        filesGroup = args[++i];
-      } else if (cmd.equals("-bandwidth")) {
-        bandwidthMB = Integer.parseInt(args[++i]);
-      } else if (cmd.equals("-chmod")) {
-        filesMode = Integer.parseInt(args[++i], 8);
-      } else if (cmd.equals("-overwrite")) {
-        overwrite = true;
-      } else if (cmd.equals("-h") || cmd.equals("--help")) {
-        printUsageAndExit();
-      } else {
-        System.err.println("UNEXPECTED: " + cmd);
-        printUsageAndExit();
-      }
-    }
-
-    // Check user options
-    if (snapshotName == null) {
-      System.err.println("Snapshot name not provided.");
-      printUsageAndExit();
-    }
-
-    if (outputRoot == null) {
-      System.err.println("Destination file-system not provided.");
-      printUsageAndExit();
-    }
-
-    if (targetName == null) {
-      targetName = snapshotName;
-    }
-
-    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
-    srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
-    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
-    LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
-    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
-    destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
-    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
-    LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
-
-    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
-
-    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
-    Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
-    Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
-    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
-
-    // Check if the snapshot already exists
-    if (outputFs.exists(outputSnapshotDir)) {
-      if (overwrite) {
-        if (!outputFs.delete(outputSnapshotDir, true)) {
-          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
-          return 1;
-        }
-      } else {
-        System.err.println("The snapshot '" + targetName +
-          "' already exists in the destination: " + outputSnapshotDir);
-        return 1;
-      }
-    }
-
-    if (!skipTmp) {
-      // Check if the snapshot already in-progress
-      if (outputFs.exists(snapshotTmpDir)) {
-        if (overwrite) {
-          if (!outputFs.delete(snapshotTmpDir, true)) {
-            System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
-            return 1;
-          }
-        } else {
-          System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
-          System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
-          System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
-          return 1;
-        }
-      }
-    }
-
-    // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
-    // The snapshot references must be copied before the hfiles otherwise the cleaner
-    // will remove them because they are unreferenced.
-    try {
-      LOG.info("Copy Snapshot Manifest");
-      FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
-      if (filesUser != null || filesGroup != null) {
-        setOwner(outputFs, snapshotTmpDir, filesUser, filesGroup, true);
-      }
-      if (filesMode > 0) {
-        setPermission(outputFs, snapshotTmpDir, (short)filesMode, true);
-      }
-    } catch (IOException e) {
-      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
-        snapshotDir + " to=" + initialOutputSnapshotDir, e);
-    }
-
-    // Write a new .snapshotinfo if the target name is different from the source name
-    if (!targetName.equals(snapshotName)) {
-      SnapshotDescription snapshotDesc =
-        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
-          .toBuilder()
-          .setName(targetName)
-          .build();
-      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
-    }
-
-    // Step 2 - Start MR Job to copy files
-    // The snapshot references must be copied before the files otherwise the files gets removed
-    // by the HFileArchiver, since they have no references.
-    try {
-      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
-                 filesUser, filesGroup, filesMode, mappers, bandwidthMB);
-
-      LOG.info("Finalize the Snapshot Export");
-      if (!skipTmp) {
-        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
-        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
-          throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
-            snapshotTmpDir + " to=" + outputSnapshotDir);
-        }
-      }
-
-      // Step 4 - Verify snapshot integrity
-      if (verifyTarget) {
-        LOG.info("Verify snapshot integrity");
-        verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
-      }
-
-      LOG.info("Export Completed: " + targetName);
-      return 0;
-    } catch (Exception e) {
-      LOG.error("Snapshot export failed", e);
-      if (!skipTmp) {
-        outputFs.delete(snapshotTmpDir, true);
-      }
-      outputFs.delete(outputSnapshotDir, true);
-      return 1;
-    } finally {
-      IOUtils.closeStream(inputFs);
-      IOUtils.closeStream(outputFs);
-    }
-  }
-
-  // ExportSnapshot
-  private void printUsageAndExit() {
-    System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
-    System.err.println(" where [options] are:");
-    System.err.println("  -h|-help                Show this help and exit.");
-    System.err.println("  -snapshot NAME          Snapshot to restore.");
-    System.err.println("  -copy-to NAME           Remote destination hdfs://");
-    System.err.println("  -copy-from NAME         Input folder hdfs:// (default hbase.rootdir)");
-    System.err.println("  -no-checksum-verify     Do not verify checksum, use name+length only.");
-    System.err.println("  -no-target-verify       Do not verify the integrity of the \\" +
-        "exported snapshot.");
-    System.err.println("  -overwrite              Rewrite the snapshot manifest if already exists");
-    System.err.println("  -chuser USERNAME        Change the owner of the files " +
-        "to the specified one.");
-    System.err.println("  -chgroup GROUP          Change the group of the files to " +
-        "the specified one.");
-    System.err.println("  -chmod MODE             Change the permission of the files " +
-        "to the specified one.");
-    System.err.println("  -mappers                Number of mappers to use during the " +
-        "copy (mapreduce.job.maps).");
-    System.err.println("  -bandwidth              Limit bandwidth to this value in MB/second.");
-    System.err.println();
-    System.err.println("Examples:");
-    System.err.println("  hbase snapshot export \\");
-    System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
-    System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
-    System.err.println();
-    System.err.println("  hbase snapshot export \\");
-    System.err.println("    -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
-    System.err.println("    -copy-to hdfs://srv1:50070/hbase \\");
-    System.exit(1);
-  }
-
-  /**
-   * The guts of the {@link #main} method.
-   * Call this method to avoid the {@link #main(String[])} System.exit.
-   * @param args
-   * @return errCode
-   * @throws Exception
-   */
-  static int innerMain(final Configuration conf, final String [] args) throws Exception {
-    return ToolRunner.run(conf, new ExportSnapshot(), args);
-  }
-
-  public static void main(String[] args) throws Exception {
-    System.exit(innerMain(HBaseConfiguration.create(), args));
-  }
-}