Posted to commits@hbase.apache.org by bu...@apache.org on 2016/11/30 06:56:23 UTC

[5/8] hbase git commit: HBASE-16904 Snapshot related changes for FS redo work

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotManifest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotManifest.java
new file mode 100644
index 0000000..58f7bf1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotManifest.java
@@ -0,0 +1,570 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.fs.legacy.snapshot;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.fs.RegionStorage;
+import org.apache.hadoop.hbase.fs.legacy.LegacyLayout;
+import org.apache.hadoop.hbase.fs.legacy.LegacyTableDescriptor;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotDataManifest;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * Utility class to help read/write the Snapshot Manifest.
+ *
+ * The snapshot format is transparent to the users of this class;
+ * once the snapshot is written, it is never modified.
+ * On open() the snapshot is loaded into the current in-memory format.
+ */
+@InterfaceAudience.Private
+public final class SnapshotManifest {
+  private static final Log LOG = LogFactory.getLog(SnapshotManifest.class);
+
+  public static final String SNAPSHOT_MANIFEST_SIZE_LIMIT_CONF_KEY = "snapshot.manifest.size.limit";
+
+  public static final String DATA_MANIFEST_NAME = "data.manifest";
+
+  private List<SnapshotRegionManifest> regionManifests;
+  private SnapshotDescription desc;
+  private HTableDescriptor htd;
+
+  private final ForeignExceptionSnare monitor;
+  private final Configuration conf;
+  private final Path workingDir;
+  private final FileSystem fs;
+  private int manifestSizeLimit;
+
+  private SnapshotManifest(final Configuration conf, final FileSystem fs,
+      final Path workingDir, final SnapshotDescription desc,
+      final ForeignExceptionSnare monitor) {
+    this.monitor = monitor;
+    this.desc = desc;
+    this.workingDir = workingDir;
+    this.conf = conf;
+    this.fs = fs;
+
+    this.manifestSizeLimit = conf.getInt(SNAPSHOT_MANIFEST_SIZE_LIMIT_CONF_KEY, 64 * 1024 * 1024);
+  }
+
+  /**
+   * Return a SnapshotManifest instance, used for writing a snapshot.
+   *
+   * There are two usage patterns:
+   *  - The Master will create a manifest, add the descriptor and the offline regions,
+   *    and consolidate the snapshot by writing all the pending state on-disk.
+   *      manifest = SnapshotManifest.create(...)
+   *      manifest.addRegion(tableDir, hri)
+   *      manifest.consolidate()
+   *  - The RegionServer will create a single region manifest
+   *      manifest = SnapshotManifest.create(...)
+   *      manifest.addRegion(region)
+   */
+  public static SnapshotManifest create(final Configuration conf, final FileSystem fs,
+      final Path workingDir, final SnapshotDescription desc,
+      final ForeignExceptionSnare monitor) {
+    return new SnapshotManifest(conf, fs, workingDir, desc, monitor);
+  }
+
+  /**
+   * Return a SnapshotManifest instance with the information already loaded in-memory.
+   *    SnapshotManifest manifest = SnapshotManifest.open(...)
+   *    HTableDescriptor htd = manifest.getTableDescriptor()
+   *    for (SnapshotRegionManifest regionManifest: manifest.getRegionManifests())
+   *      hri = regionManifest.getRegionInfo()
+   *      for (regionManifest.getFamilyFiles())
+   *        ...
+   */
+  public static SnapshotManifest open(final Configuration conf, final FileSystem fs,
+      final Path workingDir, final SnapshotDescription desc) throws IOException {
+    SnapshotManifest manifest = new SnapshotManifest(conf, fs, workingDir, desc, null);
+    manifest.load();
+    return manifest;
+  }
+
+  public static SnapshotManifest open(final Configuration conf, final SnapshotDescription desc)
+      throws IOException {
+    Path snapshotDir = LegacyLayout.getCompletedSnapshotDir(FSUtils.getRootDir(conf), desc);
+    return open(conf, snapshotDir.getFileSystem(conf), snapshotDir, desc);
+  }
+
+  /**
+   * Add the table descriptor to the snapshot manifest
+   */
+  public void addTableDescriptor(final HTableDescriptor htd) throws IOException {
+    this.htd = htd;
+  }
+
+  interface RegionVisitor<TRegion, TFamily> {
+    TRegion regionOpen(final HRegionInfo regionInfo) throws IOException;
+    void regionClose(final TRegion region) throws IOException;
+
+    TFamily familyOpen(final TRegion region, final byte[] familyName) throws IOException;
+    void familyClose(final TRegion region, final TFamily family) throws IOException;
+
+    void storeFile(final TRegion region, final TFamily family, final StoreFileInfo storeFile)
+      throws IOException;
+  }
+
+  private RegionVisitor createRegionVisitor(final SnapshotDescription desc) throws IOException {
+    switch (getSnapshotFormat(desc)) {
+      case SnapshotManifestV1.DESCRIPTOR_VERSION:
+        return new SnapshotManifestV1.ManifestBuilder(conf, fs, workingDir);
+      case SnapshotManifestV2.DESCRIPTOR_VERSION:
+        return new SnapshotManifestV2.ManifestBuilder(conf, fs, workingDir);
+      default:
+        throw new CorruptedSnapshotException("Invalid Snapshot version: " + desc.getVersion(),
+          ProtobufUtil.createSnapshotDesc(desc));
+    }
+  }
+
+  public void addMobRegion(HRegionInfo regionInfo, HColumnDescriptor[] hcds) throws IOException {
+    // 0. Get the ManifestBuilder/RegionVisitor
+    RegionVisitor visitor = createRegionVisitor(desc);
+
+    // 1. dump region meta info into the snapshot directory
+    LOG.debug("Storing mob region '" + regionInfo + "' region-info for snapshot.");
+    Object regionData = visitor.regionOpen(regionInfo);
+    monitor.rethrowException();
+
+    // 2. iterate through all the stores in the region
+    LOG.debug("Creating references for mob files");
+
+    Path mobRegionPath = MobUtils.getMobRegionPath(conf, regionInfo.getTable());
+    for (HColumnDescriptor hcd : hcds) {
+      // 2.1. build the snapshot reference for the store if it's a mob store
+      if (!hcd.isMobEnabled()) {
+        continue;
+      }
+      Object familyData = visitor.familyOpen(regionData, hcd.getName());
+      monitor.rethrowException();
+
+      Path storePath = MobUtils.getMobFamilyPath(mobRegionPath, hcd.getNameAsString());
+      List<StoreFileInfo> storeFiles = getStoreFiles(storePath);
+      if (storeFiles == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No mob files under family: " + hcd.getNameAsString());
+        }
+        continue;
+      }
+
+      addReferenceFiles(visitor, regionData, familyData, storeFiles, true);
+
+      visitor.familyClose(regionData, familyData);
+    }
+    visitor.regionClose(regionData);
+  }
+
+  /**
+   * Creates a 'manifest' for the specified region, by reading directly from the HRegion object.
+   * This is used by the "online snapshot" when the table is enabled.
+   */
+  public void addRegion(final HRegion region) throws IOException {
+    // 0. Get the ManifestBuilder/RegionVisitor
+    RegionVisitor visitor = createRegionVisitor(desc);
+
+    // 1. dump region meta info into the snapshot directory
+    LOG.debug("Storing '" + region + "' region-info for snapshot.");
+    Object regionData = visitor.regionOpen(region.getRegionInfo());
+    monitor.rethrowException();
+
+    // 2. iterate through all the stores in the region
+    LOG.debug("Creating references for hfiles");
+
+    for (Store store : region.getStores()) {
+      // 2.1. build the snapshot reference for the store
+      Object familyData = visitor.familyOpen(regionData, store.getFamily().getName());
+      monitor.rethrowException();
+
+      List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding snapshot references for " + storeFiles.size() + " hfiles");
+      }
+
+      // 2.2. iterate through all the store's files and create "references".
+      for (int i = 0, sz = storeFiles.size(); i < sz; i++) {
+        StoreFile storeFile = storeFiles.get(i);
+        monitor.rethrowException();
+
+        // create "reference" to this store file.
+        LOG.debug("Adding reference for file (" + (i+1) + "/" + sz + "): " + storeFile.getPath());
+        visitor.storeFile(regionData, familyData, storeFile.getFileInfo());
+      }
+      visitor.familyClose(regionData, familyData);
+    }
+    visitor.regionClose(regionData);
+  }
+
+  /**
+   * Creates a 'manifest' for the specified region, by reading directly from the disk.
+   * This is used by the "offline snapshot" when the table is disabled.
+   */
+  public void addRegion(final Path tableDir, final HRegionInfo regionInfo) throws IOException {
+    // 0. Get the ManifestBuilder/RegionVisitor
+    RegionVisitor visitor = createRegionVisitor(desc);
+
+    boolean isMobRegion = MobUtils.isMobRegionInfo(regionInfo);
+    try {
+      // Open the RegionFS
+      RegionStorage regionFs = RegionStorage.open(conf, regionInfo, false);
+      monitor.rethrowException();
+
+      // 1. dump region meta info into the snapshot directory
+      LOG.debug("Storing region-info for snapshot.");
+      Object regionData = visitor.regionOpen(regionInfo);
+      monitor.rethrowException();
+
+      // 2. iterate through all the stores in the region
+      LOG.debug("Creating references for hfiles");
+
+      // This ensures that we have an atomic view of the directory as long as we have < ls limit
+      // (batch size of the files in a directory) on the namenode. Otherwise, we get back the files
+      // in batches and may miss files being added/deleted. This could be more robust (iteratively
+      // checking to see if we have all the files until we are sure), but the limit is currently
+      // 1000 files/batch, far more than the number of store files under a single column family.
+      Collection<String> familyNames = regionFs.getFamilies();
+      if (familyNames != null) {
+        for (String familyName: familyNames) {
+          Object familyData = visitor.familyOpen(regionData, Bytes.toBytes(familyName));
+          monitor.rethrowException();
+
+          Collection<StoreFileInfo> storeFiles = null;
+          if (isMobRegion) {
+            Path regionPath = MobUtils.getMobRegionPath(conf, regionInfo.getTable());
+            Path storePath = MobUtils.getMobFamilyPath(regionPath, familyName);
+            storeFiles = getStoreFiles(storePath);
+          } else {
+            storeFiles = regionFs.getStoreFiles(familyName);
+          }
+
+          if (storeFiles == null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("No files under family: " + familyName);
+            }
+            continue;
+          }
+
+          // 2.1. build the snapshot reference for the store
+          // iterate through all the store's files and create "references".
+          addReferenceFiles(visitor, regionData, familyData, storeFiles, false);
+
+          visitor.familyClose(regionData, familyData);
+        }
+      }
+      visitor.regionClose(regionData);
+    } catch (IOException e) {
+      // the mob directory might not be created yet, so do nothing when it is a mob region
+      if (!isMobRegion) {
+        throw e;
+      }
+    }
+  }
+
+  private List<StoreFileInfo> getStoreFiles(Path storeDir) throws IOException {
+    FileStatus[] stats = FSUtils.listStatus(fs, storeDir);
+    if (stats == null) return null;
+
+    ArrayList<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(stats.length);
+    for (int i = 0; i < stats.length; ++i) {
+      storeFiles.add(new StoreFileInfo(conf, fs, stats[i]));
+    }
+    return storeFiles;
+  }
+
+  private void addReferenceFiles(RegionVisitor visitor, Object regionData, Object familyData,
+      Collection<StoreFileInfo> storeFiles, boolean isMob) throws IOException {
+    final String fileType = isMob ? "mob file" : "hfile";
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Adding snapshot references for %d %ss",
+          storeFiles.size(), fileType));
+    }
+
+    int i = 0;
+    int sz = storeFiles.size();
+    for (StoreFileInfo storeFile: storeFiles) {
+      monitor.rethrowException();
+
+      LOG.debug(String.format("Adding reference for %s (%d/%d): %s",
+          fileType, ++i, sz, storeFile.getPath()));
+
+      // create "reference" to this store file.
+      visitor.storeFile(regionData, familyData, storeFile);
+    }
+  }
+
+  /**
+   * Load the information in the SnapshotManifest. Called by SnapshotManifest.open()
+   *
+   * If the format is v2 and there is no data-manifest, it means that we are loading an
+   * in-progress snapshot. Since we support rolling upgrades, we look for both the v1
+   * and v2 region formats.
+   */
+  private void load() throws IOException {
+    switch (getSnapshotFormat(desc)) {
+      case SnapshotManifestV1.DESCRIPTOR_VERSION: {
+        this.htd = LegacyTableDescriptor.getTableDescriptorFromFs(fs, workingDir);
+        ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
+        try {
+          this.regionManifests =
+            SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc);
+        } finally {
+          tpool.shutdown();
+        }
+        break;
+      }
+      case SnapshotManifestV2.DESCRIPTOR_VERSION: {
+        SnapshotDataManifest dataManifest = readDataManifest();
+        if (dataManifest != null) {
+          htd = ProtobufUtil.convertToHTableDesc(dataManifest.getTableSchema());
+          regionManifests = dataManifest.getRegionManifestsList();
+        } else {
+          // Compatibility, load the v1 regions
+          // This happens only when the snapshot is in-progress and the cache wants to refresh.
+          List<SnapshotRegionManifest> v1Regions, v2Regions;
+          ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
+          try {
+            v1Regions = SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc);
+            v2Regions = SnapshotManifestV2.loadRegionManifests(conf, tpool, fs, workingDir, desc);
+          } catch (InvalidProtocolBufferException e) {
+            throw new CorruptedSnapshotException("unable to parse region manifest " +
+                e.getMessage(), e);
+          } finally {
+            tpool.shutdown();
+          }
+          if (v1Regions != null && v2Regions != null) {
+            regionManifests =
+              new ArrayList<SnapshotRegionManifest>(v1Regions.size() + v2Regions.size());
+            regionManifests.addAll(v1Regions);
+            regionManifests.addAll(v2Regions);
+          } else if (v1Regions != null) {
+            regionManifests = v1Regions;
+          } else /* if (v2Regions != null) */ {
+            regionManifests = v2Regions;
+          }
+        }
+        break;
+      }
+      default:
+        throw new CorruptedSnapshotException("Invalid Snapshot version: " + desc.getVersion(),
+          ProtobufUtil.createSnapshotDesc(desc));
+    }
+  }
+
+  /**
+   * Get the current snapshot working dir
+   */
+  public Path getSnapshotDir() {
+    return this.workingDir;
+  }
+
+  /**
+   * Get the SnapshotDescription
+   */
+  public SnapshotDescription getSnapshotDescription() {
+    return this.desc;
+  }
+
+  /**
+   * Get the table descriptor from the Snapshot
+   */
+  public HTableDescriptor getTableDescriptor() {
+    return this.htd;
+  }
+
+  /**
+   * Get all the Region Manifests from the snapshot
+   */
+  public List<SnapshotRegionManifest> getRegionManifests() {
+    return this.regionManifests;
+  }
+
+  /**
+   * Get all the Region Manifests from the snapshot.
+   * This is a helper to get a map keyed by the region encoded name
+   */
+  public Map<String, SnapshotRegionManifest> getRegionManifestsMap() {
+    if (regionManifests == null || regionManifests.size() == 0) return null;
+
+    HashMap<String, SnapshotRegionManifest> regionsMap =
+        new HashMap<String, SnapshotRegionManifest>(regionManifests.size());
+    for (SnapshotRegionManifest manifest: regionManifests) {
+      String regionName = getRegionNameFromManifest(manifest);
+      regionsMap.put(regionName, manifest);
+    }
+    return regionsMap;
+  }
+
+  public void consolidate() throws IOException {
+    if (getSnapshotFormat(desc) == SnapshotManifestV1.DESCRIPTOR_VERSION) {
+      LOG.info("Using old Snapshot Format");
+      // write a copy of descriptor to the snapshot directory
+      LegacyTableDescriptor.createTableDescriptor(fs, workingDir, htd, false);
+    } else {
+      LOG.debug("Convert to Single Snapshot Manifest");
+      convertToV2SingleManifest();
+    }
+  }
+
+  /*
+   * In case of a rolling upgrade, we try to read all the formats and build
+   * the snapshot with the latest format.
+   */
+  private void convertToV2SingleManifest() throws IOException {
+    // Try to load v1 and v2 regions
+    List<SnapshotRegionManifest> v1Regions, v2Regions;
+    ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
+    try {
+      v1Regions = SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc);
+      v2Regions = SnapshotManifestV2.loadRegionManifests(conf, tpool, fs, workingDir, desc);
+    } finally {
+      tpool.shutdown();
+    }
+
+    SnapshotDataManifest.Builder dataManifestBuilder = SnapshotDataManifest.newBuilder();
+    dataManifestBuilder.setTableSchema(ProtobufUtil.convertToTableSchema(htd));
+
+    if (v1Regions != null && v1Regions.size() > 0) {
+      dataManifestBuilder.addAllRegionManifests(v1Regions);
+    }
+    if (v2Regions != null && v2Regions.size() > 0) {
+      dataManifestBuilder.addAllRegionManifests(v2Regions);
+    }
+
+    // Write the v2 Data Manifest.
+    // Once the data-manifest is written, the snapshot can be considered complete.
+    // Currently snapshots are written in a "temporary" directory and later
+    // moved to the "completed" snapshot directory.
+    SnapshotDataManifest dataManifest = dataManifestBuilder.build();
+    writeDataManifest(dataManifest);
+    this.regionManifests = dataManifest.getRegionManifestsList();
+
+    // Remove the region manifests. Everything is now in the data-manifest.
+    // The delete operation is "relaxed": unless we get an exception, we keep going.
+    // Extra files left in the snapshot directory will not cause any problem,
+    // since they have the same content as the data manifest; even by re-reading
+    // them we would get the same information.
+    if (v1Regions != null && v1Regions.size() > 0) {
+      for (SnapshotRegionManifest regionManifest: v1Regions) {
+        SnapshotManifestV1.deleteRegionManifest(fs, workingDir, regionManifest);
+      }
+    }
+    if (v2Regions != null && v2Regions.size() > 0) {
+      for (SnapshotRegionManifest regionManifest: v2Regions) {
+        SnapshotManifestV2.deleteRegionManifest(fs, workingDir, regionManifest);
+      }
+    }
+  }
+
+  /*
+   * Write the SnapshotDataManifest file
+   */
+  private void writeDataManifest(final SnapshotDataManifest manifest)
+      throws IOException {
+    FSDataOutputStream stream = fs.create(new Path(workingDir, DATA_MANIFEST_NAME));
+    try {
+      manifest.writeTo(stream);
+    } finally {
+      stream.close();
+    }
+  }
+
+  /*
+   * Read the SnapshotDataManifest file
+   */
+  private SnapshotDataManifest readDataManifest() throws IOException {
+    FSDataInputStream in = null;
+    try {
+      in = fs.open(new Path(workingDir, DATA_MANIFEST_NAME));
+      CodedInputStream cin = CodedInputStream.newInstance(in);
+      cin.setSizeLimit(manifestSizeLimit);
+      return SnapshotDataManifest.parseFrom(cin);
+    } catch (FileNotFoundException e) {
+      return null;
+    } catch (InvalidProtocolBufferException e) {
+      throw new CorruptedSnapshotException("unable to parse data manifest " + e.getMessage(), e);
+    } finally {
+      if (in != null) in.close();
+    }
+  }
+
+  private ThreadPoolExecutor createExecutor(final String name) {
+    return createExecutor(conf, name);
+  }
+
+  public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
+    int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
+    return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+              Threads.getNamedThreadFactory(name));
+  }
+
+  /**
+   * Extract the region encoded name from the region manifest
+   */
+  static String getRegionNameFromManifest(final SnapshotRegionManifest manifest) {
+    byte[] regionName = HRegionInfo.createRegionName(
+            ProtobufUtil.toTableName(manifest.getRegionInfo().getTableName()),
+            manifest.getRegionInfo().getStartKey().toByteArray(),
+            manifest.getRegionInfo().getRegionId(), true);
+    return HRegionInfo.encodeRegionName(regionName);
+  }
+
+  /*
+   * Return the snapshot format
+   */
+  private static int getSnapshotFormat(final SnapshotDescription desc) {
+    return desc.hasVersion() ? desc.getVersion() : SnapshotManifestV1.DESCRIPTOR_VERSION;
+  }
+}

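A minimal usage sketch of the API added above, covering both the Master-side write path and the read path (a sketch only: conf, fs, paths and descriptors are assumed to be available, the monitor is left null for brevity, and error handling is omitted):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.fs.legacy.snapshot.SnapshotManifest;
    import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
    import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

    public class SnapshotManifestUsageSketch {
      static void writeThenRead(Configuration conf, FileSystem fs, Path workingDir,
          Path tableDir, SnapshotDescription desc, HTableDescriptor htd) throws Exception {
        // Master-side offline snapshot: descriptor plus regions, then consolidate.
        SnapshotManifest manifest = SnapshotManifest.create(conf, fs, workingDir, desc, null);
        manifest.addTableDescriptor(htd);
        // manifest.addRegion(tableDir, hri); // once per offline region
        manifest.consolidate(); // writes data.manifest for v2-format snapshots

        // Read path: load the snapshot back into memory.
        SnapshotManifest loaded = SnapshotManifest.open(conf, fs, workingDir, desc);
        for (SnapshotRegionManifest rm : loaded.getRegionManifests()) {
          System.out.println(rm.getRegionInfo()); // one manifest entry per region
        }
      }
    }
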
http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotManifestV1.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotManifestV1.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotManifestV1.java
new file mode 100644
index 0000000..88dea70
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotManifestV1.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.fs.legacy.snapshot;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.fs.RegionStorage;
+import org.apache.hadoop.hbase.fs.StorageIdentifier;
+import org.apache.hadoop.hbase.fs.legacy.LegacyPathIdentifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * DO NOT USE DIRECTLY. USE {@link SnapshotManifest}.
+ *
+ * Snapshot v1 layout format
+ *  - Each region in the table is represented by a directory with the .hregioninfo file
+ *      /snapshotName/regionName/.hregioninfo
+ *  - Each store file present in the table is represented by an empty file
+ *      /snapshotName/regionName/familyName/fileName
+ */
+@InterfaceAudience.Private
+public final class SnapshotManifestV1 {
+  private static final Log LOG = LogFactory.getLog(SnapshotManifestV1.class);
+
+  public static final int DESCRIPTOR_VERSION = 0;
+
+  private SnapshotManifestV1() {
+  }
+
+  // TODO update for RegionStorage
+  static class ManifestBuilder implements SnapshotManifest.RegionVisitor<RegionStorage, Path> {
+    private final Configuration conf;
+    private final StorageIdentifier snapshotDir;
+    private final FileSystem fs;
+
+    public ManifestBuilder(final Configuration conf, final FileSystem fs, final Path snapshotDir) {
+      this.snapshotDir = new LegacyPathIdentifier(snapshotDir);
+      this.conf = conf;
+      this.fs = fs;
+    }
+
+    public RegionStorage regionOpen(final HRegionInfo regionInfo) throws IOException {
+      RegionStorage snapshotRegionFs = RegionStorage.open(conf, fs,
+          snapshotDir, regionInfo, true);
+      return snapshotRegionFs;
+    }
+
+    public void regionClose(final RegionStorage region) {
+    }
+
+    public Path familyOpen(final RegionStorage snapshotRegionFs, final byte[] familyName) {
+      Path familyDir = ((LegacyPathIdentifier)snapshotRegionFs.getStoreContainer(Bytes.toString(familyName))).path;
+      return familyDir;
+    }
+
+    public void familyClose(final RegionStorage region, final Path family) {
+    }
+
+    public void storeFile(final RegionStorage region, final Path familyDir,
+        final StoreFileInfo storeFile) throws IOException {
+      Path referenceFile = new Path(familyDir, storeFile.getPath().getName());
+      boolean success = true;
+      if (storeFile.isReference()) {
+        // write the Reference object to the snapshot
+        storeFile.getReference().write(fs, referenceFile);
+      } else {
+        // create "reference" to this store file.  It is intentionally an empty file -- all
+        // necessary information is captured by its fs location and filename.  This allows us to
+        // only figure out what needs to be done via a single nn operation (instead of having to
+        // open and read the files as well).
+        success = fs.createNewFile(referenceFile);
+      }
+      if (!success) {
+        throw new IOException("Failed to create reference file: " + referenceFile);
+      }
+    }
+  }
+
+  static List<SnapshotRegionManifest> loadRegionManifests(final Configuration conf,
+      final Executor executor, final FileSystem fs, final Path snapshotDir,
+      final SnapshotDescription desc) throws IOException {
+    FileStatus[] regions = FSUtils.listStatus(fs, snapshotDir, new FSUtils.RegionDirFilter(fs));
+    if (regions == null) {
+      LOG.debug("No regions under directory: " + snapshotDir);
+      return null;
+    }
+
+    final ExecutorCompletionService<SnapshotRegionManifest> completionService =
+      new ExecutorCompletionService<SnapshotRegionManifest>(executor);
+    for (final FileStatus region: regions) {
+      completionService.submit(new Callable<SnapshotRegionManifest>() {
+        @Override
+        public SnapshotRegionManifest call() throws IOException {
+          final RegionStorage rs = RegionStorage.open(conf, new LegacyPathIdentifier(region.getPath()), true);
+          return buildManifestFromDisk(conf, fs, snapshotDir, rs);
+        }
+      });
+    }
+
+    ArrayList<SnapshotRegionManifest> regionsManifest =
+        new ArrayList<SnapshotRegionManifest>(regions.length);
+    try {
+      for (int i = 0; i < regions.length; ++i) {
+        regionsManifest.add(completionService.take().get());
+      }
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException(e.getMessage());
+    } catch (ExecutionException e) {
+      IOException ex = new IOException();
+      ex.initCause(e.getCause());
+      throw ex;
+    }
+    return regionsManifest;
+  }
+
+  static void deleteRegionManifest(final FileSystem fs, final Path snapshotDir,
+      final SnapshotRegionManifest manifest) throws IOException {
+    String regionName = SnapshotManifest.getRegionNameFromManifest(manifest);
+    fs.delete(new Path(snapshotDir, regionName), true);
+  }
+
+  static SnapshotRegionManifest buildManifestFromDisk(final Configuration conf,
+      final FileSystem fs, final Path tableDir, final RegionStorage regionFs) throws IOException {
+    SnapshotRegionManifest.Builder manifest = SnapshotRegionManifest.newBuilder();
+
+    // 1. dump region meta info into the snapshot directory
+    LOG.debug("Storing region-info for snapshot.");
+    manifest.setRegionInfo(HRegionInfo.convert(regionFs.getRegionInfo()));
+
+    // 2. iterate through all the stores in the region
+    LOG.debug("Creating references for hfiles");
+
+    // This ensures that we have an atomic view of the directory as long as we have < ls limit
+    // (batch size of the files in a directory) on the namenode. Otherwise, we get back the files in
+    // batches and may miss files being added/deleted. This could be more robust (iteratively
+    // checking to see if we have all the files until we are sure), but the limit is currently 1000
+    // files/batch, far more than the number of store files under a single column family.
+    Collection<String> familyNames = regionFs.getFamilies();
+    if (familyNames != null) {
+      for (String familyName: familyNames) {
+        Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(familyName, false);
+        if (storeFiles == null) {
+          LOG.debug("No files under family: " + familyName);
+          continue;
+        }
+
+        // 2.1. build the snapshot reference for the store
+        SnapshotRegionManifest.FamilyFiles.Builder family =
+              SnapshotRegionManifest.FamilyFiles.newBuilder();
+        family.setFamilyName(ByteStringer.wrap(Bytes.toBytes(familyName)));
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding snapshot references for " + storeFiles.size() + " hfiles");
+        }
+
+        // 2.2. iterate through all the store's files and create "references".
+        int i = 0;
+        int sz = storeFiles.size();
+        for (StoreFileInfo storeFile: storeFiles) {
+          // create "reference" to this store file.
+          LOG.debug("Adding reference for file (" + (++i) + "/" + sz + "): " + storeFile.getPath());
+          SnapshotRegionManifest.StoreFile.Builder sfManifest =
+                SnapshotRegionManifest.StoreFile.newBuilder();
+          sfManifest.setName(storeFile.getPath().getName());
+          family.addStoreFiles(sfManifest.build());
+        }
+        manifest.addFamilyFiles(family.build());
+      }
+    }
+    return manifest.build();
+  }
+}

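One v1 detail worth keeping in mind when reviewing ManifestBuilder.storeFile() above: a non-Reference store file is captured as an empty marker file, so a single namenode operation suffices and all information lives in the file's location and name. A self-contained sketch of just that trick (the snapshot path below is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class V1EmptyReferenceSketch {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        // v1 layout: /snapshotName/regionName/familyName/fileName (empty file)
        Path ref = new Path("/hbase/.hbase-snapshot/snap1/1588230740/f1/0123abcd");
        if (!fs.createNewFile(ref)) { // empty file; name matches the hfile name
          throw new IOException("Failed to create reference file: " + ref);
        }
      }
    }
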
http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotManifestV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotManifestV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotManifestV2.java
new file mode 100644
index 0000000..dba70c0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/snapshot/SnapshotManifestV2.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.fs.legacy.snapshot;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * DO NOT USE DIRECTLY. USE {@link SnapshotManifest}.
+ *
+ * Snapshot v2 layout format
+ *  - Single manifest file containing all the region information
+ *  - In the online-snapshot case each region will write a "region manifest"
+ *      /snapshotName/region-manifest.regionName
+ */
+@InterfaceAudience.Private
+public final class SnapshotManifestV2 {
+  private static final Log LOG = LogFactory.getLog(SnapshotManifestV2.class);
+
+  public static final int DESCRIPTOR_VERSION = 2;
+
+  public static final String SNAPSHOT_MANIFEST_PREFIX = "region-manifest.";
+
+  private SnapshotManifestV2() {}
+
+  static class ManifestBuilder implements SnapshotManifest.RegionVisitor<
+                    SnapshotRegionManifest.Builder, SnapshotRegionManifest.FamilyFiles.Builder> {
+    private final Configuration conf;
+    private final Path snapshotDir;
+    private final FileSystem fs;
+
+    public ManifestBuilder(final Configuration conf, final FileSystem fs, final Path snapshotDir) {
+      this.snapshotDir = snapshotDir;
+      this.conf = conf;
+      this.fs = fs;
+    }
+
+    public SnapshotRegionManifest.Builder regionOpen(final HRegionInfo regionInfo) {
+      SnapshotRegionManifest.Builder manifest = SnapshotRegionManifest.newBuilder();
+      manifest.setRegionInfo(HRegionInfo.convert(regionInfo));
+      return manifest;
+    }
+
+    public void regionClose(final SnapshotRegionManifest.Builder region) throws IOException {
+      // We should ensure the snapshot dir exists; it may have been deleted by the master.
+      // See HBASE-16464.
+      if (fs.exists(snapshotDir)) {
+        SnapshotRegionManifest manifest = region.build();
+        FSDataOutputStream stream = fs.create(getRegionManifestPath(snapshotDir, manifest));
+        try {
+          manifest.writeTo(stream);
+        } finally {
+          stream.close();
+        }
+      } else {
+        LOG.warn("Can't write region manifest without parent dir; it may have been deleted by the master.");
+      }
+    }
+
+    public SnapshotRegionManifest.FamilyFiles.Builder familyOpen(
+        final SnapshotRegionManifest.Builder region, final byte[] familyName) {
+      SnapshotRegionManifest.FamilyFiles.Builder family =
+          SnapshotRegionManifest.FamilyFiles.newBuilder();
+      family.setFamilyName(ByteStringer.wrap(familyName));
+      return family;
+    }
+
+    public void familyClose(final SnapshotRegionManifest.Builder region,
+        final SnapshotRegionManifest.FamilyFiles.Builder family) {
+      region.addFamilyFiles(family.build());
+    }
+
+    public void storeFile(final SnapshotRegionManifest.Builder region,
+        final SnapshotRegionManifest.FamilyFiles.Builder family, final StoreFileInfo storeFile)
+        throws IOException {
+      SnapshotRegionManifest.StoreFile.Builder sfManifest =
+            SnapshotRegionManifest.StoreFile.newBuilder();
+      sfManifest.setName(storeFile.getPath().getName());
+      if (storeFile.isReference()) {
+        sfManifest.setReference(storeFile.getReference().convert());
+      }
+      sfManifest.setFileSize(storeFile.getReferencedFileStatus(fs).getLen());
+      family.addStoreFiles(sfManifest.build());
+    }
+  }
+
+  static List<SnapshotRegionManifest> loadRegionManifests(final Configuration conf,
+      final Executor executor, final FileSystem fs, final Path snapshotDir,
+      final SnapshotDescription desc) throws IOException {
+    FileStatus[] manifestFiles = FSUtils.listStatus(fs, snapshotDir, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith(SNAPSHOT_MANIFEST_PREFIX);
+      }
+    });
+
+    if (manifestFiles == null || manifestFiles.length == 0) return null;
+
+    final ExecutorCompletionService<SnapshotRegionManifest> completionService =
+      new ExecutorCompletionService<SnapshotRegionManifest>(executor);
+    for (final FileStatus st: manifestFiles) {
+      completionService.submit(new Callable<SnapshotRegionManifest>() {
+        @Override
+        public SnapshotRegionManifest call() throws IOException {
+          FSDataInputStream stream = fs.open(st.getPath());
+          try {
+            return SnapshotRegionManifest.parseFrom(stream);
+          } finally {
+            stream.close();
+          }
+        }
+      });
+    }
+
+    ArrayList<SnapshotRegionManifest> regionsManifest =
+        new ArrayList<SnapshotRegionManifest>(manifestFiles.length);
+    try {
+      for (int i = 0; i < manifestFiles.length; ++i) {
+        regionsManifest.add(completionService.take().get());
+      }
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException(e.getMessage());
+    } catch (ExecutionException e) {
+      Throwable t = e.getCause();
+
+      if (t instanceof InvalidProtocolBufferException) {
+        throw (InvalidProtocolBufferException)t;
+      } else {
+        IOException ex = new IOException("ExecutionException");
+        ex.initCause(e.getCause());
+        throw ex;
+      }
+    }
+    return regionsManifest;
+  }
+
+  static void deleteRegionManifest(final FileSystem fs, final Path snapshotDir,
+      final SnapshotRegionManifest manifest) throws IOException {
+    fs.delete(getRegionManifestPath(snapshotDir, manifest), true);
+  }
+
+  private static Path getRegionManifestPath(final Path snapshotDir,
+      final SnapshotRegionManifest manifest) {
+    String regionName = SnapshotManifest.getRegionNameFromManifest(manifest);
+    return new Path(snapshotDir, SNAPSHOT_MANIFEST_PREFIX + regionName);
+  }
+}

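For the v2 format, each region manifest is a standalone protobuf message, so a manifest written by regionClose() above can be read back independently. A sketch of that round trip (the snapshot directory and encoded region name are hypothetical; the file name prefix matches SNAPSHOT_MANIFEST_PREFIX):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

    public class V2ManifestReadSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path manifestFile =
            new Path("/hbase/.hbase-snapshot/snap1/region-manifest.1588230740");
        FSDataInputStream in = fs.open(manifestFile);
        try {
          SnapshotRegionManifest m = SnapshotRegionManifest.parseFrom(in);
          System.out.println(m.getFamilyFilesCount() + " families, region "
              + m.getRegionInfo().getRegionId());
        } finally {
          in.close();
        }
      }
    }
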
http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
index 9737b55..ec0427a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
-import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.hbase.fs.legacy.snapshot.ExportSnapshot;
 import org.apache.hadoop.util.ProgramDriver;
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
index 5c46f2a..95f7780 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
@@ -23,16 +23,15 @@ import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.fs.MasterStorage;
+import org.apache.hadoop.hbase.fs.StorageIdentifier;
+import org.apache.hadoop.hbase.fs.legacy.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.util.ConfigurationUtil;
-import org.apache.hadoop.hbase.util.FSUtils;
 
 import java.io.IOException;
 import java.util.AbstractMap;
@@ -70,14 +69,11 @@ public class MultiTableSnapshotInputFormatImpl {
    */
   public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
       Path restoreDir) throws IOException {
-    Path rootDir = FSUtils.getRootDir(conf);
-    FileSystem fs = rootDir.getFileSystem(conf);
-
     setSnapshotToScans(conf, snapshotScans);
     Map<String, Path> restoreDirs =
         generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
     setSnapshotDirs(conf, restoreDirs);
-    restoreSnapshots(conf, restoreDirs, fs);
+    restoreSnapshots(MasterStorage.open(conf, false), restoreDirs);
   }
 
   /**
@@ -91,8 +87,7 @@ public class MultiTableSnapshotInputFormatImpl {
    */
   public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
       throws IOException {
-    Path rootDir = FSUtils.getRootDir(conf);
-    FileSystem fs = rootDir.getFileSystem(conf);
+    MasterStorage<? extends StorageIdentifier> masterStorage = MasterStorage.open(conf, false);
 
     List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();
 
@@ -103,14 +98,13 @@ public class MultiTableSnapshotInputFormatImpl {
 
       Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);
 
-      SnapshotManifest manifest =
-          TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
-      List<HRegionInfo> regionInfos =
-          TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);
+      HBaseProtos.SnapshotDescription snapshot = masterStorage.getSnapshot(snapshotName);
 
       for (Scan scan : entry.getValue()) {
         List<TableSnapshotInputFormatImpl.InputSplit> splits =
-            TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
+            TableSnapshotInputFormatImpl.getSplits(scan,
+                masterStorage.getTableDescriptorForSnapshot(snapshot),
+                masterStorage.getSnapshotRegions(snapshot).values(), restoreDir, conf);
         rtn.addAll(splits);
       }
     }
@@ -225,28 +219,25 @@ public class MultiTableSnapshotInputFormatImpl {
   /**
    * Restore each (snapshot name, restore directory) pair in snapshotToDir
    *
-   * @param conf          configuration to restore with
+   * @param masterStorage {@link MasterStorage} to use
    * @param snapshotToDir mapping from snapshot names to restore directories
-   * @param fs            filesystem to do snapshot restoration on
    * @throws IOException
    */
-  public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
-      throws IOException {
+  public void restoreSnapshots(final MasterStorage<? extends StorageIdentifier> masterStorage,
+      Map<String, Path> snapshotToDir) throws IOException {
     // TODO: restore from record readers to parallelize.
-    Path rootDir = FSUtils.getRootDir(conf);
-
     for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
       String snapshotName = entry.getKey();
       Path restoreDir = entry.getValue();
       LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir
           + " for MultiTableSnapshotInputFormat");
-      restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
+      restoreSnapshot(masterStorage, snapshotName, restoreDir);
     }
   }
 
-  void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
-      FileSystem fs) throws IOException {
-    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
+  void restoreSnapshot(final MasterStorage<? extends StorageIdentifier> masterStorage,
+      String snapshotName, Path restoreDir) throws IOException {
+    RestoreSnapshotHelper.copySnapshotForScanner(masterStorage, restoreDir, snapshotName);
   }
 
 }

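The hunk above replaces the (conf, rootDir, fs) plumbing with a MasterStorage handle. A caller-side sketch of the reworked restore entry point, using only signatures that appear in this patch (snapshot name and restore directory are made up for the example):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.fs.MasterStorage;
    import org.apache.hadoop.hbase.fs.StorageIdentifier;
    import org.apache.hadoop.hbase.fs.legacy.snapshot.RestoreSnapshotHelper;

    public class RestoreForScannerSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Same open(conf, false) call the patch uses in setInput()/getSplits().
        MasterStorage<? extends StorageIdentifier> masterStorage =
            MasterStorage.open(conf, false);
        RestoreSnapshotHelper.copySnapshotForScanner(masterStorage,
            new Path("/tmp/restore-snap1"), "snap1");
      }
    }
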
http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index c40396f..61c1a1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -49,7 +49,7 @@ import java.util.List;
  * wals, etc) directly to provide maximum performance. The snapshot is not required to be
  * restored to the live cluster or cloned. This also allows to run the mapreduce job from an
  * online or offline hbase cluster. The snapshot files can be exported by using the
- * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, to a pure-hdfs cluster, 
+ * {@link org.apache.hadoop.hbase.fs.legacy.snapshot.ExportSnapshot} tool, to a pure-hdfs cluster,
  * and this InputFormat can be used to run the mapreduce job directly over the snapshot files. 
  * The snapshot should not be deleted while there are jobs reading from snapshot files.
  * <p>

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 7ddde5b..a45f591 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
@@ -35,15 +34,16 @@ import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.fs.MasterStorage;
+import org.apache.hadoop.hbase.fs.StorageIdentifier;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
 import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.fs.legacy.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.fs.legacy.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.Writable;
@@ -53,6 +53,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 
@@ -198,7 +199,7 @@ public class TableSnapshotInputFormatImpl {
       this.split = split;
       HTableDescriptor htd = split.htd;
       HRegionInfo hri = this.split.getRegionInfo();
-      FileSystem fs = FSUtils.getCurrentFileSystem(conf);
+      MasterStorage<? extends StorageIdentifier> masterStorage = MasterStorage.open(conf, false);
 
 
       // region is immutable, this should be fine,
@@ -207,8 +208,8 @@ public class TableSnapshotInputFormatImpl {
       // disable caching of data blocks
       scan.setCacheBlocks(false);
 
-      scanner =
-          new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
+      scanner = new ClientSideRegionScanner(masterStorage, new Path(split.restoreDir), htd, hri,
+          scan, null);
     }
 
     public boolean nextKeyValue() throws IOException {
@@ -251,19 +252,16 @@ public class TableSnapshotInputFormatImpl {
   public static List<InputSplit> getSplits(Configuration conf) throws IOException {
     String snapshotName = getSnapshotName(conf);
 
-    Path rootDir = FSUtils.getRootDir(conf);
-    FileSystem fs = rootDir.getFileSystem(conf);
-
-    SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);
-
-    List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);
+    MasterStorage<? extends StorageIdentifier> masterStorage = MasterStorage.open(conf, false);
+    SnapshotDescription snapshot = masterStorage.getSnapshot(snapshotName);
 
     // TODO: mapred does not support scan as input API. Work around for now.
     Scan scan = extractScanFromConf(conf);
     // the temp dir where the snapshot is restored
     Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
 
-    return getSplits(scan, manifest, regionInfos, restoreDir, conf);
+    return getSplits(scan, masterStorage.getTableDescriptorForSnapshot(snapshot),
+        masterStorage.getSnapshotRegions(snapshot).values(), restoreDir, conf);
   }
 
   public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
@@ -280,13 +278,6 @@ public class TableSnapshotInputFormatImpl {
     return regionInfos;
   }
 
-  public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
-      Path rootDir, FileSystem fs) throws IOException {
-    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
-    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-    return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
-  }
-
   public static Scan extractScanFromConf(Configuration conf) throws IOException {
     Scan scan = null;
     if (conf.get(TableInputFormat.SCAN) != null) {
@@ -304,15 +295,12 @@ public class TableSnapshotInputFormatImpl {
     return scan;
   }
 
-  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
-      List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
-    // load table descriptor
-    HTableDescriptor htd = manifest.getTableDescriptor();
-
+  public static List<InputSplit> getSplits(Scan scan, HTableDescriptor htd,
+      Collection<HRegionInfo> regionInfos, Path restoreDir, Configuration conf) throws IOException {
     Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
 
     List<InputSplit> splits = new ArrayList<InputSplit>();
-    for (HRegionInfo hri : regionManifests) {
+    for (HRegionInfo hri : regionInfos) {
       // load region descriptor
 
       if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
@@ -395,13 +383,12 @@ public class TableSnapshotInputFormatImpl {
       throws IOException {
     conf.set(SNAPSHOT_NAME_KEY, snapshotName);
 
-    Path rootDir = FSUtils.getRootDir(conf);
-    FileSystem fs = rootDir.getFileSystem(conf);
+    MasterStorage<? extends StorageIdentifier> masterStorage = MasterStorage.open(conf, false);
 
     restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
 
     // TODO: restore from record readers to parallelize.
-    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
+    RestoreSnapshotHelper.copySnapshotForScanner(masterStorage, restoreDir, snapshotName);
 
     conf.set(RESTORE_DIR_KEY, restoreDir.toString());
   }

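Pulling the pieces of the hunk above together: split planning now asks MasterStorage for the snapshot description, table descriptor, and region set, then feeds them to the widened getSplits(). A sketch of that sequence, built only from calls shown in this patch (snapshot name and restore dir are hypothetical):

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.fs.MasterStorage;
    import org.apache.hadoop.hbase.fs.StorageIdentifier;
    import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
    import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;

    public class SnapshotSplitSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        MasterStorage<? extends StorageIdentifier> masterStorage = MasterStorage.open(conf, false);
        SnapshotDescription snapshot = masterStorage.getSnapshot("snap1");
        List<TableSnapshotInputFormatImpl.InputSplit> splits =
            TableSnapshotInputFormatImpl.getSplits(new Scan(),
                masterStorage.getTableDescriptorForSnapshot(snapshot),
                masterStorage.getSnapshotRegions(snapshot).values(),
                new Path("/tmp/restore-snap1"), conf);
        System.out.println(splits.size() + " input splits");
      }
    }
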
http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java
index e8e75c8..cbdc02f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.fs.MasterStorage;
+import org.apache.hadoop.hbase.fs.StorageIdentifier;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.MetricsSnapshot;
 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure.CreateStorageRegions;
@@ -48,12 +49,11 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CloneSnapshotState;
+import org.apache.hadoop.hbase.snapshot.SnapshotRestoreMetaChanges;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 
 import com.google.common.base.Preconditions;
 
@@ -301,12 +301,10 @@ public class CloneSnapshotProcedure
       throws IOException, InterruptedException {
     if (!getTableName().isSystemTable()) {
       // Check and update namespace quota
-      final MasterStorage ms = env.getMasterServices().getMasterStorage();
+      final MasterStorage masterStorage = env.getMasterServices().getMasterStorage();
 
-      SnapshotManifest manifest = SnapshotManifest.open(env.getMasterConfiguration(), snapshot);
-
-      ProcedureSyncWait.getMasterQuotaManager(env)
-        .checkNamespaceTableAndRegionQuota(getTableName(), manifest.getRegionManifestsMap().size());
+      ProcedureSyncWait.getMasterQuotaManager(env).checkNamespaceTableAndRegionQuota(getTableName(),
+          masterStorage.getSnapshotRegions(snapshot).size());
     }
 
     final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
@@ -354,11 +352,10 @@ public class CloneSnapshotProcedure
 
         try {
           // 1. Execute the on-disk Clone
-          SnapshotManifest manifest = SnapshotManifest.open(conf, snapshot);
-          RestoreSnapshotHelper restoreHelper = new RestoreSnapshotHelper(conf, manifest,
+          MasterStorage<? extends StorageIdentifier> masterStorage =
+              env.getMasterServices().getMasterStorage();
+          SnapshotRestoreMetaChanges metaChanges = masterStorage.restoreSnapshot(snapshot,
               hTableDescriptor, monitorException, monitorStatus);
-          RestoreSnapshotHelper.RestoreMetaChanges metaChanges =
-              restoreHelper.restoreStorageRegions();
 
          // Clone operation should have nothing to restore or remove
           Preconditions.checkArgument(
@@ -399,15 +396,15 @@ public class CloneSnapshotProcedure
     final HTableDescriptor hTableDescriptor,
     List<HRegionInfo> newRegions,
     final CreateStorageRegions storageRegionHandler) throws IOException {
-    final MasterStorage ms = env.getMasterServices().getMasterStorage();
+    final MasterStorage masterStorage = env.getMasterServices().getMasterStorage();
 
-    // 1. Delete existing artifacts (dir, files etc) for the table
-    ms.deleteTable(hTableDescriptor.getTableName());
+    // 1. Delete existing storage artifacts (dirs, files, etc.) for the table
+    masterStorage.deleteTable(hTableDescriptor.getTableName());
 
     // 2. Create Table Descriptor
     // using a copy of the descriptor, the table will be created in enabling state first
     HTableDescriptor underConstruction = new HTableDescriptor(hTableDescriptor);
-    ms.createTableDescriptor(underConstruction, true);
+    masterStorage.createTableDescriptor(underConstruction, true);
 
     // 3. Create Regions
     newRegions = storageRegionHandler.createRegionsOnStorage(env, hTableDescriptor.getTableName(),
@@ -424,9 +421,8 @@ public class CloneSnapshotProcedure
   private void addRegionsToMeta(final MasterProcedureEnv env) throws IOException {
     newRegions = CreateTableProcedure.addTableToMeta(env, hTableDescriptor, newRegions);
 
-    RestoreSnapshotHelper.RestoreMetaChanges metaChanges =
-        new RestoreSnapshotHelper.RestoreMetaChanges(
-          hTableDescriptor, parentsToChildrenPairMap);
+    SnapshotRestoreMetaChanges metaChanges =
+        new SnapshotRestoreMetaChanges(hTableDescriptor, parentsToChildrenPairMap);
     metaChanges.updateMetaParentRegions(env.getMasterServices().getConnection(), newRegions);
   }
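
The clone path no longer constructs a SnapshotManifest or a RestoreSnapshotHelper by hand; the
storage layer performs the on-disk clone and reports what changed. A rough sketch, using only
names from this patch (monitorException and monitorStatus assumed in scope):

    MasterStorage<? extends StorageIdentifier> masterStorage =
        env.getMasterServices().getMasterStorage();
    // storage performs the on-disk clone and returns the meta-level changes
    SnapshotRestoreMetaChanges metaChanges = masterStorage.restoreSnapshot(snapshot,
        hTableDescriptor, monitorException, monitorStatus);
    // a clone starts from an empty table, so only regions to add are expected;
    // anything to restore or remove would indicate a bug
    List<HRegionInfo> regionsToAdd = metaChanges.getRegionsToAdd();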
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
index fdfa174..a339119 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
@@ -51,8 +51,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.RestoreSnapshotState;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.snapshot.SnapshotRestoreMetaChanges;
 import org.apache.hadoop.hbase.util.Pair;
 
 @InterfaceAudience.Private
@@ -331,8 +330,9 @@ public class RestoreSnapshotProcedure
 
     if (!getTableName().isSystemTable()) {
      // Table already exists. Check and update the region quota for this table's namespace.
-      SnapshotManifest manifest = SnapshotManifest.open(env.getMasterConfiguration(), snapshot);
-      int snapshotRegionCount = manifest.getRegionManifestsMap().size();
+      MasterStorage<? extends StorageIdentifier> masterStorage =
+          env.getMasterServices().getMasterStorage();
+      int snapshotRegionCount = masterStorage.getSnapshotRegions(snapshot).size();
       int tableRegionCount =
           ProcedureSyncWait.getMasterQuotaManager(env).getRegionCountOfTable(tableName);
 
@@ -362,13 +362,10 @@ public class RestoreSnapshotProcedure
 
     LOG.info("Starting restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot));
     try {
-      SnapshotManifest manifest = SnapshotManifest.open(env.getMasterServices().getConfiguration(),
-          snapshot);
-      RestoreSnapshotHelper restoreHelper = new RestoreSnapshotHelper(
-          env.getMasterServices().getConfiguration(), manifest, modifiedHTableDescriptor,
-          monitorException, getMonitorStatus());
-
-      RestoreSnapshotHelper.RestoreMetaChanges metaChanges = restoreHelper.restoreStorageRegions();
+      MasterStorage<? extends StorageIdentifier> masterStorage =
+          env.getMasterServices().getMasterStorage();
+      SnapshotRestoreMetaChanges metaChanges = masterStorage.restoreSnapshot(snapshot,
+          modifiedHTableDescriptor, monitorException, getMonitorStatus());
       regionsToRestore = metaChanges.getRegionsToRestore();
       regionsToRemove = metaChanges.getRegionsToRemove();
       regionsToAdd = metaChanges.getRegionsToAdd();
@@ -437,9 +434,8 @@ public class RestoreSnapshotProcedure
           modifiedHTableDescriptor.getRegionReplication());
       }
 
-      RestoreSnapshotHelper.RestoreMetaChanges metaChanges =
-        new RestoreSnapshotHelper.RestoreMetaChanges(
-          modifiedHTableDescriptor, parentsToChildrenPairMap);
+      SnapshotRestoreMetaChanges metaChanges =
+        new SnapshotRestoreMetaChanges(modifiedHTableDescriptor, parentsToChildrenPairMap);
       metaChanges.updateMetaParentRegions(conn, regionsToAdd);
 
       // At this point the restore is complete.
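
For the quota check, the snapshot's region count now comes straight from the storage layer rather
than from a manifest map. Roughly, with calls as they appear in this patch:

    MasterStorage<? extends StorageIdentifier> masterStorage =
        env.getMasterServices().getMasterStorage();
    // regions recorded in the snapshot vs. regions the table currently has
    int snapshotRegionCount = masterStorage.getSnapshotRegions(snapshot).size();
    int tableRegionCount =
        ProcedureSyncWait.getMasterQuotaManager(env).getRegionCountOfTable(tableName);
    // if the restore grows the table, the extra regions must fit the namespace quota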

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
index a7c2652..ec3e56d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
@@ -32,13 +32,13 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.executor.ExecutorType;
+import org.apache.hadoop.hbase.fs.StorageContext;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.zookeeper.KeeperException;
@@ -96,17 +96,7 @@ public class DisabledTableSnapshotHandler extends TakeSnapshotHandler {
       LOG.info(msg);
       status.setStatus(msg);
 
-      ThreadPoolExecutor exec = SnapshotManifest.createExecutor(conf, "DisabledTableSnapshot");
-      try {
-        ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
-          @Override
-          public void editRegion(final HRegionInfo regionInfo) throws IOException {
-            snapshotManifest.addRegion(FSUtils.getTableDir(rootDir, snapshotTable), regionInfo);
-          }
-        });
-      } finally {
-        exec.shutdown();
-      }
+      masterStorage.addRegionsToSnapshot(snapshot, regions, StorageContext.TEMP);
     } catch (Exception e) {
       // make sure we capture the exception to propagate back to the client later
       String reason = "Failed snapshot " + ClientSnapshotDescriptionUtils.toString(snapshot)

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
index 6e14f47..4dfd7d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
@@ -20,38 +20,35 @@ package org.apache.hadoop.hbase.master.snapshot;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.fs.MasterStorage;
+import org.apache.hadoop.hbase.fs.StorageContext;
+import org.apache.hadoop.hbase.fs.StorageIdentifier;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 
 /**
  * General snapshot verification on the master.
  * <p>
- * This is a light-weight verification mechanism for all the files in a snapshot. It doesn't
- * attempt to verify that the files are exact copies (that would be paramount to taking the
- * snapshot again!), but instead just attempts to ensure that the files match the expected
- * files and are the same length.
+ * This is a light-weight mechanism for verifying snapshot artifacts such as the snapshot
+ * description, table descriptor, regions and store files. It doesn't attempt to verify that the
+ * artifacts are exact copies (that would be tantamount to taking the snapshot again!), but
+ * instead just checks that the artifacts match the expected ones, including their length.
  * <p>
 * Taking an online snapshot can race against other operations and this is a last line of
 * defense.  For example, if meta changes between when snapshots are taken, not all regions of a
@@ -59,7 +56,7 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 * but the snapshot took the parent), or a move (the snapshot only checks lists of region servers; a move could
  * have caused a region to be skipped or done twice).
  * <p>
- * Current snapshot files checked:
+ * Current snapshot artifacts checked:
  * <ol>
  * <li>SnapshotDescription is readable</li>
  * <li>Table info is readable</li>
@@ -79,53 +76,57 @@ public final class MasterSnapshotVerifier {
   private static final Log LOG = LogFactory.getLog(MasterSnapshotVerifier.class);
 
   private SnapshotDescription snapshot;
-  private FileSystem fs;
-  private Path rootDir;
   private TableName tableName;
+  private MasterStorage<? extends StorageIdentifier> masterStorage;
   private MasterServices services;
+  private StorageContext ctx;
 
   /**
    * @param services services for the master
    * @param snapshot snapshot to check
-   * @param rootDir root directory of the hbase installation.
    */
-  public MasterSnapshotVerifier(MasterServices services, SnapshotDescription snapshot, Path rootDir) {
-    this.fs = services.getMasterStorage().getFileSystem();
+  public MasterSnapshotVerifier(MasterServices services, SnapshotDescription snapshot,
+      StorageContext ctx) {
     this.services = services;
+    this.masterStorage = services.getMasterStorage();
     this.snapshot = snapshot;
-    this.rootDir = rootDir;
+    this.ctx = ctx;
     this.tableName = TableName.valueOf(snapshot.getTable());
   }
 
   /**
-   * Verify that the snapshot in the directory is a valid snapshot
-   * @param snapshotDir snapshot directory to check
-   * @param snapshotServers {@link org.apache.hadoop.hbase.ServerName} of the servers 
-   *        that are involved in the snapshot
+   * Verify that the snapshot persisted on storage is a valid snapshot
+   * @param ctx {@link StorageContext} for a given snapshot
    * @throws CorruptedSnapshotException if the snapshot is invalid
-   * @throws IOException if there is an unexpected connection issue to the filesystem
+   * @throws IOException if there is an unexpected connection issue to the storage
    */
-  public void verifySnapshot(Path snapshotDir, Set<String> snapshotServers)
+  public void verifySnapshot(StorageContext ctx)
       throws CorruptedSnapshotException, IOException {
-    SnapshotManifest manifest = SnapshotManifest.open(services.getConfiguration(), fs,
-                                                      snapshotDir, snapshot);
     // verify snapshot info matches
-    verifySnapshotDescription(snapshotDir);
+    verifySnapshotDescription(ctx);
 
-    // check that tableinfo is a valid table description
-    verifyTableInfo(manifest);
+    // check that table info is a valid table description
+    verifyTableInfo(ctx);
 
     // check that each region is valid
-    verifyRegions(manifest);
+    verifyRegions(ctx);
   }
 
   /**
-   * Check that the snapshot description written in the filesystem matches the current snapshot
-   * @param snapshotDir snapshot directory to check
+   * Check that the snapshot description written to storage matches the current snapshot
+   * @param ctx {@link StorageContext} for a given snapshot
+   * @throws CorruptedSnapshotException if verification fails
    */
-  private void verifySnapshotDescription(Path snapshotDir) throws CorruptedSnapshotException {
-    SnapshotDescription found = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-    if (!this.snapshot.equals(found)) {
+  private void verifySnapshotDescription(StorageContext ctx) throws CorruptedSnapshotException {
+    boolean match = false;
+    SnapshotDescription found = null;
+    try {
+      found = masterStorage.getSnapshot(snapshot.getName(), ctx);
+      match = this.snapshot.equals(found);
+    } catch (IOException e) {
+      LOG.warn("Failed to read snapshot '" + snapshot.getName() + "' from storage.", e);
+    }
+    if (!match) {
       throw new CorruptedSnapshotException(
           "Snapshot read (" + found + ") doesn't equal snapshot we ran (" + snapshot + ").",
           ProtobufUtil.createSnapshotDesc(snapshot));
@@ -133,11 +134,12 @@ public final class MasterSnapshotVerifier {
   }
 
   /**
-   * Check that the table descriptor for the snapshot is a valid table descriptor
-   * @param manifest snapshot manifest to inspect
+   * Check that the table descriptor written to storage for the snapshot is valid
+   * @param ctx {@link StorageContext} for a given snapshot
+   * @throws IOException if the table descriptor cannot be read from storage
    */
-  private void verifyTableInfo(final SnapshotManifest manifest) throws IOException {
-    HTableDescriptor htd = manifest.getTableDescriptor();
+  private void verifyTableInfo(StorageContext ctx) throws IOException {
+    HTableDescriptor htd = masterStorage.getTableDescriptorForSnapshot(snapshot, ctx);
     if (htd == null) {
       throw new CorruptedSnapshotException("Missing Table Descriptor",
         ProtobufUtil.createSnapshotDesc(snapshot));
@@ -152,10 +154,10 @@ public final class MasterSnapshotVerifier {
 
   /**
    * Check that all the regions in the snapshot are valid, and accounted for.
-   * @param manifest snapshot manifest to inspect
-   * @throws IOException if we can't reach hbase:meta or read the files from the FS
+   * @param ctx {@link StorageContext} for a given snapshot
+   * @throws IOException if region info for the snapshot cannot be read from storage
    */
-  private void verifyRegions(final SnapshotManifest manifest) throws IOException {
+  private void verifyRegions(StorageContext ctx) throws IOException {
     List<HRegionInfo> regions;
     if (TableName.META_TABLE_NAME.equals(tableName)) {
       regions = new MetaTableLocator().getMetaRegions(services.getZooKeeper());
@@ -165,8 +167,8 @@ public final class MasterSnapshotVerifier {
     // Remove the non-default regions
     RegionReplicaUtil.removeNonDefaultRegions(regions);
 
-    Map<String, SnapshotRegionManifest> regionManifests = manifest.getRegionManifestsMap();
-    if (regionManifests == null) {
+    Map<String, HRegionInfo> snapshotRegions = masterStorage.getSnapshotRegions(snapshot, ctx);
+    if (snapshotRegions == null) {
       String msg = "Snapshot " + ClientSnapshotDescriptionUtils.toString(snapshot) + " looks empty";
       LOG.error(msg);
       throw new CorruptedSnapshotException(msg);
@@ -176,29 +178,31 @@ public final class MasterSnapshotVerifier {
     boolean hasMobStore = false;
     // the mob region is a dummy region, it's not a real region in HBase.
     // the mob region has a special name, it could be found by the region name.
-    if (regionManifests.get(MobUtils.getMobRegionInfo(tableName).getEncodedName()) != null) {
+    if (snapshotRegions.get(MobUtils.getMobRegionInfo(tableName).getEncodedName()) != null) {
       hasMobStore = true;
     }
-    int realRegionCount = hasMobStore ? regionManifests.size() - 1 : regionManifests.size();
+    int realRegionCount = hasMobStore ? snapshotRegions.size() - 1 : snapshotRegions.size();
     if (realRegionCount != regions.size()) {
       errorMsg = "Regions moved during the snapshot '" +
                    ClientSnapshotDescriptionUtils.toString(snapshot) + "'. expected=" +
                    regions.size() + " snapshotted=" + realRegionCount + ".";
       LOG.error(errorMsg);
-    }
-
-    // Verify HRegionInfo
-    for (HRegionInfo region : regions) {
-      SnapshotRegionManifest regionManifest = regionManifests.get(region.getEncodedName());
-      if (regionManifest == null) {
-        // could happen due to a move or split race.
-        String mesg = " No snapshot region directory found for region:" + region;
-        if (errorMsg.isEmpty()) errorMsg = mesg;
-        LOG.error(mesg);
-        continue;
+    } else {
+      // Verify HRegionInfo
+      for (HRegionInfo region : regions) {
+        HRegionInfo snapshotRegion = snapshotRegions.get(region.getEncodedName());
+        if (snapshotRegion == null) {
+          // could happen due to a move or split race.
+          errorMsg = "No snapshot region directory found for region '" + region + "'";
+          LOG.error(errorMsg);
+          break;
+        } else if (!region.equals(snapshotRegion)) {
+          errorMsg = "Snapshot region info '" + snapshotRegion + "' doesn't match expected region'"
+              + region + "'.";
+          LOG.error(errorMsg);
+          break;
+        }
       }
-
-      verifyRegionInfo(region, regionManifest);
     }
 
     if (!errorMsg.isEmpty()) {
@@ -206,21 +210,6 @@ public final class MasterSnapshotVerifier {
     }
 
     // Verify Snapshot HFiles
-    SnapshotReferenceUtil.verifySnapshot(services.getConfiguration(), fs, manifest);
-  }
-
-  /**
-   * Verify that the regionInfo is valid
-   * @param region the region to check
-   * @param manifest snapshot manifest to inspect
-   */
-  private void verifyRegionInfo(final HRegionInfo region,
-      final SnapshotRegionManifest manifest) throws IOException {
-    HRegionInfo manifestRegionInfo = HRegionInfo.convert(manifest.getRegionInfo());
-    if (!region.equals(manifestRegionInfo)) {
-      String msg = "Manifest region info " + manifestRegionInfo +
-                   "doesn't match expected region:" + region;
-      throw new CorruptedSnapshotException(msg, ProtobufUtil.createSnapshotDesc(snapshot));
-    }
+    SnapshotReferenceUtil.verifySnapshot(services.getMasterStorage(), snapshot, ctx);
   }
 }
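
After this change the verifier is driven entirely by MasterStorage plus a StorageContext (note
that the context is supplied both at construction time and per verifySnapshot() call in this
patch). A minimal usage sketch, assuming masterServices is in scope and the snapshot still sits
in the TEMP context:

    MasterSnapshotVerifier verifier =
        new MasterSnapshotVerifier(masterServices, snapshot, StorageContext.TEMP);
    // checks the description, table descriptor and regions, then the store file
    // references; throws CorruptedSnapshotException on any mismatch
    verifier.verifySnapshot(StorageContext.TEMP);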