You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2016/11/30 06:56:24 UTC

[6/8] hbase git commit: HBASE-16904 Snapshot related changes for FS redo work

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/ExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/ExportSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/ExportSnapshot.java
new file mode 100644
index 0000000..ab90aa7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/ExportSnapshot.java
@@ -0,0 +1,1102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.fs.legacy.snapshot;
+
+import java.io.BufferedInputStream;
+import java.io.FileNotFoundException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.fs.MasterStorage;
+import org.apache.hadoop.hbase.fs.StorageContext;
+import org.apache.hadoop.hbase.fs.legacy.LegacyLayout;
+import org.apache.hadoop.hbase.fs.legacy.LegacyMasterStorage;
+import org.apache.hadoop.hbase.fs.legacy.LegacyPathIdentifier;
+import org.apache.hadoop.hbase.fs.legacy.io.FileLink;
+import org.apache.hadoop.hbase.fs.legacy.io.HFileLink;
+import org.apache.hadoop.hbase.io.WALLink;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.snapshot.ExportSnapshotException;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Export the specified snapshot to a given FileSystem.
+ *
+ * The .snapshot/name folder is copied to the destination cluster
+ * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location.
+ * When everything is done, the second cluster can restore the snapshot.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ExportSnapshot extends Configured implements Tool {
+  public static final String NAME = "exportsnapshot";
+  /** Configuration prefix for overrides for the source filesystem */
+  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
+  /** Configuration prefix for overrides for the destination filesystem */
+  public static final String CONF_DEST_PREFIX = NAME + ".to.";
+
+  private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
+
+  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
+  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
+  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
+  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
+  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
+  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
+  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
+  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
+  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
+  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
+  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
+  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
+  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
+  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
+
+  static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
+  static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
+
+  // Export Map-Reduce Counters, to keep track of the progress
+  public enum Counter {
+    MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
+    BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
+  }
+
+  private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
+                                                   NullWritable, NullWritable> {
+    final static int REPORT_SIZE = 1 * 1024 * 1024;
+    final static int BUFFER_SIZE = 64 * 1024;
+
+    private boolean testFailures;
+    private Random random;
+
+    private boolean verifyChecksum;
+    private String filesGroup;
+    private String filesUser;
+    private short filesMode;
+    private int bufferSize;
+
+    private FileSystem outputFs;
+    private Path outputArchive;
+    private Path outputRoot;
+
+    private FileSystem inputFs;
+    private Path inputArchive;
+    private Path inputRoot;
+
+    @Override
+    public void setup(Context context) throws IOException {
+      Configuration conf = context.getConfiguration();
+      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
+      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
+
+      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
+
+      filesGroup = conf.get(CONF_FILES_GROUP);
+      filesUser = conf.get(CONF_FILES_USER);
+      filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
+      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
+      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
+
+      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
+      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+      testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
+
+      try {
+        srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
+        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
+      } catch (IOException e) {
+        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
+      }
+
+      try {
+        destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
+        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
+      } catch (IOException e) {
+        throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
+      }
+
+      // Use the default block size of the outputFs if bigger
+      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
+      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
+      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
+
+      for (Counter c : Counter.values()) {
+        context.getCounter(c).increment(0);
+      }
+    }
+
+    @Override
+    protected void cleanup(Context context) {
+      IOUtils.closeStream(inputFs);
+      IOUtils.closeStream(outputFs);
+    }
+
+    @Override
+    public void map(BytesWritable key, NullWritable value, Context context)
+        throws InterruptedException, IOException {
+      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
+      Path outputPath = getOutputPath(inputInfo);
+
+      copyFile(context, inputInfo, outputPath);
+    }
+
+    /**
+     * Returns the location where the inputPath will be copied.
+     */
+    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
+      Path path = null;
+      switch (inputInfo.getType()) {
+        case HFILE:
+          Path inputPath = new Path(inputInfo.getHfile());
+          String family = inputPath.getParent().getName();
+          TableName table =HFileLink.getReferencedTableName(inputPath.getName());
+          String region = HFileLink.getReferencedRegionName(inputPath.getName());
+          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
+          path = new Path(FSUtils.getTableDir(new Path("./"), table),
+              new Path(region, new Path(family, hfile)));
+          break;
+        case WAL:
+          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
+          break;
+        default:
+          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
+      }
+      return new Path(outputArchive, path);
+    }
+
+    /*
+     * Used by TestExportSnapshot to simulate a failure
+     */
+    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
+        throws IOException {
+      if (testFailures) {
+        if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
+          if (random == null) {
+            random = new Random();
+          }
+
+          // FLAKY-TEST-WARN: lower is better, we can get some runs without the
+          // retry, but at least we reduce the number of test failures due to
+          // this test exception from the same map task.
+          if (random.nextFloat() < 0.03) {
+            throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
+                                  + " time=" + System.currentTimeMillis());
+          }
+        } else {
+          context.getCounter(Counter.COPY_FAILED).increment(1);
+          throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
+        }
+      }
+    }
+
+    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
+        final Path outputPath) throws IOException {
+      injectTestFailure(context, inputInfo);
+
+      // Get the file information
+      FileStatus inputStat = getSourceFileStatus(context, inputInfo);
+
+      // Verify if the output file exists and is the same that we want to copy
+      if (outputFs.exists(outputPath)) {
+        FileStatus outputStat = outputFs.getFileStatus(outputPath);
+        if (outputStat != null && sameFile(inputStat, outputStat)) {
+          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
+          context.getCounter(Counter.FILES_SKIPPED).increment(1);
+          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
+          return;
+        }
+      }
+
+      InputStream in = openSourceFile(context, inputInfo);
+      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
+      if (Integer.MAX_VALUE != bandwidthMB) {
+        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
+      }
+
+      try {
+        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
+
+        // Ensure that the output folder is there and copy the file
+        createOutputPath(outputPath.getParent());
+        FSDataOutputStream out = outputFs.create(outputPath, true);
+        try {
+          copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
+        } finally {
+          out.close();
+        }
+
+        // Try to Preserve attributes
+        if (!preserveAttributes(outputPath, inputStat)) {
+          LOG.warn("You may have to run manually chown on: " + outputPath);
+        }
+      } finally {
+        in.close();
+      }
+    }
+
+    /**
+     * Create the output folder and optionally set ownership.
+     */
+    private void createOutputPath(final Path path) throws IOException {
+      if (filesUser == null && filesGroup == null) {
+        outputFs.mkdirs(path);
+      } else {
+        Path parent = path.getParent();
+        if (!outputFs.exists(parent) && !parent.isRoot()) {
+          createOutputPath(parent);
+        }
+        outputFs.mkdirs(path);
+        if (filesUser != null || filesGroup != null) {
+          // override the owner when non-null user/group is specified
+          outputFs.setOwner(path, filesUser, filesGroup);
+        }
+        if (filesMode > 0) {
+          outputFs.setPermission(path, new FsPermission(filesMode));
+        }
+      }
+    }
+
+    /**
+     * Try to Preserve the files attribute selected by the user copying them from the source file
+     * This is only required when you are exporting as a different user than "hbase" or on a system
+     * that doesn't have the "hbase" user.
+     *
+     * This is not considered a blocking failure since the user can force a chmod with the user
+     * that knows is available on the system.
+     */
+    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
+      FileStatus stat;
+      try {
+        stat = outputFs.getFileStatus(path);
+      } catch (IOException e) {
+        LOG.warn("Unable to get the status for file=" + path);
+        return false;
+      }
+
+      try {
+        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
+          outputFs.setPermission(path, new FsPermission(filesMode));
+        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
+          outputFs.setPermission(path, refStat.getPermission());
+        }
+      } catch (IOException e) {
+        LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
+        return false;
+      }
+
+      boolean hasRefStat = (refStat != null);
+      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
+      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
+      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
+        try {
+          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
+            outputFs.setOwner(path, user, group);
+          }
+        } catch (IOException e) {
+          LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
+          LOG.warn("The user/group may not exist on the destination cluster: user=" +
+                   user + " group=" + group);
+          return false;
+        }
+      }
+
+      return true;
+    }
+
+    private boolean stringIsNotEmpty(final String str) {
+      return str != null && str.length() > 0;
+    }
+
+    private void copyData(final Context context,
+        final Path inputPath, final InputStream in,
+        final Path outputPath, final FSDataOutputStream out,
+        final long inputFileSize)
+        throws IOException {
+      final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
+                                   " (%.1f%%)";
+
+      try {
+        byte[] buffer = new byte[bufferSize];
+        long totalBytesWritten = 0;
+        int reportBytes = 0;
+        int bytesRead;
+
+        long stime = System.currentTimeMillis();
+        while ((bytesRead = in.read(buffer)) > 0) {
+          out.write(buffer, 0, bytesRead);
+          totalBytesWritten += bytesRead;
+          reportBytes += bytesRead;
+
+          if (reportBytes >= REPORT_SIZE) {
+            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
+            context.setStatus(String.format(statusMessage,
+                              StringUtils.humanReadableInt(totalBytesWritten),
+                              (totalBytesWritten/(float)inputFileSize) * 100.0f) +
+                              " from " + inputPath + " to " + outputPath);
+            reportBytes = 0;
+          }
+        }
+        long etime = System.currentTimeMillis();
+
+        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
+        context.setStatus(String.format(statusMessage,
+                          StringUtils.humanReadableInt(totalBytesWritten),
+                          (totalBytesWritten/(float)inputFileSize) * 100.0f) +
+                          " from " + inputPath + " to " + outputPath);
+
+        // Verify that the written size match
+        if (totalBytesWritten != inputFileSize) {
+          String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
+                       " expected=" + inputFileSize + " for file=" + inputPath;
+          throw new IOException(msg);
+        }
+
+        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
+        LOG.info("size=" + totalBytesWritten +
+            " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
+            " time=" + StringUtils.formatTimeDiff(etime, stime) +
+            String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
+        context.getCounter(Counter.FILES_COPIED).increment(1);
+      } catch (IOException e) {
+        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
+        context.getCounter(Counter.COPY_FAILED).increment(1);
+        throw e;
+      }
+    }
+
+    /**
+     * Try to open the "source" file.
+     * Throws an IOException if the communication with the inputFs fail or
+     * if the file is not found.
+     */
+    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
+            throws IOException {
+      try {
+        Configuration conf = context.getConfiguration();
+        FileLink link = null;
+        switch (fileInfo.getType()) {
+          case HFILE:
+            Path inputPath = new Path(fileInfo.getHfile());
+            link = getFileLink(inputPath, conf);
+            break;
+          case WAL:
+            String serverName = fileInfo.getWalServer();
+            String logName = fileInfo.getWalName();
+            link = new WALLink(inputRoot, serverName, logName);
+            break;
+          default:
+            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
+        }
+        return link.open(inputFs);
+      } catch (IOException e) {
+        context.getCounter(Counter.MISSING_FILES).increment(1);
+        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
+        throw e;
+      }
+    }
+
+    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
+        throws IOException {
+      try {
+        Configuration conf = context.getConfiguration();
+        FileLink link = null;
+        switch (fileInfo.getType()) {
+          case HFILE:
+            Path inputPath = new Path(fileInfo.getHfile());
+            link = getFileLink(inputPath, conf);
+            break;
+          case WAL:
+            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
+            break;
+          default:
+            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
+        }
+        return link.getFileStatus(inputFs);
+      } catch (FileNotFoundException e) {
+        context.getCounter(Counter.MISSING_FILES).increment(1);
+        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
+        throw e;
+      } catch (IOException e) {
+        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
+        throw e;
+      }
+    }
+
+    private FileLink getFileLink(Path path, Configuration conf) throws IOException{
+      String regionName = HFileLink.getReferencedRegionName(path.getName());
+      TableName tableName = HFileLink.getReferencedTableName(path.getName());
+      if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
+        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
+                HFileArchiveUtil.getArchivePath(conf), path);
+      }
+      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
+    }
+
+    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
+      try {
+        return fs.getFileChecksum(path);
+      } catch (IOException e) {
+        LOG.warn("Unable to get checksum for file=" + path, e);
+        return null;
+      }
+    }
+
+    /**
+     * Check if the two files are equal by looking at the file length,
+     * and at the checksum (if user has specified the verifyChecksum flag).
+     */
+    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
+      // Not matching length
+      if (inputStat.getLen() != outputStat.getLen()) return false;
+
+      // Mark files as equals, since user asked for no checksum verification
+      if (!verifyChecksum) return true;
+
+      // If checksums are not available, files are not the same.
+      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
+      if (inChecksum == null) return false;
+
+      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
+      if (outChecksum == null) return false;
+
+      return inChecksum.equals(outChecksum);
+    }
+  }
+
+  // ==========================================================================
+  //  Input Format
+  // ==========================================================================
+
+  /**
+   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
+   * @return list of files referenced by the snapshot (pair of path and size)
+   */
+  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
+      final FileSystem fs, final String snapshotName, StorageContext ctx) throws IOException {
+    LegacyMasterStorage lms = new LegacyMasterStorage(conf, fs,
+        new LegacyPathIdentifier(FSUtils.getRootDir(conf)));
+    SnapshotDescription snapshotDesc = lms.getSnapshot(snapshotName, ctx);
+
+    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
+    final TableName table = TableName.valueOf(snapshotDesc.getTable());
+
+    // Get snapshot files
+    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
+    lms.visitSnapshotStoreFiles(snapshotDesc, ctx, new MasterStorage.SnapshotStoreFileVisitor() {
+      @Override
+      public void visitSnapshotStoreFile(SnapshotDescription snapshot, StorageContext ctx,
+          HRegionInfo hri, String familyName, SnapshotRegionManifest.StoreFile storeFile)
+          throws IOException {
+        // for storeFile.hasReference() case, copied as part of the manifest
+        if (!storeFile.hasReference()) {
+          String region = hri.getEncodedName();
+          String hfile = storeFile.getName();
+          Path path = HFileLink.createPath(table, region, familyName, hfile);
+
+          SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
+              .setType(SnapshotFileInfo.Type.HFILE)
+              .setHfile(path.toString())
+              .build();
+
+          long size;
+          if (storeFile.hasFileSize()) {
+            size = storeFile.getFileSize();
+          } else {
+            size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
+          }
+          files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
+        }
+      }
+    });
+    return files;
+  }
+
+  /**
+   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
+   * The groups created will have similar amounts of bytes.
+   * <p>
+   * The algorithm used is pretty straightforward; the file list is sorted by size,
+   * and then each group fetch the bigger file available, iterating through groups
+   * alternating the direction.
+   */
+  static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
+      final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
+    // Sort files by size, from small to big
+    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
+      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
+        long r = a.getSecond() - b.getSecond();
+        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
+      }
+    });
+
+    // create balanced groups
+    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
+      new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
+    long[] sizeGroups = new long[ngroups];
+    int hi = files.size() - 1;
+    int lo = 0;
+
+    List<Pair<SnapshotFileInfo, Long>> group;
+    int dir = 1;
+    int g = 0;
+
+    while (hi >= lo) {
+      if (g == fileGroups.size()) {
+        group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
+        fileGroups.add(group);
+      } else {
+        group = fileGroups.get(g);
+      }
+
+      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
+
+      // add the hi one
+      sizeGroups[g] += fileInfo.getSecond();
+      group.add(fileInfo);
+
+      // change direction when at the end or the beginning
+      g += dir;
+      if (g == ngroups) {
+        dir = -1;
+        g = ngroups - 1;
+      } else if (g < 0) {
+        dir = 1;
+        g = 0;
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      for (int i = 0; i < sizeGroups.length; ++i) {
+        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
+      }
+    }
+
+    return fileGroups;
+  }
+
+  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
+    @Override
+    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
+        TaskAttemptContext tac) throws IOException, InterruptedException {
+      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
+    }
+
+    @Override
+    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
+      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
+      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
+
+      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotName,
+          StorageContext.DATA);
+      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
+      if (mappers == 0 && snapshotFiles.size() > 0) {
+        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
+        mappers = Math.min(mappers, snapshotFiles.size());
+        conf.setInt(CONF_NUM_SPLITS, mappers);
+        conf.setInt(MR_NUM_MAPS, mappers);
+      }
+
+      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
+      List<InputSplit> splits = new ArrayList(groups.size());
+      for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
+        splits.add(new ExportSnapshotInputSplit(files));
+      }
+      return splits;
+    }
+
+    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
+      private List<Pair<BytesWritable, Long>> files;
+      private long length;
+
+      public ExportSnapshotInputSplit() {
+        this.files = null;
+      }
+
+      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
+        this.files = new ArrayList(snapshotFiles.size());
+        for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
+          this.files.add(new Pair<BytesWritable, Long>(
+            new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
+          this.length += fileInfo.getSecond();
+        }
+      }
+
+      private List<Pair<BytesWritable, Long>> getSplitKeys() {
+        return files;
+      }
+
+      @Override
+      public long getLength() throws IOException, InterruptedException {
+        return length;
+      }
+
+      @Override
+      public String[] getLocations() throws IOException, InterruptedException {
+        return new String[] {};
+      }
+
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        int count = in.readInt();
+        files = new ArrayList<Pair<BytesWritable, Long>>(count);
+        length = 0;
+        for (int i = 0; i < count; ++i) {
+          BytesWritable fileInfo = new BytesWritable();
+          fileInfo.readFields(in);
+          long size = in.readLong();
+          files.add(new Pair<BytesWritable, Long>(fileInfo, size));
+          length += size;
+        }
+      }
+
+      @Override
+      public void write(DataOutput out) throws IOException {
+        out.writeInt(files.size());
+        for (final Pair<BytesWritable, Long> fileInfo: files) {
+          fileInfo.getFirst().write(out);
+          out.writeLong(fileInfo.getSecond());
+        }
+      }
+    }
+
+    private static class ExportSnapshotRecordReader
+        extends RecordReader<BytesWritable, NullWritable> {
+      private final List<Pair<BytesWritable, Long>> files;
+      private long totalSize = 0;
+      private long procSize = 0;
+      private int index = -1;
+
+      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
+        this.files = files;
+        for (Pair<BytesWritable, Long> fileInfo: files) {
+          totalSize += fileInfo.getSecond();
+        }
+      }
+
+      @Override
+      public void close() { }
+
+      @Override
+      public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
+
+      @Override
+      public NullWritable getCurrentValue() { return NullWritable.get(); }
+
+      @Override
+      public float getProgress() { return (float)procSize / totalSize; }
+
+      @Override
+      public void initialize(InputSplit split, TaskAttemptContext tac) { }
+
+      @Override
+      public boolean nextKeyValue() {
+        if (index >= 0) {
+          procSize += files.get(index).getSecond();
+        }
+        return(++index < files.size());
+      }
+    }
+  }
+
+  // ==========================================================================
+  //  Tool
+  // ==========================================================================
+
+  /**
+   * Run Map-Reduce Job to perform the files copy.
+   */
+  private void runCopyJob(final Path inputRoot, final Path outputRoot,
+      final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
+      final String filesUser, final String filesGroup, final int filesMode,
+      final int mappers, final int bandwidthMB)
+          throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = getConf();
+    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
+    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
+    if (mappers > 0) {
+      conf.setInt(CONF_NUM_SPLITS, mappers);
+      conf.setInt(MR_NUM_MAPS, mappers);
+    }
+    conf.setInt(CONF_FILES_MODE, filesMode);
+    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
+    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
+    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
+    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
+    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
+    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
+
+    Job job = new Job(conf);
+    job.setJobName("ExportSnapshot-" + snapshotName);
+    job.setJarByClass(ExportSnapshot.class);
+    TableMapReduceUtil.addDependencyJars(job);
+    job.setMapperClass(ExportMapper.class);
+    job.setInputFormatClass(ExportSnapshotInputFormat.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setMapSpeculativeExecution(false);
+    job.setNumReduceTasks(0);
+
+    // Acquire the delegation Tokens
+    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+      new Path[] { inputRoot }, srcConf);
+    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+        new Path[] { outputRoot }, destConf);
+
+    // Run the MR Job
+    if (!job.waitForCompletion(true)) {
+      // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
+      // when it will be available on all the supported versions.
+      throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
+    }
+  }
+
+  private void verifySnapshot(final Configuration baseConf,
+      final FileSystem fs, final Path rootDir, final String snapshotName, StorageContext ctx)
+      throws IOException {
+    // Update the conf with the current root dir, since may be a different cluster
+    Configuration conf = new Configuration(baseConf);
+    FSUtils.setRootDir(conf, rootDir);
+    FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
+    LegacyMasterStorage lms =
+        new LegacyMasterStorage(baseConf, fs, new LegacyPathIdentifier(rootDir));
+    SnapshotDescription snapshotDesc = lms.getSnapshot(snapshotName, ctx);
+    SnapshotReferenceUtil.verifySnapshot(lms, snapshotDesc, ctx);
+  }
+
+  /**
+   * Set path ownership.
+   */
+  private void setOwner(final FileSystem fs, final Path path, final String user,
+      final String group, final boolean recursive) throws IOException {
+    if (user != null || group != null) {
+      if (recursive && fs.isDirectory(path)) {
+        for (FileStatus child : fs.listStatus(path)) {
+          setOwner(fs, child.getPath(), user, group, recursive);
+        }
+      }
+      fs.setOwner(path, user, group);
+    }
+  }
+
+  /**
+   * Set path permission.
+   */
+  private void setPermission(final FileSystem fs, final Path path, final short filesMode,
+      final boolean recursive) throws IOException {
+    if (filesMode > 0) {
+      FsPermission perm = new FsPermission(filesMode);
+      if (recursive && fs.isDirectory(path)) {
+        for (FileStatus child : fs.listStatus(path)) {
+          setPermission(fs, child.getPath(), filesMode, recursive);
+        }
+      }
+      fs.setPermission(path, perm);
+    }
+  }
+
+  /**
+   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
+   * @return 0 on success, and != 0 upon failure.
+   */
+  @Override
+  public int run(String[] args) throws IOException {
+    boolean verifyTarget = true;
+    boolean verifyChecksum = true;
+    String snapshotName = null;
+    String targetName = null;
+    boolean overwrite = false;
+    String filesGroup = null;
+    String filesUser = null;
+    Path outputRoot = null;
+    int bandwidthMB = Integer.MAX_VALUE;
+    int filesMode = 0;
+    int mappers = 0;
+
+    Configuration conf = getConf();
+    Path inputRoot = FSUtils.getRootDir(conf);
+
+    // Process command line args
+    for (int i = 0; i < args.length; i++) {
+      String cmd = args[i];
+      if (cmd.equals("-snapshot")) {
+        snapshotName = args[++i];
+      } else if (cmd.equals("-target")) {
+        targetName = args[++i];
+      } else if (cmd.equals("-copy-to")) {
+        outputRoot = new Path(args[++i]);
+      } else if (cmd.equals("-copy-from")) {
+        inputRoot = new Path(args[++i]);
+        FSUtils.setRootDir(conf, inputRoot);
+      } else if (cmd.equals("-no-checksum-verify")) {
+        verifyChecksum = false;
+      } else if (cmd.equals("-no-target-verify")) {
+        verifyTarget = false;
+      } else if (cmd.equals("-mappers")) {
+        mappers = Integer.parseInt(args[++i]);
+      } else if (cmd.equals("-chuser")) {
+        filesUser = args[++i];
+      } else if (cmd.equals("-chgroup")) {
+        filesGroup = args[++i];
+      } else if (cmd.equals("-bandwidth")) {
+        bandwidthMB = Integer.parseInt(args[++i]);
+      } else if (cmd.equals("-chmod")) {
+        filesMode = Integer.parseInt(args[++i], 8);
+      } else if (cmd.equals("-overwrite")) {
+        overwrite = true;
+      } else if (cmd.equals("-h") || cmd.equals("--help")) {
+        printUsageAndExit();
+      } else {
+        System.err.println("UNEXPECTED: " + cmd);
+        printUsageAndExit();
+      }
+    }
+
+    // Check user options
+    if (snapshotName == null) {
+      System.err.println("Snapshot name not provided.");
+      printUsageAndExit();
+    }
+
+    if (outputRoot == null) {
+      System.err.println("Destination file-system not provided.");
+      printUsageAndExit();
+    }
+
+    if (targetName == null) {
+      targetName = snapshotName;
+    }
+
+    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
+    srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
+    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
+    LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
+    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
+    destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
+    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
+    LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
+
+    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
+
+    LegacyMasterStorage srcMasterStorage = new LegacyMasterStorage(srcConf, inputFs,
+        new LegacyPathIdentifier(inputRoot));
+    LegacyMasterStorage destMasterStorage = new LegacyMasterStorage(destConf, outputFs,
+        new LegacyPathIdentifier(outputRoot));
+    StorageContext destCtx = skipTmp ? StorageContext.TEMP : StorageContext.DATA;
+
+    Path snapshotDir = LegacyLayout.getCompletedSnapshotDir(inputRoot, snapshotName);
+    Path snapshotTmpDir = LegacyLayout.getWorkingSnapshotDir(outputRoot, targetName);
+    Path outputSnapshotDir = LegacyLayout.getCompletedSnapshotDir(outputRoot, targetName);
+    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
+
+    // Check if the snapshot already exists
+    if (destMasterStorage.snapshotExists(targetName)) {
+      if (overwrite) {
+        if (!destMasterStorage.deleteSnapshot(targetName)) {
+          System.err.println("Unable to remove existing snapshot '" + targetName + "'.");
+          return 1;
+        }
+      } else {
+        System.err.println("The snapshot '" + targetName + "' already exists in the destination: " +
+            LegacyLayout.getCompletedSnapshotDir(outputRoot, targetName));
+        return 1;
+      }
+    }
+
+    if (!skipTmp) {
+      // Check if the snapshot already in-progress
+      if (destMasterStorage.snapshotExists(targetName, destCtx)) {
+        if (overwrite) {
+          if (!outputFs.delete(snapshotTmpDir, true)) {
+            System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
+            return 1;
+          }
+        } else {
+          System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
+          System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
+          System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
+          return 1;
+        }
+      }
+    }
+
+    // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
+    // The snapshot references must be copied before the hfiles otherwise the cleaner
+    // will remove them because they are unreferenced.
+    try {
+      LOG.info("Copy Snapshot Manifest");
+      FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
+      if (filesUser != null || filesGroup != null) {
+        setOwner(outputFs, snapshotTmpDir, filesUser, filesGroup, true);
+      }
+      if (filesMode > 0) {
+        setPermission(outputFs, snapshotTmpDir, (short)filesMode, true);
+      }
+    } catch (IOException e) {
+      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
+        snapshotDir + " to=" + initialOutputSnapshotDir, e);
+    }
+
+    // Write a new .snapshotinfo if the target name is different from the source name
+    if (!targetName.equals(snapshotName)) {
+      SnapshotDescription snapshotDesc = srcMasterStorage.getSnapshot(snapshotName)
+          .toBuilder()
+          .setName(targetName)
+          .build();
+      destMasterStorage.writeSnapshotInfo(snapshotDesc, snapshotTmpDir);
+    }
+
+    // Step 2 - Start MR Job to copy files
+    // The snapshot references must be copied before the files otherwise the files gets removed
+    // by the HFileArchiver, since they have no references.
+    try {
+      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
+                 filesUser, filesGroup, filesMode, mappers, bandwidthMB);
+
+      LOG.info("Finalize the Snapshot Export");
+      if (!skipTmp) {
+        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
+        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
+          throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
+            snapshotTmpDir + " to=" + outputSnapshotDir);
+        }
+      }
+
+      // Step 4 - Verify snapshot integrity
+      if (verifyTarget) {
+        LOG.info("Verify snapshot integrity");
+        verifySnapshot(destConf, outputFs, outputRoot, targetName, StorageContext.DATA);
+      }
+
+      LOG.info("Export Completed: " + targetName);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Snapshot export failed", e);
+      if (!skipTmp) {
+        outputFs.delete(snapshotTmpDir, true);
+      }
+      outputFs.delete(outputSnapshotDir, true);
+      return 1;
+    } finally {
+      IOUtils.closeStream(inputFs);
+      IOUtils.closeStream(outputFs);
+    }
+  }
+
+  // ExportSnapshot
+  private void printUsageAndExit() {
+    System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
+    System.err.println(" where [options] are:");
+    System.err.println("  -h|-help                Show this help and exit.");
+    System.err.println("  -snapshot NAME          Snapshot to restore.");
+    System.err.println("  -copy-to NAME           Remote destination hdfs://");
+    System.err.println("  -copy-from NAME         Input folder hdfs:// (default hbase.rootdir)");
+    System.err.println("  -no-checksum-verify     Do not verify checksum, use name+length only.");
+    System.err.println("  -no-target-verify       Do not verify the integrity of the \\" +
+        "exported snapshot.");
+    System.err.println("  -overwrite              Rewrite the snapshot manifest if already exists");
+    System.err.println("  -chuser USERNAME        Change the owner of the files " +
+        "to the specified one.");
+    System.err.println("  -chgroup GROUP          Change the group of the files to " +
+        "the specified one.");
+    System.err.println("  -chmod MODE             Change the permission of the files " +
+        "to the specified one.");
+    System.err.println("  -mappers                Number of mappers to use during the " +
+        "copy (mapreduce.job.maps).");
+    System.err.println("  -bandwidth              Limit bandwidth to this value in MB/second.");
+    System.err.println();
+    System.err.println("Examples:");
+    System.err.println("  hbase snapshot export \\");
+    System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
+    System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
+    System.err.println();
+    System.err.println("  hbase snapshot export \\");
+    System.err.println("    -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
+    System.err.println("    -copy-to hdfs://srv1:50070/hbase \\");
+    System.exit(1);
+  }
+
+  /**
+   * The guts of the {@link #main} method.
+   * Call this method to avoid the {@link #main(String[])} System.exit.
+   * @param args
+   * @return errCode
+   * @throws Exception
+   */
+  static int innerMain(final Configuration conf, final String [] args) throws Exception {
+    return ToolRunner.run(conf, new ExportSnapshot(), args);
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.exit(innerMain(HBaseConfiguration.create(), args));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/RestoreSnapshotHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/RestoreSnapshotHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/RestoreSnapshotHelper.java
new file mode 100644
index 0000000..8029464
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/RestoreSnapshotHelper.java
@@ -0,0 +1,689 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.fs.legacy.snapshot;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.fs.MasterStorage;
+import org.apache.hadoop.hbase.fs.RegionStorage;
+import org.apache.hadoop.hbase.fs.StorageIdentifier;
+import org.apache.hadoop.hbase.fs.legacy.LegacyPathIdentifier;
+import org.apache.hadoop.hbase.fs.legacy.io.HFileLink;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.snapshot.SnapshotRestoreMetaChanges;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * Helper to Restore/Clone a Snapshot
+ *
+ * <p>The helper assumes that a table is already created, and by calling restore()
+ * the content present in the snapshot will be restored as the new content of the table.
+ *
+ * <p>Clone from Snapshot: If the target table is empty, the restore operation
+ * is just a "clone operation", where the only operations are:
+ * <ul>
+ *  <li>for each region in the snapshot create a new region
+ *    (note that the region will have a different name, since the encoding contains the table name)
+ *  <li>for each file in the region create a new HFileLink to point to the original file.
+ *  <li>restore the logs, if any
+ * </ul>
+ *
+ * <p>Restore from Snapshot:
+ * <ul>
+ *  <li>for each region in the table verify which are available in the snapshot and which are not
+ *    <ul>
+ *    <li>if the region is not present in the snapshot, remove it.
+ *    <li>if the region is present in the snapshot
+ *      <ul>
+ *      <li>for each file in the table region verify which are available in the snapshot
+ *        <ul>
+ *          <li>if the hfile is not present in the snapshot, remove it
+ *          <li>if the hfile is present, keep it (nothing to do)
+ *        </ul>
+ *      <li>for each file in the snapshot region but not in the table
+ *        <ul>
+ *          <li>create a new HFileLink that point to the original file
+ *        </ul>
+ *      </ul>
+ *    </ul>
+ *  <li>for each region in the snapshot not present in the current table state
+ *    <ul>
+ *    <li>create a new region and for each file in the region create a new HFileLink
+ *      (This is the same as the clone operation)
+ *    </ul>
+ *  <li>restore the logs, if any
+ * </ul>
+ *
+ * TODO update for MasterStorage / RegionStorage
+ */
+@InterfaceAudience.Private
+public class RestoreSnapshotHelper {
+  private static final Log LOG = LogFactory.getLog(RestoreSnapshotHelper.class);
+
+  private final Map<byte[], byte[]> regionsMap =
+        new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+
+  private final Map<String, Pair<String, String> > parentsMap =
+      new HashMap<String, Pair<String, String> >();
+
+  private final ForeignExceptionDispatcher monitor;
+  private final MonitoredTask status;
+
+  private final SnapshotManifest snapshotManifest;
+  private final SnapshotDescription snapshotDesc;
+  private final TableName snapshotTable;
+
+  private final HTableDescriptor tableDesc;
+  private final Path tableDir;
+
+
+  private final Configuration conf;
+  private final FileSystem fs;
+  private final MasterStorage<? extends StorageIdentifier> masterStorage;
+  private final boolean createBackRefs;
+
+  public RestoreSnapshotHelper(final MasterStorage<? extends StorageIdentifier> masterStorage,
+      final SnapshotDescription snapshotDesc, final HTableDescriptor tableDescriptor,
+      final ForeignExceptionDispatcher monitor, final MonitoredTask status) throws IOException {
+    this(masterStorage, snapshotDesc, tableDescriptor, monitor, status, true);
+  }
+
+  public RestoreSnapshotHelper(final MasterStorage<? extends StorageIdentifier> masterStorage,
+      final SnapshotDescription snapshotDesc, final HTableDescriptor tableDescriptor,
+      final ForeignExceptionDispatcher monitor, final MonitoredTask status,
+      final boolean createBackRefs) throws IOException {
+    this.masterStorage = masterStorage;
+    this.conf = masterStorage.getConfiguration();
+    this.fs = masterStorage.getFileSystem();
+    this.snapshotDesc = snapshotDesc;
+    this.snapshotManifest = SnapshotManifest.open(conf, snapshotDesc);
+    this.snapshotTable = TableName.valueOf(snapshotDesc.getTable());
+    this.tableDesc = tableDescriptor;
+    this.tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDesc.getTableName());
+    this.monitor = monitor;
+    this.status = status;
+    this.createBackRefs = createBackRefs;
+  }
+
+  /**
+   * Restore the on-disk table to a specified snapshot state.
+   * @return the set of regions touched by the restore operation
+   */
+  public SnapshotRestoreMetaChanges restoreStorageRegions() throws IOException {
+    ThreadPoolExecutor exec = SnapshotManifest.createExecutor(conf, "RestoreSnapshot");
+    try {
+      return restoreHdfsRegions(exec);
+    } finally {
+      exec.shutdown();
+    }
+  }
+
+  private SnapshotRestoreMetaChanges restoreHdfsRegions(final ThreadPoolExecutor exec) throws IOException {
+    LOG.info("starting restore table regions using snapshot=" + snapshotDesc);
+
+    Map<String, SnapshotRegionManifest> regionManifests = snapshotManifest.getRegionManifestsMap();
+    if (regionManifests == null) {
+      LOG.warn("Nothing to restore. Snapshot " + snapshotDesc + " looks empty");
+      return null;
+    }
+
+    SnapshotRestoreMetaChanges metaChanges = new SnapshotRestoreMetaChanges(tableDesc, parentsMap);
+
+    // Take a copy of the manifest.keySet() since we are going to modify
+    // this instance, by removing the regions already present in the restore dir.
+    Set<String> regionNames = new HashSet<String>(regionManifests.keySet());
+
+    HRegionInfo mobRegion = MobUtils.getMobRegionInfo(snapshotManifest.getTableDescriptor()
+        .getTableName());
+    // Identify which region are still available and which not.
+    // NOTE: we rely upon the region name as: "table name, start key, end key"
+    List<HRegionInfo> tableRegions = getTableRegions();
+    if (tableRegions != null) {
+      monitor.rethrowException();
+      for (HRegionInfo regionInfo: tableRegions) {
+        String regionName = regionInfo.getEncodedName();
+        if (regionNames.contains(regionName)) {
+          LOG.info("region to restore: " + regionName);
+          regionNames.remove(regionName);
+          metaChanges.addRegionToRestore(regionInfo);
+        } else {
+          LOG.info("region to remove: " + regionName);
+          metaChanges.addRegionToRemove(regionInfo);
+        }
+      }
+
+      // Restore regions using the snapshot data
+      monitor.rethrowException();
+      status.setStatus("Restoring table regions...");
+      if (regionNames.contains(mobRegion.getEncodedName())) {
+        // restore the mob region in case
+        List<HRegionInfo> mobRegions = new ArrayList<HRegionInfo>(1);
+        mobRegions.add(mobRegion);
+        restoreHdfsMobRegions(exec, regionManifests, mobRegions);
+        regionNames.remove(mobRegion.getEncodedName());
+      }
+      restoreHdfsRegions(exec, regionManifests, metaChanges.getRegionsToRestore());
+      status.setStatus("Finished restoring all table regions.");
+
+      // Remove regions from the current table
+      monitor.rethrowException();
+      status.setStatus("Starting to delete excess regions from table");
+      removeHdfsRegions(exec, metaChanges.getRegionsToRemove());
+      status.setStatus("Finished deleting excess regions from table.");
+    }
+
+    // Regions to Add: present in the snapshot but not in the current table
+    if (regionNames.size() > 0) {
+      List<HRegionInfo> regionsToAdd = new ArrayList<HRegionInfo>(regionNames.size());
+
+      monitor.rethrowException();
+      // add the mob region
+      if (regionNames.contains(mobRegion.getEncodedName())) {
+        cloneHdfsMobRegion(regionManifests, mobRegion);
+        regionNames.remove(mobRegion.getEncodedName());
+      }
+      for (String regionName: regionNames) {
+        LOG.info("region to add: " + regionName);
+        regionsToAdd.add(HRegionInfo.convert(regionManifests.get(regionName).getRegionInfo()));
+      }
+
+      // Create new regions cloning from the snapshot
+      monitor.rethrowException();
+      status.setStatus("Cloning regions...");
+      HRegionInfo[] clonedRegions = cloneHdfsRegions(exec, regionManifests, regionsToAdd);
+      metaChanges.setNewRegions(clonedRegions);
+      status.setStatus("Finished cloning regions.");
+    }
+
+    LOG.info("finishing restore table regions using snapshot=" + snapshotDesc);
+
+    return metaChanges;
+  }
+
+  /**
+   * Remove specified regions from the file-system, using the archiver.
+   */
+  private void removeHdfsRegions(final ThreadPoolExecutor exec, final List<HRegionInfo> regions)
+      throws IOException {
+    if (regions == null || regions.size() == 0) return;
+    ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
+      @Override
+      public void editRegion(final HRegionInfo hri) throws IOException {
+        HFileArchiver.archiveRegion(conf, fs, hri);
+      }
+    });
+  }
+
+  /**
+   * Restore specified regions by restoring content to the snapshot state.
+   */
+  private void restoreHdfsRegions(final ThreadPoolExecutor exec,
+      final Map<String, SnapshotRegionManifest> regionManifests,
+      final List<HRegionInfo> regions) throws IOException {
+    if (regions == null || regions.size() == 0) return;
+    ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
+      @Override
+      public void editRegion(final HRegionInfo hri) throws IOException {
+        restoreRegion(hri, regionManifests.get(hri.getEncodedName()));
+      }
+    });
+  }
+
+  /**
+   * Restore specified mob regions by restoring content to the snapshot state.
+   */
+  private void restoreHdfsMobRegions(final ThreadPoolExecutor exec,
+      final Map<String, SnapshotRegionManifest> regionManifests,
+      final List<HRegionInfo> regions) throws IOException {
+    if (regions == null || regions.size() == 0) return;
+    ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
+      @Override
+      public void editRegion(final HRegionInfo hri) throws IOException {
+        restoreMobRegion(hri, regionManifests.get(hri.getEncodedName()));
+      }
+    });
+  }
+
+  private Map<String, List<SnapshotRegionManifest.StoreFile>> getRegionHFileReferences(
+      final SnapshotRegionManifest manifest) {
+    Map<String, List<SnapshotRegionManifest.StoreFile>> familyMap =
+      new HashMap<String, List<SnapshotRegionManifest.StoreFile>>(manifest.getFamilyFilesCount());
+    for (SnapshotRegionManifest.FamilyFiles familyFiles: manifest.getFamilyFilesList()) {
+      familyMap.put(familyFiles.getFamilyName().toStringUtf8(),
+        new ArrayList<SnapshotRegionManifest.StoreFile>(familyFiles.getStoreFilesList()));
+    }
+    return familyMap;
+  }
+
+  /**
+   * Restore region by removing files not in the snapshot
+   * and adding the missing ones from the snapshot.
+   */
+  private void restoreRegion(final HRegionInfo regionInfo,
+      final SnapshotRegionManifest regionManifest) throws IOException {
+    restoreRegion(regionInfo, regionManifest, new Path(tableDir, regionInfo.getEncodedName()));
+  }
+
+  /**
+   * Restore mob region by removing files not in the snapshot
+   * and adding the missing ones from the snapshot.
+   */
+  private void restoreMobRegion(final HRegionInfo regionInfo,
+      final SnapshotRegionManifest regionManifest) throws IOException {
+    if (regionManifest == null) {
+      return;
+    }
+    restoreRegion(regionInfo, regionManifest,
+      MobUtils.getMobRegionPath(conf, tableDesc.getTableName()));
+  }
+
+  /**
+   * Restore region by removing files not in the snapshot
+   * and adding the missing ones from the snapshot.
+   */
+  private void restoreRegion(final HRegionInfo regionInfo,
+      final SnapshotRegionManifest regionManifest, Path regionDir) throws IOException {
+    Map<String, List<SnapshotRegionManifest.StoreFile>> snapshotFiles =
+                getRegionHFileReferences(regionManifest);
+
+    String tableName = tableDesc.getTableName().getNameAsString();
+
+    // Restore families present in the table
+    for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
+      byte[] family = Bytes.toBytes(familyDir.getName());
+      Set<String> familyFiles = getTableRegionFamilyFiles(familyDir);
+      List<SnapshotRegionManifest.StoreFile> snapshotFamilyFiles =
+          snapshotFiles.remove(familyDir.getName());
+      if (snapshotFamilyFiles != null) {
+        List<SnapshotRegionManifest.StoreFile> hfilesToAdd =
+            new ArrayList<SnapshotRegionManifest.StoreFile>();
+        for (SnapshotRegionManifest.StoreFile storeFile: snapshotFamilyFiles) {
+          if (familyFiles.contains(storeFile.getName())) {
+            // HFile already present
+            familyFiles.remove(storeFile.getName());
+          } else {
+            // HFile missing
+            hfilesToAdd.add(storeFile);
+          }
+        }
+
+        // Remove hfiles not present in the snapshot
+        for (String hfileName: familyFiles) {
+          Path hfile = new Path(familyDir, hfileName);
+          LOG.trace("Removing hfile=" + hfileName +
+            " from region=" + regionInfo.getEncodedName() + " table=" + tableName);
+          HFileArchiver.archiveStoreFile(conf, fs, regionInfo, tableDir, family, hfile);
+        }
+
+        // Restore Missing files
+        for (SnapshotRegionManifest.StoreFile storeFile: hfilesToAdd) {
+          LOG.debug("Adding HFileLink " + storeFile.getName() +
+            " to region=" + regionInfo.getEncodedName() + " table=" + tableName);
+          restoreStoreFile(familyDir, regionInfo, storeFile, createBackRefs);
+        }
+      } else {
+        // Family doesn't exists in the snapshot
+        LOG.trace("Removing family=" + Bytes.toString(family) +
+          " from region=" + regionInfo.getEncodedName() + " table=" + tableName);
+        HFileArchiver.archiveFamily(fs, conf, regionInfo, tableDir, family);
+        fs.delete(familyDir, true);
+      }
+    }
+
+    // Add families not present in the table
+    for (Map.Entry<String, List<SnapshotRegionManifest.StoreFile>> familyEntry:
+                                                                      snapshotFiles.entrySet()) {
+      Path familyDir = new Path(regionDir, familyEntry.getKey());
+      if (!fs.mkdirs(familyDir)) {
+        throw new IOException("Unable to create familyDir=" + familyDir);
+      }
+
+      for (SnapshotRegionManifest.StoreFile storeFile: familyEntry.getValue()) {
+        LOG.trace("Adding HFileLink " + storeFile.getName() + " to table=" + tableName);
+        restoreStoreFile(familyDir, regionInfo, storeFile, createBackRefs);
+      }
+    }
+  }
+
+  /**
+   * @return The set of files in the specified family directory.
+   */
+  private Set<String> getTableRegionFamilyFiles(final Path familyDir) throws IOException {
+    FileStatus[] hfiles = FSUtils.listStatus(fs, familyDir);
+    if (hfiles == null) return Collections.emptySet();
+
+    Set<String> familyFiles = new HashSet<String>(hfiles.length);
+    for (int i = 0; i < hfiles.length; ++i) {
+      String hfileName = hfiles[i].getPath().getName();
+      familyFiles.add(hfileName);
+    }
+
+    return familyFiles;
+  }
+
+  /**
+   * Clone specified regions. For each region create a new region
+   * and create a HFileLink for each hfile.
+   */
+  private HRegionInfo[] cloneHdfsRegions(final ThreadPoolExecutor exec,
+      final Map<String, SnapshotRegionManifest> regionManifests,
+      final List<HRegionInfo> regions) throws IOException {
+    if (regions == null || regions.size() == 0) return null;
+
+    final Map<String, HRegionInfo> snapshotRegions =
+      new HashMap<String, HRegionInfo>(regions.size());
+
+    // clone region info (change embedded tableName with the new one)
+    HRegionInfo[] clonedRegionsInfo = new HRegionInfo[regions.size()];
+    for (int i = 0; i < clonedRegionsInfo.length; ++i) {
+      // clone the region info from the snapshot region info
+      HRegionInfo snapshotRegionInfo = regions.get(i);
+      clonedRegionsInfo[i] = cloneRegionInfo(snapshotRegionInfo);
+
+      // add the region name mapping between snapshot and cloned
+      String snapshotRegionName = snapshotRegionInfo.getEncodedName();
+      String clonedRegionName = clonedRegionsInfo[i].getEncodedName();
+      regionsMap.put(Bytes.toBytes(snapshotRegionName), Bytes.toBytes(clonedRegionName));
+      LOG.info("clone region=" + snapshotRegionName + " as " + clonedRegionName);
+
+      // Add mapping between cloned region name and snapshot region info
+      snapshotRegions.put(clonedRegionName, snapshotRegionInfo);
+    }
+
+    // create the regions on disk
+    ModifyRegionUtils.createRegions(exec, conf,
+        tableDesc, clonedRegionsInfo, new ModifyRegionUtils.RegionFillTask() {
+        @Override
+        public void fillRegion(final HRegion region) throws IOException {
+          HRegionInfo snapshotHri = snapshotRegions.get(region.getRegionInfo().getEncodedName());
+          cloneRegion(region, snapshotHri, regionManifests.get(snapshotHri.getEncodedName()));
+        }
+      });
+
+    return clonedRegionsInfo;
+  }
+
+  /**
+   * Clone the mob region. For the region create a new region
+   * and create a HFileLink for each hfile.
+   */
+  private void cloneHdfsMobRegion(final Map<String, SnapshotRegionManifest> regionManifests,
+      final HRegionInfo region) throws IOException {
+    // clone region info (change embedded tableName with the new one)
+    Path clonedRegionPath = MobUtils.getMobRegionPath(conf, tableDesc.getTableName());
+    cloneRegion(clonedRegionPath, region, regionManifests.get(region.getEncodedName()));
+  }
+
+  /**
+   * Clone region directory content from the snapshot info.
+   *
+   * Each region is encoded with the table name, so the cloned region will have
+   * a different region name.
+   *
+   * Instead of copying the hfiles a HFileLink is created.
+   *
+   * @param regionDir {@link Path} cloned dir
+   * @param snapshotRegionInfo
+   */
+  private void cloneRegion(final Path regionDir, final HRegionInfo snapshotRegionInfo,
+      final SnapshotRegionManifest manifest) throws IOException {
+    final String tableName = tableDesc.getTableName().getNameAsString();
+    for (SnapshotRegionManifest.FamilyFiles familyFiles: manifest.getFamilyFilesList()) {
+      Path familyDir = new Path(regionDir, familyFiles.getFamilyName().toStringUtf8());
+      for (SnapshotRegionManifest.StoreFile storeFile: familyFiles.getStoreFilesList()) {
+        LOG.info("Adding HFileLink " + storeFile.getName() + " to table=" + tableName);
+        restoreStoreFile(familyDir, snapshotRegionInfo, storeFile, createBackRefs);
+      }
+    }
+  }
+
+  /**
+   * Clone region directory content from the snapshot info.
+   *
+   * Each region is encoded with the table name, so the cloned region will have
+   * a different region name.
+   *
+   * Instead of copying the hfiles a HFileLink is created.
+   *
+   * @param region {@link HRegion} cloned
+   * @param snapshotRegionInfo
+   */
+  private void cloneRegion(final HRegion region, final HRegionInfo snapshotRegionInfo,
+      final SnapshotRegionManifest manifest) throws IOException {
+    cloneRegion(new Path(tableDir, region.getRegionInfo().getEncodedName()), snapshotRegionInfo,
+      manifest);
+  }
+
+  /**
+   * Create a new {@link HFileLink} to reference the store file.
+   * <p>The store file in the snapshot can be a simple hfile, an HFileLink or a reference.
+   * <ul>
+   *   <li>hfile: abc -> table=region-abc
+   *   <li>reference: abc.1234 -> table=region-abc.1234
+   *   <li>hfilelink: table=region-hfile -> table=region-hfile
+   * </ul>
+   * @param familyDir destination directory for the store file
+   * @param regionInfo destination region info for the table
+   * @param createBackRef - Whether back reference should be created. Defaults to true.
+   * @param storeFile store file name (can be a Reference, HFileLink or simple HFile)
+   */
+  private void restoreStoreFile(final Path familyDir, final HRegionInfo regionInfo,
+      final SnapshotRegionManifest.StoreFile storeFile, final boolean createBackRef)
+          throws IOException {
+    String hfileName = storeFile.getName();
+    if (HFileLink.isHFileLink(hfileName)) {
+      HFileLink.createFromHFileLink(conf, fs, familyDir, hfileName, createBackRef);
+    } else if (StoreFileInfo.isReference(hfileName)) {
+      restoreReferenceFile(familyDir, regionInfo, storeFile);
+    } else {
+      HFileLink.create(conf, fs, familyDir, regionInfo, hfileName, createBackRef);
+    }
+  }
+
+  /**
+   * Create a new {@link Reference} as copy of the source one.
+   * <p><blockquote><pre>
+   * The source table looks like:
+   *    1234/abc      (original file)
+   *    5678/abc.1234 (reference file)
+   *
+   * After the clone operation looks like:
+   *   wxyz/table=1234-abc
+   *   stuv/table=1234-abc.wxyz
+   *
+   * NOTE that the region name in the clone changes (md5 of regioninfo)
+   * and the reference should reflect that change.
+   * </pre></blockquote>
+   * @param familyDir destination directory for the store file
+   * @param regionInfo destination region info for the table
+   * @param storeFile reference file name
+   */
+  private void restoreReferenceFile(final Path familyDir, final HRegionInfo regionInfo,
+      final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
+    String hfileName = storeFile.getName();
+
+    // Extract the referred information (hfile name and parent region)
+    Path refPath =
+        StoreFileInfo.getReferredToFile(new Path(new Path(new Path(new Path(snapshotTable
+            .getNamespaceAsString(), snapshotTable.getQualifierAsString()), regionInfo
+            .getEncodedName()), familyDir.getName()), hfileName));
+    String snapshotRegionName = refPath.getParent().getParent().getName();
+    String fileName = refPath.getName();
+
+    // The new reference should have the cloned region name as parent, if it is a clone.
+    String clonedRegionName = Bytes.toString(regionsMap.get(Bytes.toBytes(snapshotRegionName)));
+    if (clonedRegionName == null) clonedRegionName = snapshotRegionName;
+
+    // The output file should be a reference link table=snapshotRegion-fileName.clonedRegionName
+    Path linkPath = null;
+    String refLink = fileName;
+    if (!HFileLink.isHFileLink(fileName)) {
+      refLink = HFileLink.createHFileLinkName(snapshotTable, snapshotRegionName, fileName);
+      linkPath = new Path(familyDir,
+        HFileLink.createHFileLinkName(snapshotTable, regionInfo.getEncodedName(), hfileName));
+    }
+
+    Path outPath = new Path(familyDir, refLink + '.' + clonedRegionName);
+
+    // Create the new reference
+    if (storeFile.hasReference()) {
+      Reference reference = Reference.convert(storeFile.getReference());
+      reference.write(fs, outPath);
+    } else {
+      InputStream in;
+      if (linkPath != null) {
+        in = HFileLink.buildFromHFileLinkPattern(conf, linkPath).open(fs);
+      } else {
+        linkPath = new Path(new Path(HRegion.getRegionDir(snapshotManifest.getSnapshotDir(),
+                        regionInfo.getEncodedName()), familyDir.getName()), hfileName);
+        in = fs.open(linkPath);
+      }
+      OutputStream out = fs.create(outPath);
+      IOUtils.copyBytes(in, out, conf);
+    }
+
+    // Add the daughter region to the map
+    String regionName = Bytes.toString(regionsMap.get(regionInfo.getEncodedNameAsBytes()));
+    LOG.debug("Restore reference " + regionName + " to " + clonedRegionName);
+    synchronized (parentsMap) {
+      Pair<String, String> daughters = parentsMap.get(clonedRegionName);
+      if (daughters == null) {
+        daughters = new Pair<String, String>(regionName, null);
+        parentsMap.put(clonedRegionName, daughters);
+      } else if (!regionName.equals(daughters.getFirst())) {
+        daughters.setSecond(regionName);
+      }
+    }
+  }
+
+  /**
+   * Create a new {@link HRegionInfo} from the snapshot region info.
+   * Keep the same startKey, endKey, regionId and split information but change
+   * the table name.
+   *
+   * @param snapshotRegionInfo Info for region to clone.
+   * @return the new HRegion instance
+   */
+  public HRegionInfo cloneRegionInfo(final HRegionInfo snapshotRegionInfo) {
+    return cloneRegionInfo(tableDesc.getTableName(), snapshotRegionInfo);
+  }
+
+  public static HRegionInfo cloneRegionInfo(TableName tableName, HRegionInfo snapshotRegionInfo) {
+    HRegionInfo regionInfo = new HRegionInfo(tableName,
+                      snapshotRegionInfo.getStartKey(), snapshotRegionInfo.getEndKey(),
+                      snapshotRegionInfo.isSplit(), snapshotRegionInfo.getRegionId());
+    regionInfo.setOffline(snapshotRegionInfo.isOffline());
+    return regionInfo;
+  }
+
+  /**
+   * @return the set of the regions contained in the table
+   */
+  private List<HRegionInfo> getTableRegions() throws IOException {
+    LOG.debug("get table regions: " + tableDir);
+    FileStatus[] regionDirs = FSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs));
+    if (regionDirs == null) return null;
+
+    List<HRegionInfo> regions = new LinkedList<HRegionInfo>();
+    for (FileStatus regionDir: regionDirs) {
+      final RegionStorage rs = RegionStorage.open(conf, new LegacyPathIdentifier(regionDir.getPath()), false);
+      regions.add(rs.getRegionInfo());
+    }
+    LOG.debug("found " + regions.size() + " regions for table=" +
+        tableDesc.getTableName().getNameAsString());
+    return regions;
+  }
+
+  /**
+   * Copy the snapshot files for a snapshot scanner, discards meta changes.
+   * @param masterStorage the {@link MasterStorage} to use
+   * @param restoreDir
+   * @param snapshotName
+   * @throws IOException
+   */
+  public static SnapshotRestoreMetaChanges copySnapshotForScanner(
+      final MasterStorage<? extends StorageIdentifier> masterStorage, Path restoreDir,
+      String snapshotName) throws IOException {
+    Configuration conf = masterStorage.getConfiguration();
+    Path rootDir = ((LegacyPathIdentifier)masterStorage.getRootContainer()).path;
+    // ensure that restore dir is not under root dir
+    if (!restoreDir.getFileSystem(conf).getUri().equals(rootDir.getFileSystem(conf).getUri())) {
+      throw new IllegalArgumentException("Filesystems for restore directory and HBase root " +
+          "directory should be the same");
+    }
+    if (restoreDir.toUri().getPath().startsWith(rootDir.toUri().getPath())) {
+      throw new IllegalArgumentException("Restore directory cannot be a sub directory of HBase " +
+          "root directory. RootDir: " + rootDir + ", restoreDir: " + restoreDir);
+    }
+
+    SnapshotDescription snapshotDesc = masterStorage.getSnapshot(snapshotName);
+    HTableDescriptor htd = masterStorage.getTableDescriptorForSnapshot(snapshotDesc);
+
+    MonitoredTask status = TaskMonitor.get().createStatus(
+        "Restoring  snapshot '" + snapshotName + "' to directory " + restoreDir);
+    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher();
+
+    // we send createBackRefs=false so that restored hfiles do not create back reference links
+    // in the base hbase root dir.
+    RestoreSnapshotHelper helper = new RestoreSnapshotHelper(masterStorage, snapshotDesc, htd,
+        monitor, status, false);
+    SnapshotRestoreMetaChanges metaChanges = helper.restoreStorageRegions(); // TODO: parallelize.
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Restored table dir:" + restoreDir);
+      FSUtils.logFileSystemState(masterStorage.getFileSystem(), restoreDir, LOG);
+    }
+    return metaChanges;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotFileCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotFileCache.java
index f9f1c67..2653c8f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotFileCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotFileCache.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.fs.legacy.LegacyLayout;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -89,6 +90,7 @@ public class SnapshotFileCache implements Stoppable {
   private static final Log LOG = LogFactory.getLog(SnapshotFileCache.class);
   private volatile boolean stop = false;
   private final FileSystem fs;
+  private final Path rootDir;
   private final SnapshotFileInspector fileInspector;
   private final Path snapshotDir;
   private final Set<String> cache = new HashSet<String>();
@@ -133,8 +135,9 @@ public class SnapshotFileCache implements Stoppable {
   public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod,
       long cacheRefreshDelay, String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) {
     this.fs = fs;
+    this.rootDir = rootDir;
     this.fileInspector = inspectSnapshotFiles;
-    this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
+    this.snapshotDir = LegacyLayout.getSnapshotDir(rootDir);
     // periodically refresh the file cache to make sure we aren't superfluously saving files.
     this.refreshTimer = new Timer(refreshThreadName, true);
     this.refreshTimer.scheduleAtFixedRate(new RefreshCacheTask(), cacheRefreshDelay,
@@ -224,7 +227,7 @@ public class SnapshotFileCache implements Stoppable {
     // get the status of the snapshots temporary directory and check if it has changes
     // The top-level directory timestamp is not updated, so we have to check the inner-level.
     try {
-      Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME);
+      Path snapshotTmpDir = LegacyLayout.getWorkingSnapshotDir(rootDir);
       FileStatus tempDirStatus = fs.getFileStatus(snapshotTmpDir);
       lastTimestamp = Math.min(lastTimestamp, tempDirStatus.getModificationTime());
       hasChanges |= (lastTimestamp >= lastModifiedTime);
@@ -273,7 +276,7 @@ public class SnapshotFileCache implements Stoppable {
     for (FileStatus snapshot : snapshots) {
       String name = snapshot.getPath().getName();
       // its not the tmp dir,
-      if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) {
+      if (!name.equals(LegacyLayout.SNAPSHOT_TMP_DIR_NAME)) {
         SnapshotDirectoryInfo files = this.snapshots.remove(name);
         // 3.1.1 if we don't know about the snapshot or its been modified, we need to update the
         // files the latter could occur where I create a snapshot, then delete it, and then make a
@@ -300,7 +303,7 @@ public class SnapshotFileCache implements Stoppable {
     final SnapshotManager snapshotManager) throws IOException {
     List<String> snapshotInProgress = Lists.newArrayList();
     // only add those files to the cache, but not to the known snapshots
-    Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME);
+    Path snapshotTmpDir = LegacyLayout.getWorkingSnapshotDir(rootDir);
     // only add those files to the cache, but not to the known snapshots
     FileStatus[] running = FSUtils.listStatus(fs, snapshotTmpDir);
     if (running != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotHFileCleaner.java
index 89704f0..24c4274 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotHFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotHFileCleaner.java
@@ -97,7 +97,8 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
           "snapshot-hfile-cleaner-cache-refresher", new SnapshotFileCache.SnapshotFileInspector() {
             public Collection<String> filesUnderSnapshot(final Path snapshotDir)
                 throws IOException {
-              return SnapshotReferenceUtil.getHFileNames(conf, fs, snapshotDir);
+              return SnapshotReferenceUtil.getHFileNames(master.getMasterStorage(),
+                  snapshotDir.getName());
             }
           });
     } catch (IOException e) {