You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2016/11/30 06:56:20 UTC

[2/8] hbase git commit: HBASE-16904 Snapshot related changes for FS redo work

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
deleted file mode 100644
index 572bc04..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
+++ /dev/null
@@ -1,570 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.snapshot;
-
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.fs.RegionStorage;
-import org.apache.hadoop.hbase.fs.legacy.LegacyPathIdentifier;
-import org.apache.hadoop.hbase.fs.legacy.LegacyTableDescriptor;
-import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotDataManifest;
-import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.zookeeper.server.persistence.SnapShot;
-
-/**
- * Utility class to help read/write the Snapshot Manifest.
- *
- * The snapshot format is transparent for the users of this class,
- * once the snapshot is written, it will never be modified.
- * On open() the snapshot will be loaded to the current in-memory format.
- */
-@InterfaceAudience.Private
-public final class SnapshotManifest {
-  private static final Log LOG = LogFactory.getLog(SnapshotManifest.class);
-
-  public static final String SNAPSHOT_MANIFEST_SIZE_LIMIT_CONF_KEY = "snapshot.manifest.size.limit";
-
-  public static final String DATA_MANIFEST_NAME = "data.manifest";
-
-  private List<SnapshotRegionManifest> regionManifests;
-  private SnapshotDescription desc;
-  private HTableDescriptor htd;
-
-  private final ForeignExceptionSnare monitor;
-  private final Configuration conf;
-  private final Path workingDir;
-  private final FileSystem fs;
-  private int manifestSizeLimit;
-
-  private SnapshotManifest(final Configuration conf, final FileSystem fs,
-      final Path workingDir, final SnapshotDescription desc,
-      final ForeignExceptionSnare monitor) {
-    this.monitor = monitor;
-    this.desc = desc;
-    this.workingDir = workingDir;
-    this.conf = conf;
-    this.fs = fs;
-
-    this.manifestSizeLimit = conf.getInt(SNAPSHOT_MANIFEST_SIZE_LIMIT_CONF_KEY, 64 * 1024 * 1024);
-  }
-
-  /**
-   * Return a SnapshotManifest instance, used for writing a snapshot.
-   *
-   * There are two usage pattern:
-   *  - The Master will create a manifest, add the descriptor, offline regions
-   *    and consolidate the snapshot by writing all the pending stuff on-disk.
-   *      manifest = SnapshotManifest.create(...)
-   *      manifest.addRegion(tableDir, hri)
-   *      manifest.consolidate()
-   *  - The RegionServer will create a single region manifest
-   *      manifest = SnapshotManifest.create(...)
-   *      manifest.addRegion(region)
-   */
-  public static SnapshotManifest create(final Configuration conf, final FileSystem fs,
-      final Path workingDir, final SnapshotDescription desc,
-      final ForeignExceptionSnare monitor) {
-    return new SnapshotManifest(conf, fs, workingDir, desc, monitor);
-  }
-
-  /**
-   * Return a SnapshotManifest instance with the information already loaded in-memory.
-   *    SnapshotManifest manifest = SnapshotManifest.open(...)
-   *    HTableDescriptor htd = manifest.getTableDescriptor()
-   *    for (SnapshotRegionManifest regionManifest: manifest.getRegionManifests())
-   *      hri = regionManifest.getRegionInfo()
-   *      for (regionManifest.getFamilyFiles())
-   *        ...
-   */
-  public static SnapshotManifest open(final Configuration conf, final FileSystem fs,
-      final Path workingDir, final SnapshotDescription desc) throws IOException {
-    SnapshotManifest manifest = new SnapshotManifest(conf, fs, workingDir, desc, null);
-    manifest.load();
-    return manifest;
-  }
-
-  public static SnapshotManifest open(final Configuration conf, final SnapshotDescription desc)
-      throws IOException {
-    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(desc,
-        FSUtils.getRootDir(conf));
-    return open(conf, snapshotDir.getFileSystem(conf), snapshotDir, desc);
-  }
-
-  /**
-   * Add the table descriptor to the snapshot manifest
-   */
-  public void addTableDescriptor(final HTableDescriptor htd) throws IOException {
-    this.htd = htd;
-  }
-
-  interface RegionVisitor<TRegion, TFamily> {
-    TRegion regionOpen(final HRegionInfo regionInfo) throws IOException;
-    void regionClose(final TRegion region) throws IOException;
-
-    TFamily familyOpen(final TRegion region, final byte[] familyName) throws IOException;
-    void familyClose(final TRegion region, final TFamily family) throws IOException;
-
-    void storeFile(final TRegion region, final TFamily family, final StoreFileInfo storeFile)
-      throws IOException;
-  }
-
-  private RegionVisitor createRegionVisitor(final SnapshotDescription desc) throws IOException {
-    switch (getSnapshotFormat(desc)) {
-      case SnapshotManifestV1.DESCRIPTOR_VERSION:
-        return new SnapshotManifestV1.ManifestBuilder(conf, fs, workingDir);
-      case SnapshotManifestV2.DESCRIPTOR_VERSION:
-        return new SnapshotManifestV2.ManifestBuilder(conf, fs, workingDir);
-      default:
-      throw new CorruptedSnapshotException("Invalid Snapshot version: " + desc.getVersion(),
-        ProtobufUtil.createSnapshotDesc(desc));
-    }
-  }
-
-  public void addMobRegion(HRegionInfo regionInfo, HColumnDescriptor[] hcds) throws IOException {
-    // 0. Get the ManifestBuilder/RegionVisitor
-    RegionVisitor visitor = createRegionVisitor(desc);
-
-    // 1. dump region meta info into the snapshot directory
-    LOG.debug("Storing mob region '" + regionInfo + "' region-info for snapshot.");
-    Object regionData = visitor.regionOpen(regionInfo);
-    monitor.rethrowException();
-
-    // 2. iterate through all the stores in the region
-    LOG.debug("Creating references for mob files");
-
-    Path mobRegionPath = MobUtils.getMobRegionPath(conf, regionInfo.getTable());
-    for (HColumnDescriptor hcd : hcds) {
-      // 2.1. build the snapshot reference for the store if it's a mob store
-      if (!hcd.isMobEnabled()) {
-        continue;
-      }
-      Object familyData = visitor.familyOpen(regionData, hcd.getName());
-      monitor.rethrowException();
-
-      Path storePath = MobUtils.getMobFamilyPath(mobRegionPath, hcd.getNameAsString());
-      List<StoreFileInfo> storeFiles = getStoreFiles(storePath);
-      if (storeFiles == null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("No mob files under family: " + hcd.getNameAsString());
-        }
-        continue;
-      }
-
-      addReferenceFiles(visitor, regionData, familyData, storeFiles, true);
-
-      visitor.familyClose(regionData, familyData);
-    }
-    visitor.regionClose(regionData);
-  }
-
-  /**
-   * Creates a 'manifest' for the specified region, by reading directly from the HRegion object.
-   * This is used by the "online snapshot" when the table is enabled.
-   */
-  public void addRegion(final HRegion region) throws IOException {
-    // 0. Get the ManifestBuilder/RegionVisitor
-    RegionVisitor visitor = createRegionVisitor(desc);
-
-    // 1. dump region meta info into the snapshot directory
-    LOG.debug("Storing '" + region + "' region-info for snapshot.");
-    Object regionData = visitor.regionOpen(region.getRegionInfo());
-    monitor.rethrowException();
-
-    // 2. iterate through all the stores in the region
-    LOG.debug("Creating references for hfiles");
-
-    for (Store store : region.getStores()) {
-      // 2.1. build the snapshot reference for the store
-      Object familyData = visitor.familyOpen(regionData, store.getFamily().getName());
-      monitor.rethrowException();
-
-      List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
-      }
-
-      // 2.2. iterate through all the store's files and create "references".
-      for (int i = 0, sz = storeFiles.size(); i < sz; i++) {
-        StoreFile storeFile = storeFiles.get(i);
-        monitor.rethrowException();
-
-        // create "reference" to this store file.
-        LOG.debug("Adding reference for file (" + (i+1) + "/" + sz + "): " + storeFile.getPath());
-        visitor.storeFile(regionData, familyData, storeFile.getFileInfo());
-      }
-      visitor.familyClose(regionData, familyData);
-    }
-    visitor.regionClose(regionData);
-  }
-
-  /**
-   * Creates a 'manifest' for the specified region, by reading directly from the disk.
-   * This is used by the "offline snapshot" when the table is disabled.
-   */
-  public void addRegion(final Path tableDir, final HRegionInfo regionInfo) throws IOException {
-    // 0. Get the ManifestBuilder/RegionVisitor
-    RegionVisitor visitor = createRegionVisitor(desc);
-
-    boolean isMobRegion = MobUtils.isMobRegionInfo(regionInfo);
-    try {
-      // Open the RegionFS
-      RegionStorage regionFs = RegionStorage.open(conf, regionInfo, false);
-      monitor.rethrowException();
-
-      // 1. dump region meta info into the snapshot directory
-      LOG.debug("Storing region-info for snapshot.");
-      Object regionData = visitor.regionOpen(regionInfo);
-      monitor.rethrowException();
-
-      // 2. iterate through all the stores in the region
-      LOG.debug("Creating references for hfiles");
-
-      // This ensures that we have an atomic view of the directory as long as we have < ls limit
-      // (batch size of the files in a directory) on the namenode. Otherwise, we get back the files
-      // in batches and may miss files being added/deleted. This could be more robust (iteratively
-      // checking to see if we have all the files until we are sure), but the limit is currently
-      // 1000 files/batch, far more than the number of store files under a single column family.
-      Collection<String> familyNames = regionFs.getFamilies();
-      if (familyNames != null) {
-        for (String familyName: familyNames) {
-          Object familyData = visitor.familyOpen(regionData, Bytes.toBytes(familyName));
-          monitor.rethrowException();
-
-          Collection<StoreFileInfo> storeFiles = null;
-          if (isMobRegion) {
-            Path regionPath = MobUtils.getMobRegionPath(conf, regionInfo.getTable());
-            Path storePath = MobUtils.getMobFamilyPath(regionPath, familyName);
-            storeFiles = getStoreFiles(storePath);
-          } else {
-            storeFiles = regionFs.getStoreFiles(familyName);
-          }
-
-          if (storeFiles == null) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("No files under family: " + familyName);
-            }
-            continue;
-          }
-
-          // 2.1. build the snapshot reference for the store
-          // iterate through all the store's files and create "references".
-          addReferenceFiles(visitor, regionData, familyData, storeFiles, false);
-
-          visitor.familyClose(regionData, familyData);
-        }
-      }
-      visitor.regionClose(regionData);
-    } catch (IOException e) {
-      // the mob directory might not be created yet, so do nothing when it is a mob region
-      if (!isMobRegion) {
-        throw e;
-      }
-    }
-  }
-
-  private List<StoreFileInfo> getStoreFiles(Path storeDir) throws IOException {
-    FileStatus[] stats = FSUtils.listStatus(fs, storeDir);
-    if (stats == null) return null;
-
-    ArrayList<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(stats.length);
-    for (int i = 0; i < stats.length; ++i) {
-      storeFiles.add(new StoreFileInfo(conf, fs, stats[i]));
-    }
-    return storeFiles;
-  }
-
-  private void addReferenceFiles(RegionVisitor visitor, Object regionData, Object familyData,
-      Collection<StoreFileInfo> storeFiles, boolean isMob) throws IOException {
-    final String fileType = isMob ? "mob file" : "hfile";
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("Adding snapshot references for %s %ss", storeFiles, fileType));
-    }
-
-    int i = 0;
-    int sz = storeFiles.size();
-    for (StoreFileInfo storeFile: storeFiles) {
-      monitor.rethrowException();
-
-      LOG.debug(String.format("Adding reference for %s (%d/%d): %s",
-          fileType, ++i, sz, storeFile.getPath()));
-
-      // create "reference" to this store file.
-      visitor.storeFile(regionData, familyData, storeFile);
-    }
-  }
-
-  /**
-   * Load the information in the SnapshotManifest. Called by SnapshotManifest.open()
-   *
-   * If the format is v2 and there is no data-manifest, means that we are loading an
-   * in-progress snapshot. Since we support rolling-upgrades, we loook for v1 and v2
-   * regions format.
-   */
-  private void load() throws IOException {
-    switch (getSnapshotFormat(desc)) {
-      case SnapshotManifestV1.DESCRIPTOR_VERSION: {
-        this.htd = LegacyTableDescriptor.getTableDescriptorFromFs(fs, workingDir);
-        ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
-        try {
-          this.regionManifests =
-            SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc);
-        } finally {
-          tpool.shutdown();
-        }
-        break;
-      }
-      case SnapshotManifestV2.DESCRIPTOR_VERSION: {
-        SnapshotDataManifest dataManifest = readDataManifest();
-        if (dataManifest != null) {
-          htd = ProtobufUtil.convertToHTableDesc(dataManifest.getTableSchema());
-          regionManifests = dataManifest.getRegionManifestsList();
-        } else {
-          // Compatibility, load the v1 regions
-          // This happens only when the snapshot is in-progress and the cache wants to refresh.
-          List<SnapshotRegionManifest> v1Regions, v2Regions;
-          ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
-          try {
-            v1Regions = SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc);
-            v2Regions = SnapshotManifestV2.loadRegionManifests(conf, tpool, fs, workingDir, desc);
-          } catch (InvalidProtocolBufferException e) {
-            throw new CorruptedSnapshotException("unable to parse region manifest " +
-                e.getMessage(), e);
-          } finally {
-            tpool.shutdown();
-          }
-          if (v1Regions != null && v2Regions != null) {
-            regionManifests =
-              new ArrayList<SnapshotRegionManifest>(v1Regions.size() + v2Regions.size());
-            regionManifests.addAll(v1Regions);
-            regionManifests.addAll(v2Regions);
-          } else if (v1Regions != null) {
-            regionManifests = v1Regions;
-          } else /* if (v2Regions != null) */ {
-            regionManifests = v2Regions;
-          }
-        }
-        break;
-      }
-      default:
-      throw new CorruptedSnapshotException("Invalid Snapshot version: " + desc.getVersion(),
-        ProtobufUtil.createSnapshotDesc(desc));
-    }
-  }
-
-  /**
-   * Get the current snapshot working dir
-   */
-  public Path getSnapshotDir() {
-    return this.workingDir;
-  }
-
-  /**
-   * Get the SnapshotDescription
-   */
-  public SnapshotDescription getSnapshotDescription() {
-    return this.desc;
-  }
-
-  /**
-   * Get the table descriptor from the Snapshot
-   */
-  public HTableDescriptor getTableDescriptor() {
-    return this.htd;
-  }
-
-  /**
-   * Get all the Region Manifest from the snapshot
-   */
-  public List<SnapshotRegionManifest> getRegionManifests() {
-    return this.regionManifests;
-  }
-
-  /**
-   * Get all the Region Manifest from the snapshot.
-   * This is an helper to get a map with the region encoded name
-   */
-  public Map<String, SnapshotRegionManifest> getRegionManifestsMap() {
-    if (regionManifests == null || regionManifests.size() == 0) return null;
-
-    HashMap<String, SnapshotRegionManifest> regionsMap =
-        new HashMap<String, SnapshotRegionManifest>(regionManifests.size());
-    for (SnapshotRegionManifest manifest: regionManifests) {
-      String regionName = getRegionNameFromManifest(manifest);
-      regionsMap.put(regionName, manifest);
-    }
-    return regionsMap;
-  }
-
-  public void consolidate() throws IOException {
-    if (getSnapshotFormat(desc) == SnapshotManifestV1.DESCRIPTOR_VERSION) {
-      Path rootDir = FSUtils.getRootDir(conf);
-      LOG.info("Using old Snapshot Format");
-      // write a copy of descriptor to the snapshot directory
-      LegacyTableDescriptor.createTableDescriptor(fs, workingDir, htd, false);
-    } else {
-      LOG.debug("Convert to Single Snapshot Manifest");
-      convertToV2SingleManifest();
-    }
-  }
-
-  /*
-   * In case of rolling-upgrade, we try to read all the formats and build
-   * the snapshot with the latest format.
-   */
-  private void convertToV2SingleManifest() throws IOException {
-    // Try to load v1 and v2 regions
-    List<SnapshotRegionManifest> v1Regions, v2Regions;
-    ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
-    try {
-      v1Regions = SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc);
-      v2Regions = SnapshotManifestV2.loadRegionManifests(conf, tpool, fs, workingDir, desc);
-    } finally {
-      tpool.shutdown();
-    }
-
-    SnapshotDataManifest.Builder dataManifestBuilder = SnapshotDataManifest.newBuilder();
-    dataManifestBuilder.setTableSchema(ProtobufUtil.convertToTableSchema(htd));
-
-    if (v1Regions != null && v1Regions.size() > 0) {
-      dataManifestBuilder.addAllRegionManifests(v1Regions);
-    }
-    if (v2Regions != null && v2Regions.size() > 0) {
-      dataManifestBuilder.addAllRegionManifests(v2Regions);
-    }
-
-    // Write the v2 Data Manifest.
-    // Once the data-manifest is written, the snapshot can be considered complete.
-    // Currently snapshots are written in a "temporary" directory and later
-    // moved to the "complated" snapshot directory.
-    SnapshotDataManifest dataManifest = dataManifestBuilder.build();
-    writeDataManifest(dataManifest);
-    this.regionManifests = dataManifest.getRegionManifestsList();
-
-    // Remove the region manifests. Everything is now in the data-manifest.
-    // The delete operation is "relaxed", unless we get an exception we keep going.
-    // The extra files in the snapshot directory will not give any problem,
-    // since they have the same content as the data manifest, and even by re-reading
-    // them we will get the same information.
-    if (v1Regions != null && v1Regions.size() > 0) {
-      for (SnapshotRegionManifest regionManifest: v1Regions) {
-        SnapshotManifestV1.deleteRegionManifest(fs, workingDir, regionManifest);
-      }
-    }
-    if (v2Regions != null && v2Regions.size() > 0) {
-      for (SnapshotRegionManifest regionManifest: v2Regions) {
-        SnapshotManifestV2.deleteRegionManifest(fs, workingDir, regionManifest);
-      }
-    }
-  }
-
-  /*
-   * Write the SnapshotDataManifest file
-   */
-  private void writeDataManifest(final SnapshotDataManifest manifest)
-      throws IOException {
-    FSDataOutputStream stream = fs.create(new Path(workingDir, DATA_MANIFEST_NAME));
-    try {
-      manifest.writeTo(stream);
-    } finally {
-      stream.close();
-    }
-  }
-
-  /*
-   * Read the SnapshotDataManifest file
-   */
-  private SnapshotDataManifest readDataManifest() throws IOException {
-    FSDataInputStream in = null;
-    try {
-      in = fs.open(new Path(workingDir, DATA_MANIFEST_NAME));
-      CodedInputStream cin = CodedInputStream.newInstance(in);
-      cin.setSizeLimit(manifestSizeLimit);
-      return SnapshotDataManifest.parseFrom(cin);
-    } catch (FileNotFoundException e) {
-      return null;
-    } catch (InvalidProtocolBufferException e) {
-      throw new CorruptedSnapshotException("unable to parse data manifest " + e.getMessage(), e);
-    } finally {
-      if (in != null) in.close();
-    }
-  }
-
-  private ThreadPoolExecutor createExecutor(final String name) {
-    return createExecutor(conf, name);
-  }
-
-  public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
-    int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
-    return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
-              Threads.getNamedThreadFactory(name));
-  }
-
-  /**
-   * Extract the region encoded name from the region manifest
-   */
-  static String getRegionNameFromManifest(final SnapshotRegionManifest manifest) {
-    byte[] regionName = HRegionInfo.createRegionName(
-            ProtobufUtil.toTableName(manifest.getRegionInfo().getTableName()),
-            manifest.getRegionInfo().getStartKey().toByteArray(),
-            manifest.getRegionInfo().getRegionId(), true);
-    return HRegionInfo.encodeRegionName(regionName);
-  }
-
-  /*
-   * Return the snapshot format
-   */
-  private static int getSnapshotFormat(final SnapshotDescription desc) {
-    return desc.hasVersion() ? desc.getVersion() : SnapshotManifestV1.DESCRIPTOR_VERSION;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV1.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV1.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV1.java
deleted file mode 100644
index 3ca48fe..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV1.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.snapshot;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.fs.RegionStorage;
-import org.apache.hadoop.hbase.fs.StorageIdentifier;
-import org.apache.hadoop.hbase.fs.legacy.LegacyPathIdentifier;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
-import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.util.ByteStringer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-/**
- * DO NOT USE DIRECTLY. USE {@link SnapshotManifest}.
- *
- * Snapshot v1 layout format
- *  - Each region in the table is represented by a directory with the .hregioninfo file
- *      /snapshotName/regionName/.hregioninfo
- *  - Each file present in the table is represented by an empty file
- *      /snapshotName/regionName/familyName/fileName
- */
-@InterfaceAudience.Private
-public final class SnapshotManifestV1 {
-  private static final Log LOG = LogFactory.getLog(SnapshotManifestV1.class);
-
-  public static final int DESCRIPTOR_VERSION = 0;
-
-  private SnapshotManifestV1() {
-  }
-
-  // TODO update for RegionStorage
-  static class ManifestBuilder implements SnapshotManifest.RegionVisitor<RegionStorage, Path> {
-    private final Configuration conf;
-    private final StorageIdentifier snapshotDir;
-    private final FileSystem fs;
-
-    public ManifestBuilder(final Configuration conf, final FileSystem fs, final Path snapshotDir) {
-      this.snapshotDir = new LegacyPathIdentifier(snapshotDir);
-      this.conf = conf;
-      this.fs = fs;
-    }
-
-    public RegionStorage regionOpen(final HRegionInfo regionInfo) throws IOException {
-      RegionStorage snapshotRegionFs = RegionStorage.open(conf, fs,
-          snapshotDir, regionInfo, true);
-      return snapshotRegionFs;
-    }
-
-    public void regionClose(final RegionStorage region) {
-    }
-
-    public Path familyOpen(final RegionStorage snapshotRegionFs, final byte[] familyName) {
-      Path familyDir = ((LegacyPathIdentifier)snapshotRegionFs.getStoreContainer(Bytes.toString(familyName))).path;
-      return familyDir;
-    }
-
-    public void familyClose(final RegionStorage region, final Path family) {
-    }
-
-    public void storeFile(final RegionStorage region, final Path familyDir,
-        final StoreFileInfo storeFile) throws IOException {
-      Path referenceFile = new Path(familyDir, storeFile.getPath().getName());
-      boolean success = true;
-      if (storeFile.isReference()) {
-        // write the Reference object to the snapshot
-        storeFile.getReference().write(fs, referenceFile);
-      } else {
-        // create "reference" to this store file.  It is intentionally an empty file -- all
-        // necessary information is captured by its fs location and filename.  This allows us to
-        // only figure out what needs to be done via a single nn operation (instead of having to
-        // open and read the files as well).
-        success = fs.createNewFile(referenceFile);
-      }
-      if (!success) {
-        throw new IOException("Failed to create reference file:" + referenceFile);
-      }
-    }
-  }
-
-  static List<SnapshotRegionManifest> loadRegionManifests(final Configuration conf,
-      final Executor executor,final FileSystem fs, final Path snapshotDir,
-      final SnapshotDescription desc) throws IOException {
-    FileStatus[] regions = FSUtils.listStatus(fs, snapshotDir, new FSUtils.RegionDirFilter(fs));
-    if (regions == null) {
-      LOG.debug("No regions under directory:" + snapshotDir);
-      return null;
-    }
-
-    final ExecutorCompletionService<SnapshotRegionManifest> completionService =
-      new ExecutorCompletionService<SnapshotRegionManifest>(executor);
-    for (final FileStatus region: regions) {
-      completionService.submit(new Callable<SnapshotRegionManifest>() {
-        @Override
-        public SnapshotRegionManifest call() throws IOException {
-          final RegionStorage rs = RegionStorage.open(conf, new LegacyPathIdentifier(region.getPath()), true);
-          return buildManifestFromDisk(conf, fs, snapshotDir, rs);
-        }
-      });
-    }
-
-    ArrayList<SnapshotRegionManifest> regionsManifest =
-        new ArrayList<SnapshotRegionManifest>(regions.length);
-    try {
-      for (int i = 0; i < regions.length; ++i) {
-        regionsManifest.add(completionService.take().get());
-      }
-    } catch (InterruptedException e) {
-      throw new InterruptedIOException(e.getMessage());
-    } catch (ExecutionException e) {
-      IOException ex = new IOException();
-      ex.initCause(e.getCause());
-      throw ex;
-    }
-    return regionsManifest;
-  }
-
-  static void deleteRegionManifest(final FileSystem fs, final Path snapshotDir,
-      final SnapshotRegionManifest manifest) throws IOException {
-    String regionName = SnapshotManifest.getRegionNameFromManifest(manifest);
-    fs.delete(new Path(snapshotDir, regionName), true);
-  }
-
-  static SnapshotRegionManifest buildManifestFromDisk(final Configuration conf,
-      final FileSystem fs, final Path tableDir, final RegionStorage regionFs) throws IOException {
-    SnapshotRegionManifest.Builder manifest = SnapshotRegionManifest.newBuilder();
-
-    // 1. dump region meta info into the snapshot directory
-    LOG.debug("Storing region-info for snapshot.");
-    manifest.setRegionInfo(HRegionInfo.convert(regionFs.getRegionInfo()));
-
-    // 2. iterate through all the stores in the region
-    LOG.debug("Creating references for hfiles");
-
-    // This ensures that we have an atomic view of the directory as long as we have < ls limit
-    // (batch size of the files in a directory) on the namenode. Otherwise, we get back the files in
-    // batches and may miss files being added/deleted. This could be more robust (iteratively
-    // checking to see if we have all the files until we are sure), but the limit is currently 1000
-    // files/batch, far more than the number of store files under a single column family.
-    Collection<String> familyNames = regionFs.getFamilies();
-    if (familyNames != null) {
-      for (String familyName: familyNames) {
-        Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(familyName, false);
-        if (storeFiles == null) {
-          LOG.debug("No files under family: " + familyName);
-          continue;
-        }
-
-        // 2.1. build the snapshot reference for the store
-        SnapshotRegionManifest.FamilyFiles.Builder family =
-              SnapshotRegionManifest.FamilyFiles.newBuilder();
-        family.setFamilyName(ByteStringer.wrap(Bytes.toBytes(familyName)));
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
-        }
-
-        // 2.2. iterate through all the store's files and create "references".
-        int i = 0;
-        int sz = storeFiles.size();
-        for (StoreFileInfo storeFile: storeFiles) {
-          // create "reference" to this store file.
-          LOG.debug("Adding reference for file ("+ (++i) +"/" + sz + "): " + storeFile.getPath());
-          SnapshotRegionManifest.StoreFile.Builder sfManifest =
-                SnapshotRegionManifest.StoreFile.newBuilder();
-          sfManifest.setName(storeFile.getPath().getName());
-          family.addStoreFiles(sfManifest.build());
-        }
-        manifest.addFamilyFiles(family.build());
-      }
-    }
-    return manifest.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java
deleted file mode 100644
index df5dcd3..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.snapshot;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
-import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.util.ByteStringer;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-/**
- * DO NOT USE DIRECTLY. USE {@link SnapshotManifest}.
- *
- * Snapshot v2 layout format
- *  - Single Manifest file containing all the information of regions
- *  - In the online-snapshot case each region will write a "region manifest"
- *      /snapshotName/manifest.regionName
- */
-@InterfaceAudience.Private
-public final class SnapshotManifestV2 {
-  private static final Log LOG = LogFactory.getLog(SnapshotManifestV2.class);
-
-  public static final int DESCRIPTOR_VERSION = 2;
-
-  public static final String SNAPSHOT_MANIFEST_PREFIX = "region-manifest.";
-
-  private SnapshotManifestV2() {}
-
-  static class ManifestBuilder implements SnapshotManifest.RegionVisitor<
-                    SnapshotRegionManifest.Builder, SnapshotRegionManifest.FamilyFiles.Builder> {
-    private final Configuration conf;
-    private final Path snapshotDir;
-    private final FileSystem fs;
-
-    public ManifestBuilder(final Configuration conf, final FileSystem fs, final Path snapshotDir) {
-      this.snapshotDir = snapshotDir;
-      this.conf = conf;
-      this.fs = fs;
-    }
-
-    public SnapshotRegionManifest.Builder regionOpen(final HRegionInfo regionInfo) {
-      SnapshotRegionManifest.Builder manifest = SnapshotRegionManifest.newBuilder();
-      manifest.setRegionInfo(HRegionInfo.convert(regionInfo));
-      return manifest;
-    }
-
-    public void regionClose(final SnapshotRegionManifest.Builder region) throws IOException {
-      // we should ensure the snapshot dir exist, maybe it has been deleted by master
-      // see HBASE-16464
-      if (fs.exists(snapshotDir)) {
-        SnapshotRegionManifest manifest = region.build();
-        FSDataOutputStream stream = fs.create(getRegionManifestPath(snapshotDir, manifest));
-        try {
-          manifest.writeTo(stream);
-        } finally {
-          stream.close();
-        }
-      } else {
-        LOG.warn("can't write manifest without parent dir, maybe it has been deleted by master?");
-      }
-    }
-
-    public SnapshotRegionManifest.FamilyFiles.Builder familyOpen(
-        final SnapshotRegionManifest.Builder region, final byte[] familyName) {
-      SnapshotRegionManifest.FamilyFiles.Builder family =
-          SnapshotRegionManifest.FamilyFiles.newBuilder();
-      family.setFamilyName(ByteStringer.wrap(familyName));
-      return family;
-    }
-
-    public void familyClose(final SnapshotRegionManifest.Builder region,
-        final SnapshotRegionManifest.FamilyFiles.Builder family) {
-      region.addFamilyFiles(family.build());
-    }
-
-    public void storeFile(final SnapshotRegionManifest.Builder region,
-        final SnapshotRegionManifest.FamilyFiles.Builder family, final StoreFileInfo storeFile)
-        throws IOException {
-      SnapshotRegionManifest.StoreFile.Builder sfManifest =
-            SnapshotRegionManifest.StoreFile.newBuilder();
-      sfManifest.setName(storeFile.getPath().getName());
-      if (storeFile.isReference()) {
-        sfManifest.setReference(storeFile.getReference().convert());
-      }
-      sfManifest.setFileSize(storeFile.getReferencedFileStatus(fs).getLen());
-      family.addStoreFiles(sfManifest.build());
-    }
-  }
-
-  static List<SnapshotRegionManifest> loadRegionManifests(final Configuration conf,
-      final Executor executor,final FileSystem fs, final Path snapshotDir,
-      final SnapshotDescription desc) throws IOException {
-    FileStatus[] manifestFiles = FSUtils.listStatus(fs, snapshotDir, new PathFilter() {
-      @Override
-      public boolean accept(Path path) {
-        return path.getName().startsWith(SNAPSHOT_MANIFEST_PREFIX);
-      }
-    });
-
-    if (manifestFiles == null || manifestFiles.length == 0) return null;
-
-    final ExecutorCompletionService<SnapshotRegionManifest> completionService =
-      new ExecutorCompletionService<SnapshotRegionManifest>(executor);
-    for (final FileStatus st: manifestFiles) {
-      completionService.submit(new Callable<SnapshotRegionManifest>() {
-        @Override
-        public SnapshotRegionManifest call() throws IOException {
-          FSDataInputStream stream = fs.open(st.getPath());
-          try {
-            return SnapshotRegionManifest.parseFrom(stream);
-          } finally {
-            stream.close();
-          }
-        }
-      });
-    }
-
-    ArrayList<SnapshotRegionManifest> regionsManifest =
-        new ArrayList<SnapshotRegionManifest>(manifestFiles.length);
-    try {
-      for (int i = 0; i < manifestFiles.length; ++i) {
-        regionsManifest.add(completionService.take().get());
-      }
-    } catch (InterruptedException e) {
-      throw new InterruptedIOException(e.getMessage());
-    } catch (ExecutionException e) {
-      Throwable t = e.getCause();
-
-      if(t instanceof InvalidProtocolBufferException) {
-        throw (InvalidProtocolBufferException)t;
-      } else {
-        IOException ex = new IOException("ExecutionException");
-        ex.initCause(e.getCause());
-        throw ex;
-      }
-    }
-    return regionsManifest;
-  }
-
-  static void deleteRegionManifest(final FileSystem fs, final Path snapshotDir,
-      final SnapshotRegionManifest manifest) throws IOException {
-    fs.delete(getRegionManifestPath(snapshotDir, manifest), true);
-  }
-
-  private static Path getRegionManifestPath(final Path snapshotDir,
-      final SnapshotRegionManifest manifest) {
-    String regionName = SnapshotManifest.getRegionNameFromManifest(manifest);
-    return new Path(snapshotDir, SNAPSHOT_MANIFEST_PREFIX + regionName);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
index e97a60b..9800190 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
@@ -38,10 +38,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.fs.MasterStorage;
+import org.apache.hadoop.hbase.fs.StorageContext;
+import org.apache.hadoop.hbase.fs.StorageIdentifier;
+import org.apache.hadoop.hbase.fs.legacy.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.fs.legacy.io.HFileLink;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos;
 import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
@@ -53,207 +59,110 @@ import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 public final class SnapshotReferenceUtil {
   private static final Log LOG = LogFactory.getLog(SnapshotReferenceUtil.class);
 
-  public interface StoreFileVisitor {
-    void storeFile(final HRegionInfo regionInfo, final String familyName,
-       final SnapshotRegionManifest.StoreFile storeFile) throws IOException;
-  }
-
-  public interface SnapshotVisitor extends StoreFileVisitor {
-  }
-
   private SnapshotReferenceUtil() {
     // private constructor for utility class
   }
 
   /**
-   * Iterate over the snapshot store files
-   *
-   * @param conf The current {@link Configuration} instance.
-   * @param fs {@link FileSystem}
-   * @param snapshotDir {@link Path} to the Snapshot directory
-   * @param visitor callback object to get the referenced files
-   * @throws IOException if an error occurred while scanning the directory
-   */
-  public static void visitReferencedFiles(final Configuration conf, final FileSystem fs,
-      final Path snapshotDir, final SnapshotVisitor visitor)
-      throws IOException {
-    SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-    visitReferencedFiles(conf, fs, snapshotDir, desc, visitor);
-  }
-
-  /**
-   * Iterate over the snapshot store files, restored.edits and logs
-   *
-   * @param conf The current {@link Configuration} instance.
-   * @param fs {@link FileSystem}
-   * @param snapshotDir {@link Path} to the Snapshot directory
-   * @param desc the {@link SnapshotDescription} of the snapshot to verify
-   * @param visitor callback object to get the referenced files
-   * @throws IOException if an error occurred while scanning the directory
-   */
-  public static void visitReferencedFiles(final Configuration conf, final FileSystem fs,
-      final Path snapshotDir, final SnapshotDescription desc, final SnapshotVisitor visitor)
-      throws IOException {
-    visitTableStoreFiles(conf, fs, snapshotDir, desc, visitor);
-  }
-
-  /**�
-   * Iterate over the snapshot store files
-   *
-   * @param conf The current {@link Configuration} instance.
-   * @param fs {@link FileSystem}
-   * @param snapshotDir {@link Path} to the Snapshot directory
-   * @param desc the {@link SnapshotDescription} of the snapshot to verify
-   * @param visitor callback object to get the store files
-   * @throws IOException if an error occurred while scanning the directory
-   */
-  static void visitTableStoreFiles(final Configuration conf, final FileSystem fs,
-      final Path snapshotDir, final SnapshotDescription desc, final StoreFileVisitor visitor)
-      throws IOException {
-    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, desc);
-    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
-    if (regionManifests == null || regionManifests.size() == 0) {
-      LOG.debug("No manifest files present: " + snapshotDir);
-      return;
-    }
-
-    for (SnapshotRegionManifest regionManifest: regionManifests) {
-      visitRegionStoreFiles(regionManifest, visitor);
-    }
-  }
-
-  /**
-   * Iterate over the snapshot store files in the specified region
-   *
-   * @param manifest snapshot manifest to inspect
-   * @param visitor callback object to get the store files
-   * @throws IOException if an error occurred while scanning the directory
-   */
-  static void visitRegionStoreFiles(final SnapshotRegionManifest manifest,
-      final StoreFileVisitor visitor) throws IOException {
-    HRegionInfo regionInfo = HRegionInfo.convert(manifest.getRegionInfo());
-    for (SnapshotRegionManifest.FamilyFiles familyFiles: manifest.getFamilyFilesList()) {
-      String familyName = familyFiles.getFamilyName().toStringUtf8();
-      for (SnapshotRegionManifest.StoreFile storeFile: familyFiles.getStoreFilesList()) {
-        visitor.storeFile(regionInfo, familyName, storeFile);
-      }
-    }
-  }
-
-  /**
-   * Verify the validity of the snapshot
-   *
-   * @param conf The current {@link Configuration} instance.
-   * @param fs {@link FileSystem}
-   * @param snapshotDir {@link Path} to the Snapshot directory of the snapshot to verify
-   * @param snapshotDesc the {@link SnapshotDescription} of the snapshot to verify
-   * @throws CorruptedSnapshotException if the snapshot is corrupted
-   * @throws IOException if an error occurred while scanning the directory
-   */
-  public static void verifySnapshot(final Configuration conf, final FileSystem fs,
-      final Path snapshotDir, final SnapshotDescription snapshotDesc) throws IOException {
-    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
-    verifySnapshot(conf, fs, manifest);
-  }
-
-  /**
    * Verify the validity of the snapshot
    *
-   * @param conf The current {@link Configuration} instance.
-   * @param fs {@link FileSystem}
-   * @param manifest snapshot manifest to inspect
+   * @param masterStorage {@link MasterStorage} for a snapshot
+   * @param snapshot the {@link SnapshotDescription} of the snapshot to verify
+   * @param ctx {@link StorageContext} of a snapshot
    * @throws CorruptedSnapshotException if the snapshot is corrupted
    * @throws IOException if an error occurred while scanning the directory
    */
-  public static void verifySnapshot(final Configuration conf, final FileSystem fs,
-      final SnapshotManifest manifest) throws IOException {
-    final SnapshotDescription snapshotDesc = manifest.getSnapshotDescription();
-    final Path snapshotDir = manifest.getSnapshotDir();
-    concurrentVisitReferencedFiles(conf, fs, manifest, "VerifySnapshot", new StoreFileVisitor() {
-      @Override
-      public void storeFile(final HRegionInfo regionInfo, final String family,
-          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
-        verifyStoreFile(conf, fs, snapshotDir, snapshotDesc, regionInfo, family, storeFile);
-      }
+  public static void verifySnapshot(final MasterStorage<? extends StorageIdentifier> masterStorage,
+      final SnapshotDescription snapshot, StorageContext ctx) throws IOException {
+    masterStorage.visitSnapshotStoreFiles(snapshot, ctx,
+        new MasterStorage.SnapshotStoreFileVisitor() {
+          @Override
+          public void visitSnapshotStoreFile(SnapshotDescription snapshot, StorageContext ctx,
+              HRegionInfo hri, String familyName, SnapshotRegionManifest.StoreFile storeFile)
+              throws IOException {
+            verifyStoreFile(masterStorage, snapshot, ctx, hri, familyName, storeFile);
+          }
     });
   }
 
-  public static void concurrentVisitReferencedFiles(final Configuration conf, final FileSystem fs,
-      final SnapshotManifest manifest, final String desc, final StoreFileVisitor visitor)
-      throws IOException {
-
-    final Path snapshotDir = manifest.getSnapshotDir();
-    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
-    if (regionManifests == null || regionManifests.size() == 0) {
-      LOG.debug("No manifest files present: " + snapshotDir);
-      return;
-    }
+  // TODO modify master storage to use ExecutorService and run concurrent queries to HDFS NN and
+  // remove the commented out code below.
 
-    ExecutorService exec = SnapshotManifest.createExecutor(conf, desc);
-
-    try {
-      concurrentVisitReferencedFiles(conf, fs, manifest, exec, visitor);
-    } finally {
-      exec.shutdown();
-    }
-  }
-
-  public static void concurrentVisitReferencedFiles(final Configuration conf, final FileSystem fs,
-      final SnapshotManifest manifest, final ExecutorService exec, final StoreFileVisitor visitor)
-      throws IOException {
-    final SnapshotDescription snapshotDesc = manifest.getSnapshotDescription();
-    final Path snapshotDir = manifest.getSnapshotDir();
-
-    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
-    if (regionManifests == null || regionManifests.size() == 0) {
-      LOG.debug("No manifest files present: " + snapshotDir);
-      return;
-    }
-
-    final ExecutorCompletionService<Void> completionService =
-      new ExecutorCompletionService<Void>(exec);
-
-    for (final SnapshotRegionManifest regionManifest : regionManifests) {
-      completionService.submit(new Callable<Void>() {
-        @Override public Void call() throws IOException {
-          visitRegionStoreFiles(regionManifest, visitor);
-          return null;
-        }
-      });
-    }
-    try {
-      for (int i = 0; i < regionManifests.size(); ++i) {
-        completionService.take().get();
-      }
-    } catch (InterruptedException e) {
-      throw new InterruptedIOException(e.getMessage());
-    } catch (ExecutionException e) {
-      if (e.getCause() instanceof CorruptedSnapshotException) {
-        throw new CorruptedSnapshotException(e.getCause().getMessage(),
-            ProtobufUtil.createSnapshotDesc(snapshotDesc));
-      } else {
-        IOException ex = new IOException();
-        ex.initCause(e.getCause());
-        throw ex;
-      }
-    }
-  }
+//  public static void concurrentVisitReferencedFiles(final Configuration conf, final FileSystem fs,
+//      final SnapshotManifest manifest, final String desc, final StoreFileVisitor visitor)
+//      throws IOException {
+//
+//    final Path snapshotDir = manifest.getSnapshotDir();
+//    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
+//    if (regionManifests == null || regionManifests.size() == 0) {
+//      LOG.debug("No manifest files present: " + snapshotDir);
+//      return;
+//    }
+//
+//    ExecutorService exec = SnapshotManifest.createExecutor(conf, desc);
+//
+//    try {
+//      concurrentVisitReferencedFiles(conf, fs, manifest, exec, visitor);
+//    } finally {
+//      exec.shutdown();
+//    }
+//  }
+//
+//  public static void concurrentVisitReferencedFiles(final Configuration conf, final FileSystem fs,
+//      final SnapshotManifest manifest, final ExecutorService exec, final StoreFileVisitor visitor)
+//      throws IOException {
+//    final SnapshotDescription snapshotDesc = manifest.getSnapshotDescription();
+//    final Path snapshotDir = manifest.getSnapshotDir();
+//
+//    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
+//    if (regionManifests == null || regionManifests.size() == 0) {
+//      LOG.debug("No manifest files present: " + snapshotDir);
+//      return;
+//    }
+//
+//    final ExecutorCompletionService<Void> completionService =
+//      new ExecutorCompletionService<Void>(exec);
+//
+//    for (final SnapshotRegionManifest regionManifest : regionManifests) {
+//      completionService.submit(new Callable<Void>() {
+//        @Override public Void call() throws IOException {
+//          visitRegionStoreFiles(regionManifest, visitor);
+//          return null;
+//        }
+//      });
+//    }
+//    try {
+//      for (int i = 0; i < regionManifests.size(); ++i) {
+//        completionService.take().get();
+//      }
+//    } catch (InterruptedException e) {
+//      throw new InterruptedIOException(e.getMessage());
+//    } catch (ExecutionException e) {
+//      if (e.getCause() instanceof CorruptedSnapshotException) {
+//        throw new CorruptedSnapshotException(e.getCause().getMessage(),
+//            ProtobufUtil.createSnapshotDesc(snapshotDesc));
+//      } else {
+//        IOException ex = new IOException();
+//        ex.initCause(e.getCause());
+//        throw ex;
+//      }
+//    }
+//  }
 
   /**
    * Verify the validity of the snapshot store file
    *
-   * @param conf The current {@link Configuration} instance.
-   * @param fs {@link FileSystem}
-   * @param snapshotDir {@link Path} to the Snapshot directory of the snapshot to verify
+   * @param masterStorage (@link MasterStorage} for the snapshot
    * @param snapshot the {@link SnapshotDescription} of the snapshot to verify
+   * @param ctx {@link StorageContext} of a snapshot
    * @param regionInfo {@link HRegionInfo} of the region that contains the store file
    * @param family family that contains the store file
    * @param storeFile the store file to verify
    * @throws CorruptedSnapshotException if the snapshot is corrupted
    * @throws IOException if an error occurred while scanning the directory
    */
-  private static void verifyStoreFile(final Configuration conf, final FileSystem fs,
-      final Path snapshotDir, final SnapshotDescription snapshot, final HRegionInfo regionInfo,
+  private static void verifyStoreFile(final MasterStorage<? extends StorageIdentifier> masterStorage,
+      final SnapshotDescription snapshot, StorageContext ctx, final HRegionInfo regionInfo,
       final String family, final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
     TableName table = TableName.valueOf(snapshot.getTable());
     String fileName = storeFile.getName();
@@ -265,7 +174,8 @@ public final class SnapshotReferenceUtil {
       refPath = StoreFileInfo.getReferredToFile(refPath);
       String refRegion = refPath.getParent().getParent().getName();
       refPath = HFileLink.createPath(table, refRegion, family, refPath.getName());
-      if (!HFileLink.buildFromHFileLinkPattern(conf, refPath).exists(fs)) {
+      if (!HFileLink.buildFromHFileLinkPattern(masterStorage.getConfiguration(), refPath)
+          .exists(masterStorage.getFileSystem())) {
         throw new CorruptedSnapshotException(
             "Missing parent hfile for: " + fileName + " path=" + refPath,
             ProtobufUtil.createSnapshotDesc(snapshot));
@@ -292,14 +202,15 @@ public final class SnapshotReferenceUtil {
     HFileLink link = null;
     if (MobUtils.isMobRegionInfo(regionInfo)) {
       // for mob region
-      link = HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
-          HFileArchiveUtil.getArchivePath(conf), linkPath);
+      link = HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(
+          masterStorage.getConfiguration()), HFileArchiveUtil.getArchivePath(
+          masterStorage.getConfiguration()), linkPath);
     } else {
       // not mob region
-      link = HFileLink.buildFromHFileLinkPattern(conf, linkPath);
+      link = HFileLink.buildFromHFileLinkPattern(masterStorage.getConfiguration(), linkPath);
     }
     try {
-      FileStatus fstat = link.getFileStatus(fs);
+      FileStatus fstat = link.getFileStatus(masterStorage.getFileSystem());
       if (storeFile.hasFileSize() && storeFile.getFileSize() != fstat.getLen()) {
         String msg = "hfile: " + fileName + " size does not match with the expected one. " +
           " found=" + fstat.getLen() + " expected=" + storeFile.getFileSize();
@@ -320,43 +231,47 @@ public final class SnapshotReferenceUtil {
   /**
    * Returns the store file names in the snapshot.
    *
-   * @param conf The current {@link Configuration} instance.
-   * @param fs {@link FileSystem}
-   * @param snapshotDir {@link Path} to the Snapshot directory
+   * @param masterStorage {@link MasterStorage} for a snapshot.
+   * @param snapshotName Name of the snapshot
    * @throws IOException if an error occurred while scanning the directory
    * @return the names of hfiles in the specified snaphot
    */
-  public static Set<String> getHFileNames(final Configuration conf, final FileSystem fs,
-      final Path snapshotDir) throws IOException {
-    SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-    return getHFileNames(conf, fs, snapshotDir, desc);
+  public static Set<String> getHFileNames(final MasterStorage<? extends StorageIdentifier>
+      masterStorage, final String snapshotName) throws IOException {
+    return getHFileNames(masterStorage, snapshotName, StorageContext.DATA);
+  }
+
+  public static Set<String> getHFileNames(final MasterStorage<? extends StorageIdentifier>
+      masterStorage, final String snapshotName, StorageContext ctx) throws IOException {
+    SnapshotDescription desc = masterStorage.getSnapshot(snapshotName, ctx);
+    return getHFileNames(masterStorage, desc, ctx);
   }
 
   /**
    * Returns the store file names in the snapshot.
    *
-   * @param conf The current {@link Configuration} instance.
-   * @param fs {@link FileSystem}
-   * @param snapshotDir {@link Path} to the Snapshot directory
-   * @param snapshotDesc the {@link SnapshotDescription} of the snapshot to inspect
+   * @param masterStorage {@link MasterStorage} for a snapshot
+   * @param snapshot the {@link SnapshotDescription} of the snapshot to inspect
+   * @param ctx {@link StorageContext} for a snapshot
    * @throws IOException if an error occurred while scanning the directory
    * @return the names of hfiles in the specified snaphot
    */
-  private static Set<String> getHFileNames(final Configuration conf, final FileSystem fs,
-      final Path snapshotDir, final SnapshotDescription snapshotDesc)
-      throws IOException {
+  private static Set<String> getHFileNames(final MasterStorage<? extends StorageIdentifier>
+      masterStorage,final SnapshotDescription snapshot, StorageContext ctx) throws IOException {
     final Set<String> names = new HashSet<String>();
-    visitTableStoreFiles(conf, fs, snapshotDir, snapshotDesc, new StoreFileVisitor() {
-      @Override
-      public void storeFile(final HRegionInfo regionInfo, final String family,
-            final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
-        String hfile = storeFile.getName();
-        if (HFileLink.isHFileLink(hfile)) {
-          names.add(HFileLink.getReferencedHFileName(hfile));
-        } else {
-          names.add(hfile);
-        }
-      }
+    masterStorage.visitSnapshotStoreFiles(snapshot, ctx,
+        new MasterStorage.SnapshotStoreFileVisitor() {
+          @Override
+          public void visitSnapshotStoreFile(SnapshotDescription snapshot, StorageContext ctx,
+              HRegionInfo hri, String familyName, SnapshotRegionManifest.StoreFile storeFile)
+              throws IOException {
+            String hfile = storeFile.getName();
+            if (HFileLink.isHFileLink(hfile)) {
+              names.add(HFileLink.getReferencedHFileName(hfile));
+            } else {
+              names.add(hfile);
+            }
+          }
     });
     return names;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotRestoreMetaChanges.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotRestoreMetaChanges.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotRestoreMetaChanges.java
new file mode 100644
index 0000000..22899d5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotRestoreMetaChanges.java
@@ -0,0 +1,157 @@
+package org.apache.hadoop.hbase.snapshot;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Describe the set of operations needed to update hbase:meta after restore.
+ */
+public class SnapshotRestoreMetaChanges {
+  private static final Log LOG = LogFactory.getLog(SnapshotRestoreMetaChanges.class);
+
+  private final Map<String, Pair<String, String> > parentsMap;
+  private final HTableDescriptor htd;
+
+  private List<HRegionInfo> regionsToRestore = null;
+  private List<HRegionInfo> regionsToRemove = null;
+  private List<HRegionInfo> regionsToAdd = null;
+
+  public SnapshotRestoreMetaChanges(HTableDescriptor htd, Map<String,
+      Pair<String, String> > parentsMap) {
+    this.parentsMap = parentsMap;
+    this.htd = htd;
+  }
+
+  public HTableDescriptor getTableDescriptor() {
+    return htd;
+  }
+
+  /**
+   * Returns the map of parent-children_pair.
+   * @return the map
+   */
+  public Map<String, Pair<String, String>> getParentToChildrenPairMap() {
+    return this.parentsMap;
+  }
+
+  /**
+   * @return true if there're new regions
+   */
+  public boolean hasRegionsToAdd() {
+    return this.regionsToAdd != null && this.regionsToAdd.size() > 0;
+  }
+
+  /**
+   * Returns the list of new regions added during the on-disk restore.
+   * The caller is responsible to add the regions to META.
+   * e.g MetaTableAccessor.addRegionsToMeta(...)
+   * @return the list of regions to add to META
+   */
+  public List<HRegionInfo> getRegionsToAdd() {
+    return this.regionsToAdd;
+  }
+
+  /**
+   * @return true if there're regions to restore
+   */
+  public boolean hasRegionsToRestore() {
+    return this.regionsToRestore != null && this.regionsToRestore.size() > 0;
+  }
+
+  /**
+   * Returns the list of 'restored regions' during the on-disk restore.
+   * The caller is responsible to add the regions to hbase:meta if not present.
+   * @return the list of regions restored
+   */
+  public List<HRegionInfo> getRegionsToRestore() {
+    return this.regionsToRestore;
+  }
+
+  /**
+   * @return true if there're regions to remove
+   */
+  public boolean hasRegionsToRemove() {
+    return this.regionsToRemove != null && this.regionsToRemove.size() > 0;
+  }
+
+  /**
+   * Returns the list of regions removed during the on-disk restore.
+   * The caller is responsible to remove the regions from META.
+   * e.g. MetaTableAccessor.deleteRegions(...)
+   * @return the list of regions to remove from META
+   */
+  public List<HRegionInfo> getRegionsToRemove() {
+    return this.regionsToRemove;
+  }
+
+  public void setNewRegions(final HRegionInfo[] hris) {
+    if (hris != null) {
+      regionsToAdd = Arrays.asList(hris);
+    } else {
+      regionsToAdd = null;
+    }
+  }
+
+  public void addRegionToRemove(final HRegionInfo hri) {
+    if (regionsToRemove == null) {
+      regionsToRemove = new LinkedList<HRegionInfo>();
+    }
+    regionsToRemove.add(hri);
+  }
+
+  public void addRegionToRestore(final HRegionInfo hri) {
+    if (regionsToRestore == null) {
+      regionsToRestore = new LinkedList<HRegionInfo>();
+    }
+    regionsToRestore.add(hri);
+  }
+
+  public void updateMetaParentRegions(Connection connection,
+      final List<HRegionInfo> regionInfos) throws IOException {
+    if (regionInfos == null || parentsMap.isEmpty()) return;
+
+    // Extract region names and offlined regions
+    Map<String, HRegionInfo> regionsByName = new HashMap<String, HRegionInfo>(regionInfos.size());
+    List<HRegionInfo> parentRegions = new LinkedList<>();
+    for (HRegionInfo regionInfo: regionInfos) {
+      if (regionInfo.isSplitParent()) {
+        parentRegions.add(regionInfo);
+      } else {
+        regionsByName.put(regionInfo.getEncodedName(), regionInfo);
+      }
+    }
+
+    // Update Offline parents
+    for (HRegionInfo regionInfo: parentRegions) {
+      Pair<String, String> daughters = parentsMap.get(regionInfo.getEncodedName());
+      if (daughters == null) {
+        // The snapshot contains an unreferenced region.
+        // It will be removed by the CatalogJanitor.
+        LOG.warn("Skip update of unreferenced offline parent: " + regionInfo);
+        continue;
+      }
+
+      // One side of the split is already compacted
+      if (daughters.getSecond() == null) {
+        daughters.setSecond(daughters.getFirst());
+      }
+
+      LOG.debug("Update splits parent " + regionInfo.getEncodedName() + " -> " + daughters);
+      MetaTableAccessor.addRegionToMeta(connection, regionInfo,
+        regionsByName.get(daughters.getFirst()),
+        regionsByName.get(daughters.getSecond()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromClient.java
index fa61be0..219e4af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromClient.java
@@ -28,24 +28,19 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
 import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifestV1;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -53,8 +48,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.collect.Lists;
-
 /**
  * Test create/using/deleting snapshots from the client
  * <p>

http://git-wip-us.apache.org/repos/asf/hbase/blob/159a67c6/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/legacy/snapshot/TestExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/legacy/snapshot/TestExportSnapshot.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/legacy/snapshot/TestExportSnapshot.java
new file mode 100644
index 0000000..ef3d13c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/legacy/snapshot/TestExportSnapshot.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.fs.legacy.snapshot;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.fs.MasterStorage;
+import org.apache.hadoop.hbase.fs.StorageContext;
+import org.apache.hadoop.hbase.fs.StorageIdentifier;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
+
+/**
+ * Test Export Snapshot Tool
+ */
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestExportSnapshot {
+  @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+      withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+  private static final Log LOG = LogFactory.getLog(TestExportSnapshot.class);
+
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  protected final static byte[] FAMILY = Bytes.toBytes("cf");
+
+  protected TableName tableName;
+  private byte[] emptySnapshotName;
+  private byte[] snapshotName;
+  private int tableNumFiles;
+  private Admin admin;
+
+  public static void setUpBaseConf(Configuration conf) {
+    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+    conf.setInt("hbase.regionserver.msginterval", 100);
+    conf.setInt("hbase.client.pause", 250);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+    conf.setBoolean("hbase.master.enabletable.roundrobin", true);
+    conf.setInt("mapreduce.map.maxattempts", 10);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    setUpBaseConf(TEST_UTIL.getConfiguration());
+    TEST_UTIL.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Create a table and take a snapshot of the table used by the export test.
+   */
+  @Before
+  public void setUp() throws Exception {
+    this.admin = TEST_UTIL.getHBaseAdmin();
+
+    long tid = System.currentTimeMillis();
+    tableName = TableName.valueOf("testtb-" + tid);
+    snapshotName = Bytes.toBytes("snaptb0-" + tid);
+    emptySnapshotName = Bytes.toBytes("emptySnaptb0-" + tid);
+
+    // create Table
+    createTable();
+
+    // Take an empty snapshot
+    admin.snapshot(emptySnapshotName, tableName);
+
+    // Add some rows
+    SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 50, FAMILY);
+    tableNumFiles = admin.getTableRegions(tableName).size();
+
+    // take a snapshot
+    admin.snapshot(snapshotName, tableName);
+  }
+
+  protected void createTable() throws Exception {
+    SnapshotTestingUtils.createPreSplitTable(TEST_UTIL, tableName, 2, FAMILY);
+  }
+
+  protected interface RegionPredicate {
+    boolean evaluate(final HRegionInfo regionInfo);
+  }
+
+  protected RegionPredicate getBypassRegionPredicate() {
+    return null;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.deleteTable(tableName);
+    SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getHBaseAdmin());
+    SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
+  }
+
+  /**
+   * Verify if exported snapshot and copied files matches the original one.
+   */
+  @Test
+  public void testExportFileSystemState() throws Exception {
+    testExportFileSystemState(tableName, snapshotName, snapshotName, tableNumFiles);
+  }
+
+  @Test
+  public void testExportFileSystemStateWithSkipTmp() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(ExportSnapshot.CONF_SKIP_TMP, true);
+    try {
+      testExportFileSystemState(tableName, snapshotName, snapshotName, tableNumFiles);
+    } finally {
+      TEST_UTIL.getConfiguration().setBoolean(ExportSnapshot.CONF_SKIP_TMP, false);
+    }
+  }
+
+  @Test
+  public void testEmptyExportFileSystemState() throws Exception {
+    testExportFileSystemState(tableName, emptySnapshotName, emptySnapshotName, 0);
+  }
+
+  @Test
+  public void testConsecutiveExports() throws Exception {
+    Path copyDir = getLocalDestinationDir();
+    testExportFileSystemState(tableName, snapshotName, snapshotName, tableNumFiles, copyDir, false);
+    testExportFileSystemState(tableName, snapshotName, snapshotName, tableNumFiles, copyDir, true);
+    removeExportDir(copyDir);
+  }
+
+  @Test
+  public void testExportWithTargetName() throws Exception {
+    final byte[] targetName = Bytes.toBytes("testExportWithTargetName");
+    testExportFileSystemState(tableName, snapshotName, targetName, tableNumFiles);
+  }
+
+  private void testExportFileSystemState(final TableName tableName, final byte[] snapshotName,
+      final byte[] targetName, int filesExpected) throws Exception {
+    testExportFileSystemState(tableName, snapshotName, targetName,
+      filesExpected, getHdfsDestinationDir(), false);
+  }
+
+  protected void testExportFileSystemState(final TableName tableName,
+      final byte[] snapshotName, final byte[] targetName, int filesExpected,
+      Path copyDir, boolean overwrite) throws Exception {
+    testExportFileSystemState(TEST_UTIL.getConfiguration(), tableName, snapshotName, targetName,
+      filesExpected, TEST_UTIL.getDefaultRootDirPath(), copyDir,
+      overwrite, getBypassRegionPredicate());
+  }
+
+  /**
+   * Test ExportSnapshot
+   */
+  protected static void testExportFileSystemState(final Configuration conf, final TableName tableName,
+      final byte[] snapshotName, final byte[] targetName, final int filesExpected,
+      final Path sourceDir, Path copyDir, final boolean overwrite,
+      final RegionPredicate bypassregionPredicate) throws Exception {
+    URI hdfsUri = FileSystem.get(conf).getUri();
+    FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
+    copyDir = copyDir.makeQualified(fs);
+
+    List<String> opts = new ArrayList<String>();
+    opts.add("-snapshot");
+    opts.add(Bytes.toString(snapshotName));
+    opts.add("-copy-to");
+    opts.add(copyDir.toString());
+    if (targetName != snapshotName) {
+      opts.add("-target");
+      opts.add(Bytes.toString(targetName));
+    }
+    if (overwrite) opts.add("-overwrite");
+
+    // Export Snapshot
+    int res = ExportSnapshot.innerMain(conf, opts.toArray(new String[opts.size()]));
+    assertEquals(0, res);
+
+    // Verify File-System state
+    FileStatus[] rootFiles = fs.listStatus(copyDir);
+    assertEquals(filesExpected > 0 ? 2 : 1, rootFiles.length);
+    for (FileStatus fileStatus: rootFiles) {
+      String name = fileStatus.getPath().getName();
+      assertTrue(fileStatus.isDirectory());
+      assertTrue(name.equals(HConstants.SNAPSHOT_DIR_NAME) ||
+                 name.equals(HConstants.HFILE_ARCHIVE_DIRECTORY));
+    }
+
+    // compare the snapshot metadata and verify the hfiles
+    final FileSystem hdfs = FileSystem.get(hdfsUri, conf);
+    final Path snapshotDir = new Path(HConstants.SNAPSHOT_DIR_NAME, Bytes.toString(snapshotName));
+    final Path targetDir = new Path(HConstants.SNAPSHOT_DIR_NAME, Bytes.toString(targetName));
+    verifySnapshotDir(hdfs, new Path(sourceDir, snapshotDir),
+        fs, new Path(copyDir, targetDir));
+    Set<String> snapshotFiles = verifySnapshot(conf, fs, copyDir, tableName,
+      Bytes.toString(targetName), bypassregionPredicate);
+    assertEquals(filesExpected, snapshotFiles.size());
+  }
+
+  /**
+   * Check that ExportSnapshot will return a failure if something fails.
+   */
+  @Test
+  public void testExportFailure() throws Exception {
+    assertEquals(1, runExportAndInjectFailures(snapshotName, false));
+  }
+
+  /**
+   * Check that ExportSnapshot will succede if something fails but the retry succede.
+   */
+  @Test
+  public void testExportRetry() throws Exception {
+    assertEquals(0, runExportAndInjectFailures(snapshotName, true));
+  }
+
+  /*
+   * Execute the ExportSnapshot job injecting failures
+   */
+  private int runExportAndInjectFailures(final byte[] snapshotName, boolean retry)
+      throws Exception {
+    Path copyDir = getLocalDestinationDir();
+    URI hdfsUri = FileSystem.get(TEST_UTIL.getConfiguration()).getUri();
+    FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
+    copyDir = copyDir.makeQualified(fs);
+
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setBoolean(ExportSnapshot.CONF_TEST_FAILURE, true);
+    conf.setBoolean(ExportSnapshot.CONF_TEST_RETRY, retry);
+    if (!retry) {
+      conf.setInt("mapreduce.map.maxattempts", 3);
+    }
+    // Export Snapshot
+//    Path sourceDir = TEST_UTIL.getHBaseCluster().getMaster().getMasterStorage().getRootDir();
+    Path sourceDir = null;
+    int res = ExportSnapshot.innerMain(conf, new String[] {
+      "-snapshot", Bytes.toString(snapshotName),
+      "-copy-from", sourceDir.toString(),
+      "-copy-to", copyDir.toString()
+    });
+    return res;
+  }
+
+  /*
+   * verify if the snapshot folder on file-system 1 match the one on file-system 2
+   */
+  protected static void verifySnapshotDir(final FileSystem fs1, final Path root1,
+      final FileSystem fs2, final Path root2) throws IOException {
+    assertEquals(listFiles(fs1, root1, root1), listFiles(fs2, root2, root2));
+  }
+
+  protected Set<String> verifySnapshot(final FileSystem fs, final Path rootDir,
+      final TableName tableName, final String snapshotName) throws IOException {
+    return verifySnapshot(TEST_UTIL.getConfiguration(), fs, rootDir, tableName,
+      snapshotName, getBypassRegionPredicate());
+  }
+
+  /*
+   * Verify if the files exists
+   */
+  protected static Set<String> verifySnapshot(final Configuration conf, final FileSystem fs,
+      final Path rootDir, final TableName tableName, final String snapshotName,
+      final RegionPredicate bypassregionPredicate) throws IOException {
+    final Path exportedSnapshot = new Path(rootDir,
+      new Path(HConstants.SNAPSHOT_DIR_NAME, snapshotName));
+    final Set<String> snapshotFiles = new HashSet<String>();
+    final Path exportedArchive = new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY);
+    MasterStorage<? extends StorageIdentifier> masterStorage = MasterStorage.open(conf, false);
+    SnapshotDescription desc = masterStorage.getSnapshot(snapshotName);
+    masterStorage.visitSnapshotStoreFiles(desc, StorageContext.DATA,
+        new MasterStorage.SnapshotStoreFileVisitor() {
+          @Override
+          public void visitSnapshotStoreFile(SnapshotDescription snapshot, StorageContext ctx,
+              HRegionInfo hri, String familyName, SnapshotRegionManifest.StoreFile storeFile)
+              throws IOException {
+            if (bypassregionPredicate != null && bypassregionPredicate.evaluate(hri))
+              return;
+
+            String hfile = storeFile.getName();
+            snapshotFiles.add(hfile);
+            if (storeFile.hasReference()) {
+              // Nothing to do here, we have already the reference embedded
+            } else {
+              verifyNonEmptyFile(new Path(exportedArchive,
+                  new Path(FSUtils.getTableDir(new Path("./"), tableName),
+                      new Path(hri.getEncodedName(), new Path(familyName, hfile)))));
+            }
+          }
+
+          private void verifyNonEmptyFile(final Path path) throws IOException {
+            assertTrue(path + " should exists", fs.exists(path));
+            assertTrue(path + " should not be empty", fs.getFileStatus(path).getLen() > 0);
+          }
+        });
+
+    // Verify Snapshot description
+    assertTrue(desc.getName().equals(snapshotName));
+    assertTrue(desc.getTable().equals(tableName.getNameAsString()));
+    return snapshotFiles;
+  }
+
+  private static Set<String> listFiles(final FileSystem fs, final Path root, final Path dir)
+      throws IOException {
+    Set<String> files = new HashSet<String>();
+    int rootPrefix = root.makeQualified(fs).toString().length();
+    FileStatus[] list = FSUtils.listStatus(fs, dir);
+    if (list != null) {
+      for (FileStatus fstat: list) {
+        LOG.debug(fstat.getPath());
+        if (fstat.isDirectory()) {
+          files.addAll(listFiles(fs, root, fstat.getPath()));
+        } else {
+          files.add(fstat.getPath().makeQualified(fs).toString().substring(rootPrefix));
+        }
+      }
+    }
+    return files;
+  }
+
+  private Path getHdfsDestinationDir() {
+//    Path rootDir = TEST_UTIL.getHBaseCluster().getMaster().getMasterStorage().getRootDir();
+    Path rootDir = null;
+    Path path = new Path(new Path(rootDir, "export-test"), "export-" + System.currentTimeMillis());
+    LOG.info("HDFS export destination path: " + path);
+    return path;
+  }
+
+  private Path getLocalDestinationDir() {
+    Path path = TEST_UTIL.getDataTestDir("local-export-" + System.currentTimeMillis());
+    LOG.info("Local export destination path: " + path);
+    return path;
+  }
+
+  private static void removeExportDir(final Path path) throws IOException {
+    FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
+    fs.delete(path, true);
+  }
+}