You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2014/08/20 07:44:17 UTC

[3/7] HBASE-11742 (Backport HBASE-7987 and HBASE-11185 to 0.98) (Esteban Gutierrez and Matteo Bertozzi)

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e978a16/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
new file mode 100644
index 0000000..47c6ebf
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot;
+
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotDataManifest;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * Utility class to help read/write the Snapshot Manifest.
+ *
+ * The snapshot format is transparent for the users of this class,
+ * once the snapshot is written, it will never be modified.
+ * On open() the snapshot will be loaded to the current in-memory format.
+ */
+@InterfaceAudience.Private
+public class SnapshotManifest {
+  private static final Log LOG = LogFactory.getLog(SnapshotManifest.class);
+
+  private static final String DATA_MANIFEST_NAME = "data.manifest";
+
+  private List<SnapshotRegionManifest> regionManifests;
+  private SnapshotDescription desc;
+  private HTableDescriptor htd;
+
+  private final ForeignExceptionSnare monitor;
+  private final Configuration conf;
+  private final Path workingDir;
+  private final FileSystem fs;
+
+  private SnapshotManifest(final Configuration conf, final FileSystem fs,
+      final Path workingDir, final SnapshotDescription desc,
+      final ForeignExceptionSnare monitor) {
+    this.monitor = monitor;
+    this.desc = desc;
+    this.workingDir = workingDir;
+    this.conf = conf;
+    this.fs = fs;
+  }
+
+  /**
+   * Return a SnapshotManifest instance, used for writing a snapshot.
+   *
+   * There are two usage pattern:
+   *  - The Master will create a manifest, add the descriptor, offline regions
+   *    and consolidate the snapshot by writing all the pending stuff on-disk.
+   *      manifest = SnapshotManifest.create(...)
+   *      manifest.addRegion(tableDir, hri)
+   *      manifest.consolidate()
+   *  - The RegionServer will create a single region manifest
+   *      manifest = SnapshotManifest.create(...)
+   *      manifest.addRegion(region)
+   */
+  public static SnapshotManifest create(final Configuration conf, final FileSystem fs,
+      final Path workingDir, final SnapshotDescription desc,
+      final ForeignExceptionSnare monitor) {
+    return new SnapshotManifest(conf, fs, workingDir, desc, monitor);
+  }
+
+  /**
+   * Return a SnapshotManifest instance with the information already loaded in-memory.
+   *    SnapshotManifest manifest = SnapshotManifest.open(...)
+   *    HTableDescriptor htd = manifest.getTableDescriptor()
+   *    for (SnapshotRegionManifest regionManifest: manifest.getRegionManifests())
+   *      hri = regionManifest.getRegionInfo()
+   *      for (regionManifest.getFamilyFiles())
+   *        ...
+   */
+  public static SnapshotManifest open(final Configuration conf, final FileSystem fs,
+      final Path workingDir, final SnapshotDescription desc) throws IOException {
+    SnapshotManifest manifest = new SnapshotManifest(conf, fs, workingDir, desc, null);
+    manifest.load();
+    return manifest;
+  }
+
+
+  /**
+   * Add the table descriptor to the snapshot manifest
+   */
+  public void addTableDescriptor(final HTableDescriptor htd) throws IOException {
+    this.htd = htd;
+  }
+
+  interface RegionVisitor<TRegion, TFamily> {
+    TRegion regionOpen(final HRegionInfo regionInfo) throws IOException;
+    void regionClose(final TRegion region) throws IOException;
+
+    TFamily familyOpen(final TRegion region, final byte[] familyName) throws IOException;
+    void familyClose(final TRegion region, final TFamily family) throws IOException;
+
+    void storeFile(final TRegion region, final TFamily family, final StoreFileInfo storeFile)
+      throws IOException;
+  }
+
+  private RegionVisitor createRegionVisitor(final SnapshotDescription desc) throws IOException {
+    switch (getSnapshotFormat(desc)) {
+      case SnapshotManifestV1.DESCRIPTOR_VERSION:
+        return new SnapshotManifestV1.ManifestBuilder(conf, fs, workingDir);
+      case SnapshotManifestV2.DESCRIPTOR_VERSION:
+        return new SnapshotManifestV2.ManifestBuilder(conf, fs, workingDir);
+      default:
+        throw new CorruptedSnapshotException("Invalid Snapshot version: "+ desc.getVersion(), desc);
+    }
+  }
+
+  /**
+   * Creates a 'manifest' for the specified region, by reading directly from the HRegion object.
+   * This is used by the "online snapshot" when the table is enabled.
+   */
+  public void addRegion(final HRegion region) throws IOException {
+    // 0. Get the ManifestBuilder/RegionVisitor
+    RegionVisitor visitor = createRegionVisitor(desc);
+
+    // 1. dump region meta info into the snapshot directory
+    LOG.debug("Storing '" + region + "' region-info for snapshot.");
+    Object regionData = visitor.regionOpen(region.getRegionInfo());
+    monitor.rethrowException();
+
+    // 2. iterate through all the stores in the region
+    LOG.debug("Creating references for hfiles");
+
+    for (Store store : region.getStores().values()) {
+      // 2.1. build the snapshot reference for the store
+      Object familyData = visitor.familyOpen(regionData, store.getFamily().getName());
+      monitor.rethrowException();
+
+      List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
+      }
+
+      // 2.2. iterate through all the store's files and create "references".
+      for (int i = 0, sz = storeFiles.size(); i < sz; i++) {
+        StoreFile storeFile = storeFiles.get(i);
+        monitor.rethrowException();
+
+        // create "reference" to this store file.
+        LOG.debug("Adding reference for file (" + (i+1) + "/" + sz + "): " + storeFile.getPath());
+        visitor.storeFile(regionData, familyData, storeFile.getFileInfo());
+      }
+      visitor.familyClose(regionData, familyData);
+    }
+    visitor.regionClose(regionData);
+  }
+
+  /**
+   * Creates a 'manifest' for the specified region, by reading directly from the disk.
+   * This is used by the "offline snapshot" when the table is disabled.
+   */
+  public void addRegion(final Path tableDir, final HRegionInfo regionInfo) throws IOException {
+    // 0. Get the ManifestBuilder/RegionVisitor
+    RegionVisitor visitor = createRegionVisitor(desc);
+
+    // Open the RegionFS
+    HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(conf, fs,
+          tableDir, regionInfo, true);
+    monitor.rethrowException();
+
+    // 1. dump region meta info into the snapshot directory
+    LOG.debug("Storing region-info for snapshot.");
+    Object regionData = visitor.regionOpen(regionInfo);
+    monitor.rethrowException();
+
+    // 2. iterate through all the stores in the region
+    LOG.debug("Creating references for hfiles");
+
+    // This ensures that we have an atomic view of the directory as long as we have < ls limit
+    // (batch size of the files in a directory) on the namenode. Otherwise, we get back the files in
+    // batches and may miss files being added/deleted. This could be more robust (iteratively
+    // checking to see if we have all the files until we are sure), but the limit is currently 1000
+    // files/batch, far more than the number of store files under a single column family.
+    Collection<String> familyNames = regionFs.getFamilies();
+    if (familyNames != null) {
+      for (String familyName: familyNames) {
+        Object familyData = visitor.familyOpen(regionData, Bytes.toBytes(familyName));
+        monitor.rethrowException();
+
+        Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(familyName);
+        if (storeFiles == null) {
+          LOG.debug("No files under family: " + familyName);
+          continue;
+        }
+
+        // 2.1. build the snapshot reference for the store
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
+        }
+
+        // 2.2. iterate through all the store's files and create "references".
+        int i = 0;
+        int sz = storeFiles.size();
+        for (StoreFileInfo storeFile: storeFiles) {
+          monitor.rethrowException();
+
+          // create "reference" to this store file.
+          LOG.debug("Adding reference for file ("+ (++i) +"/" + sz + "): " + storeFile.getPath());
+          visitor.storeFile(regionData, familyData, storeFile);
+        }
+        visitor.familyClose(regionData, familyData);
+      }
+    }
+    visitor.regionClose(regionData);
+  }
+
+  /**
+   * Load the information in the SnapshotManifest. Called by SnapshotManifest.open()
+   *
+   * If the format is v2 and there is no data-manifest, means that we are loading an
+   * in-progress snapshot. Since we support rolling-upgrades, we loook for v1 and v2
+   * regions format.
+   */
+  private void load() throws IOException {
+    switch (getSnapshotFormat(desc)) {
+      case SnapshotManifestV1.DESCRIPTOR_VERSION: {
+        this.htd = FSTableDescriptors.getTableDescriptorFromFs(fs, workingDir);
+        ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
+        try {
+          this.regionManifests =
+            SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc);
+        } finally {
+          tpool.shutdown();
+        }
+        break;
+      }
+      case SnapshotManifestV2.DESCRIPTOR_VERSION: {
+        SnapshotDataManifest dataManifest = readDataManifest();
+        if (dataManifest != null) {
+          htd = HTableDescriptor.convert(dataManifest.getTableSchema());
+          regionManifests = dataManifest.getRegionManifestsList();
+        } else {
+          // Compatibility, load the v1 regions
+          // This happens only when the snapshot is in-progress and the cache wants to refresh.
+          List<SnapshotRegionManifest> v1Regions, v2Regions;
+          ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
+          try {
+            v1Regions = SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc);
+            v2Regions = SnapshotManifestV2.loadRegionManifests(conf, tpool, fs, workingDir, desc);
+          } finally {
+            tpool.shutdown();
+          }
+          if (v1Regions != null && v2Regions != null) {
+            regionManifests =
+              new ArrayList<SnapshotRegionManifest>(v1Regions.size() + v2Regions.size());
+            regionManifests.addAll(v1Regions);
+            regionManifests.addAll(v2Regions);
+          } else if (v1Regions != null) {
+            regionManifests = v1Regions;
+          } else /* if (v2Regions != null) */ {
+            regionManifests = v2Regions;
+          }
+        }
+        break;
+      }
+      default:
+        throw new CorruptedSnapshotException("Invalid Snapshot version: "+ desc.getVersion(), desc);
+    }
+  }
+
+  /**
+   * Get the current snapshot working dir
+   */
+  public Path getSnapshotDir() {
+    return this.workingDir;
+  }
+
+  /**
+   * Get the SnapshotDescription
+   */
+  public SnapshotDescription getSnapshotDescription() {
+    return this.desc;
+  }
+
+  /**
+   * Get the table descriptor from the Snapshot
+   */
+  public HTableDescriptor getTableDescriptor() {
+    return this.htd;
+  }
+
+  /**
+   * Get all the Region Manifest from the snapshot
+   */
+  public List<SnapshotRegionManifest> getRegionManifests() {
+    return this.regionManifests;
+  }
+
+  /**
+   * Get all the Region Manifest from the snapshot.
+   * This is an helper to get a map with the region encoded name
+   */
+  public Map<String, SnapshotRegionManifest> getRegionManifestsMap() {
+    if (regionManifests == null || regionManifests.size() == 0) return null;
+
+    HashMap<String, SnapshotRegionManifest> regionsMap =
+        new HashMap<String, SnapshotRegionManifest>(regionManifests.size());
+    for (SnapshotRegionManifest manifest: regionManifests) {
+      String regionName = getRegionNameFromManifest(manifest);
+      regionsMap.put(regionName, manifest);
+    }
+    return regionsMap;
+  }
+
+  public void consolidate() throws IOException {
+    if (getSnapshotFormat(desc) == SnapshotManifestV1.DESCRIPTOR_VERSION) {
+      Path rootDir = FSUtils.getRootDir(conf);
+      LOG.info("Using old Snapshot Format");
+      // write a copy of descriptor to the snapshot directory
+      new FSTableDescriptors(fs, rootDir)
+        .createTableDescriptorForTableDirectory(workingDir, htd, false);
+    } else {
+      LOG.debug("Convert to Single Snapshot Manifest");
+      convertToV2SingleManifest();
+    }
+  }
+
+  /*
+   * In case of rolling-upgrade, we try to read all the formats and build
+   * the snapshot with the latest format.
+   */
+  private void convertToV2SingleManifest() throws IOException {
+    // Try to load v1 and v2 regions
+    List<SnapshotRegionManifest> v1Regions, v2Regions;
+    ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
+    try {
+      v1Regions = SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc);
+      v2Regions = SnapshotManifestV2.loadRegionManifests(conf, tpool, fs, workingDir, desc);
+    } finally {
+      tpool.shutdown();
+    }
+
+    SnapshotDataManifest.Builder dataManifestBuilder = SnapshotDataManifest.newBuilder();
+    dataManifestBuilder.setTableSchema(htd.convert());
+
+    if (v1Regions != null && v1Regions.size() > 0) {
+      dataManifestBuilder.addAllRegionManifests(v1Regions);
+    }
+    if (v2Regions != null && v2Regions.size() > 0) {
+      dataManifestBuilder.addAllRegionManifests(v2Regions);
+    }
+
+    // Write the v2 Data Manifest.
+    // Once the data-manifest is written, the snapshot can be considered complete.
+    // Currently snapshots are written in a "temporary" directory and later
+    // moved to the "complated" snapshot directory.
+    SnapshotDataManifest dataManifest = dataManifestBuilder.build();
+    writeDataManifest(dataManifest);
+    this.regionManifests = dataManifest.getRegionManifestsList();
+
+    // Remove the region manifests. Everything is now in the data-manifest.
+    // The delete operation is "relaxed", unless we get an exception we keep going.
+    // The extra files in the snapshot directory will not give any problem,
+    // since they have the same content as the data manifest, and even by re-reading
+    // them we will get the same information.
+    if (v1Regions != null && v1Regions.size() > 0) {
+      for (SnapshotRegionManifest regionManifest: v1Regions) {
+        SnapshotManifestV1.deleteRegionManifest(fs, workingDir, regionManifest);
+      }
+    }
+    if (v2Regions != null && v2Regions.size() > 0) {
+      for (SnapshotRegionManifest regionManifest: v2Regions) {
+        SnapshotManifestV2.deleteRegionManifest(fs, workingDir, regionManifest);
+      }
+    }
+  }
+
+  /*
+   * Write the SnapshotDataManifest file
+   */
+  private void writeDataManifest(final SnapshotDataManifest manifest)
+      throws IOException {
+    FSDataOutputStream stream = fs.create(new Path(workingDir, DATA_MANIFEST_NAME));
+    try {
+      manifest.writeTo(stream);
+    } finally {
+      stream.close();
+    }
+  }
+
+  /*
+   * Read the SnapshotDataManifest file
+   */
+  private SnapshotDataManifest readDataManifest() throws IOException {
+    FSDataInputStream in = null;
+    try {
+      in = fs.open(new Path(workingDir, DATA_MANIFEST_NAME));
+      return SnapshotDataManifest.parseFrom(in);
+    } catch (FileNotFoundException e) {
+      return null;
+    } finally {
+      if (in != null) in.close();
+    }
+  }
+
+  private ThreadPoolExecutor createExecutor(final String name) {
+    return createExecutor(conf, name);
+  }
+
+  public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
+    int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
+    return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+              Threads.getNamedThreadFactory(name));
+  }
+
+  /**
+   * Extract the region encoded name from the region manifest
+   */
+  static String getRegionNameFromManifest(final SnapshotRegionManifest manifest) {
+    byte[] regionName = HRegionInfo.createRegionName(
+            ProtobufUtil.toTableName(manifest.getRegionInfo().getTableName()),
+            manifest.getRegionInfo().getStartKey().toByteArray(),
+            manifest.getRegionInfo().getRegionId(), true);
+    return HRegionInfo.encodeRegionName(regionName);
+  }
+
+  /*
+   * Return the snapshot format
+   */
+  private static int getSnapshotFormat(final SnapshotDescription desc) {
+    return desc.hasVersion() ? desc.getVersion() : SnapshotManifestV1.DESCRIPTOR_VERSION;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e978a16/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV1.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV1.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV1.java
new file mode 100644
index 0000000..0da0367
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV1.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import com.google.protobuf.HBaseZeroCopyByteString;
+
+/**
+ * DO NOT USE DIRECTLY. USE {@link SnapshotManifest}.
+ *
+ * Snapshot v1 layout format
+ *  - Each region in the table is represented by a directory with the .hregioninfo file
+ *      /snapshotName/regionName/.hregioninfo
+ *  - Each file present in the table is represented by an empty file
+ *      /snapshotName/regionName/familyName/fileName
+ */
+@InterfaceAudience.Private
+public class SnapshotManifestV1 {
+  private static final Log LOG = LogFactory.getLog(SnapshotManifestV1.class);
+
+  public static final int DESCRIPTOR_VERSION = 0;
+
+  private SnapshotManifestV1() {
+  }
+
+  static class ManifestBuilder implements SnapshotManifest.RegionVisitor<
+                                                          HRegionFileSystem, Path> {
+    private final Configuration conf;
+    private final Path snapshotDir;
+    private final FileSystem fs;
+
+    public ManifestBuilder(final Configuration conf, final FileSystem fs, final Path snapshotDir) {
+      this.snapshotDir = snapshotDir;
+      this.conf = conf;
+      this.fs = fs;
+    }
+
+    public HRegionFileSystem regionOpen(final HRegionInfo regionInfo) throws IOException {
+      HRegionFileSystem snapshotRegionFs = HRegionFileSystem.createRegionOnFileSystem(conf,
+        fs, snapshotDir, regionInfo);
+      return snapshotRegionFs;
+    }
+
+    public void regionClose(final HRegionFileSystem region) {
+    }
+
+    public Path familyOpen(final HRegionFileSystem snapshotRegionFs, final byte[] familyName) {
+      Path familyDir = snapshotRegionFs.getStoreDir(Bytes.toString(familyName));
+      return familyDir;
+    }
+
+    public void familyClose(final HRegionFileSystem region, final Path family) {
+    }
+
+    public void storeFile(final HRegionFileSystem region, final Path familyDir,
+        final StoreFileInfo storeFile) throws IOException {
+      Path referenceFile = new Path(familyDir, storeFile.getPath().getName());
+      boolean success = true;
+      if (storeFile.isReference()) {
+        // write the Reference object to the snapshot
+        storeFile.getReference().write(fs, referenceFile);
+      } else {
+        // create "reference" to this store file.  It is intentionally an empty file -- all
+        // necessary information is captured by its fs location and filename.  This allows us to
+        // only figure out what needs to be done via a single nn operation (instead of having to
+        // open and read the files as well).
+        success = fs.createNewFile(referenceFile);
+      }
+      if (!success) {
+        throw new IOException("Failed to create reference file:" + referenceFile);
+      }
+    }
+  }
+
+  static List<SnapshotRegionManifest> loadRegionManifests(final Configuration conf,
+      final Executor executor,final FileSystem fs, final Path snapshotDir,
+      final SnapshotDescription desc) throws IOException {
+    FileStatus[] regions = FSUtils.listStatus(fs, snapshotDir, new FSUtils.RegionDirFilter(fs));
+    if (regions == null) {
+      LOG.info("No regions under directory:" + snapshotDir);
+      return null;
+    }
+
+    final ExecutorCompletionService<SnapshotRegionManifest> completionService =
+      new ExecutorCompletionService<SnapshotRegionManifest>(executor);
+    for (final FileStatus region: regions) {
+      completionService.submit(new Callable<SnapshotRegionManifest>() {
+        @Override
+        public SnapshotRegionManifest call() throws IOException {
+          HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, region.getPath());
+          return buildManifestFromDisk(conf, fs, snapshotDir, hri);
+        }
+      });
+    }
+
+    ArrayList<SnapshotRegionManifest> regionsManifest =
+        new ArrayList<SnapshotRegionManifest>(regions.length);
+    try {
+      for (int i = 0; i < regions.length; ++i) {
+        regionsManifest.add(completionService.take().get());
+      }
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException(e.getMessage());
+    } catch (ExecutionException e) {
+      IOException ex = new IOException();
+      ex.initCause(e.getCause());
+      throw ex;
+    }
+    return regionsManifest;
+  }
+
+  static void deleteRegionManifest(final FileSystem fs, final Path snapshotDir,
+      final SnapshotRegionManifest manifest) throws IOException {
+    String regionName = SnapshotManifest.getRegionNameFromManifest(manifest);
+    fs.delete(new Path(snapshotDir, regionName), true);
+  }
+
+  static SnapshotRegionManifest buildManifestFromDisk (final Configuration conf,
+      final FileSystem fs, final Path tableDir, final HRegionInfo regionInfo) throws IOException {
+    HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(conf, fs,
+          tableDir, regionInfo, true);
+    SnapshotRegionManifest.Builder manifest = SnapshotRegionManifest.newBuilder();
+
+    // 1. dump region meta info into the snapshot directory
+    LOG.debug("Storing region-info for snapshot.");
+    manifest.setRegionInfo(HRegionInfo.convert(regionInfo));
+
+    // 2. iterate through all the stores in the region
+    LOG.debug("Creating references for hfiles");
+
+    // This ensures that we have an atomic view of the directory as long as we have < ls limit
+    // (batch size of the files in a directory) on the namenode. Otherwise, we get back the files in
+    // batches and may miss files being added/deleted. This could be more robust (iteratively
+    // checking to see if we have all the files until we are sure), but the limit is currently 1000
+    // files/batch, far more than the number of store files under a single column family.
+    Collection<String> familyNames = regionFs.getFamilies();
+    if (familyNames != null) {
+      for (String familyName: familyNames) {
+        Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(familyName, false);
+        if (storeFiles == null) {
+          LOG.debug("No files under family: " + familyName);
+          continue;
+        }
+
+        // 2.1. build the snapshot reference for the store
+        SnapshotRegionManifest.FamilyFiles.Builder family =
+              SnapshotRegionManifest.FamilyFiles.newBuilder();
+        family.setFamilyName(HBaseZeroCopyByteString.wrap(Bytes.toBytes(familyName)));
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
+        }
+
+        // 2.2. iterate through all the store's files and create "references".
+        int i = 0;
+        int sz = storeFiles.size();
+        for (StoreFileInfo storeFile: storeFiles) {
+          // create "reference" to this store file.
+          LOG.debug("Adding reference for file ("+ (++i) +"/" + sz + "): " + storeFile.getPath());
+          SnapshotRegionManifest.StoreFile.Builder sfManifest =
+                SnapshotRegionManifest.StoreFile.newBuilder();
+          sfManifest.setName(storeFile.getPath().getName());
+          family.addStoreFiles(sfManifest.build());
+        }
+        manifest.addFamilyFiles(family.build());
+      }
+    }
+    return manifest.build();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e978a16/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java
new file mode 100644
index 0000000..585c454
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import com.google.protobuf.HBaseZeroCopyByteString;
+
+/**
+ * DO NOT USE DIRECTLY. USE {@link SnapshotManifest}.
+ *
+ * Snapshot v2 layout format
+ *  - Single Manifest file containing all the information of regions
+ *  - In the online-snapshot case each region will write a "region manifest"
+ *      /snapshotName/manifest.regionName
+ */
+@InterfaceAudience.Private
+public class SnapshotManifestV2 {
+  private static final Log LOG = LogFactory.getLog(SnapshotManifestV2.class);
+
+  public static final int DESCRIPTOR_VERSION = 2;
+
+  private static final String SNAPSHOT_MANIFEST_PREFIX = "region-manifest.";
+
+  static class ManifestBuilder implements SnapshotManifest.RegionVisitor<
+                    SnapshotRegionManifest.Builder, SnapshotRegionManifest.FamilyFiles.Builder> {
+    private final Configuration conf;
+    private final Path snapshotDir;
+    private final FileSystem fs;
+
+    public ManifestBuilder(final Configuration conf, final FileSystem fs, final Path snapshotDir) {
+      this.snapshotDir = snapshotDir;
+      this.conf = conf;
+      this.fs = fs;
+    }
+
+    public SnapshotRegionManifest.Builder regionOpen(final HRegionInfo regionInfo) {
+      SnapshotRegionManifest.Builder manifest = SnapshotRegionManifest.newBuilder();
+      manifest.setRegionInfo(HRegionInfo.convert(regionInfo));
+      return manifest;
+    }
+
+    public void regionClose(final SnapshotRegionManifest.Builder region) throws IOException {
+      SnapshotRegionManifest manifest = region.build();
+      FSDataOutputStream stream = fs.create(getRegionManifestPath(snapshotDir, manifest));
+      try {
+        manifest.writeTo(stream);
+      } finally {
+        stream.close();
+      }
+    }
+
+    public SnapshotRegionManifest.FamilyFiles.Builder familyOpen(
+        final SnapshotRegionManifest.Builder region, final byte[] familyName) {
+      SnapshotRegionManifest.FamilyFiles.Builder family =
+          SnapshotRegionManifest.FamilyFiles.newBuilder();
+      family.setFamilyName(HBaseZeroCopyByteString.wrap(familyName));
+      return family;
+    }
+
+    public void familyClose(final SnapshotRegionManifest.Builder region,
+        final SnapshotRegionManifest.FamilyFiles.Builder family) {
+      region.addFamilyFiles(family.build());
+    }
+
+    public void storeFile(final SnapshotRegionManifest.Builder region,
+        final SnapshotRegionManifest.FamilyFiles.Builder family, final StoreFileInfo storeFile)
+        throws IOException {
+      SnapshotRegionManifest.StoreFile.Builder sfManifest =
+            SnapshotRegionManifest.StoreFile.newBuilder();
+      sfManifest.setName(storeFile.getPath().getName());
+      if (storeFile.isReference()) {
+        sfManifest.setReference(storeFile.getReference().convert());
+      }
+      sfManifest.setFileSize(storeFile.getReferencedFileStatus(fs).getLen());
+      family.addStoreFiles(sfManifest.build());
+    }
+  }
+
+  static List<SnapshotRegionManifest> loadRegionManifests(final Configuration conf,
+      final Executor executor,final FileSystem fs, final Path snapshotDir,
+      final SnapshotDescription desc) throws IOException {
+    FileStatus[] manifestFiles = FSUtils.listStatus(fs, snapshotDir, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith(SNAPSHOT_MANIFEST_PREFIX);
+      }
+    });
+
+    if (manifestFiles == null || manifestFiles.length == 0) return null;
+
+    final ExecutorCompletionService<SnapshotRegionManifest> completionService =
+      new ExecutorCompletionService<SnapshotRegionManifest>(executor);
+    for (final FileStatus st: manifestFiles) {
+      completionService.submit(new Callable<SnapshotRegionManifest>() {
+        @Override
+        public SnapshotRegionManifest call() throws IOException {
+          FSDataInputStream stream = fs.open(st.getPath());
+          try {
+            return SnapshotRegionManifest.parseFrom(stream);
+          } finally {
+            stream.close();
+          }
+        }
+      });
+    }
+
+    ArrayList<SnapshotRegionManifest> regionsManifest =
+        new ArrayList<SnapshotRegionManifest>(manifestFiles.length);
+    try {
+      for (int i = 0; i < manifestFiles.length; ++i) {
+        regionsManifest.add(completionService.take().get());
+      }
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException(e.getMessage());
+    } catch (ExecutionException e) {
+      IOException ex = new IOException();
+      ex.initCause(e.getCause());
+      throw ex;
+    }
+    return regionsManifest;
+  }
+
+  static void deleteRegionManifest(final FileSystem fs, final Path snapshotDir,
+      final SnapshotRegionManifest manifest) throws IOException {
+    fs.delete(getRegionManifestPath(snapshotDir, manifest), true);
+  }
+
+  private static Path getRegionManifestPath(final Path snapshotDir,
+      final SnapshotRegionManifest manifest) {
+    String regionName = SnapshotManifest.getRegionNameFromManifest(manifest);
+    return new Path(snapshotDir, SNAPSHOT_MANIFEST_PREFIX + regionName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e978a16/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
index dc977bb..c8d0e5c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
@@ -18,25 +18,31 @@
 
 package org.apache.hadoop.hbase.snapshot;
 
-import java.io.IOException;
 import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.FSVisitor;
 
 /**
@@ -44,8 +50,15 @@ import org.apache.hadoop.hbase.util.FSVisitor;
  */
 @InterfaceAudience.Private
 public final class SnapshotReferenceUtil {
-  public interface FileVisitor extends FSVisitor.StoreFileVisitor,
-    FSVisitor.RecoveredEditsVisitor, FSVisitor.LogFileVisitor {
+  public static final Log LOG = LogFactory.getLog(SnapshotReferenceUtil.class);
+
+  public interface StoreFileVisitor {
+    void storeFile(final HRegionInfo regionInfo, final String familyName,
+       final SnapshotRegionManifest.StoreFile storeFile) throws IOException;
+  }
+
+  public interface SnapshotVisitor extends StoreFileVisitor,
+    FSVisitor.LogFileVisitor {
   }
 
   private SnapshotReferenceUtil() {
@@ -64,80 +77,79 @@ public final class SnapshotReferenceUtil {
   }
 
   /**
-   * Get the snapshotted recovered.edits dir for the specified region.
-   *
-   * @param snapshotDir directory where the specific snapshot is stored
-   * @param regionName name of the region
-   * @return path to the recovered.edits directory for the specified region files.
-   */
-  public static Path getRecoveredEditsDir(Path snapshotDir, String regionName) {
-    return HLogUtil.getRegionDirRecoveredEditsDir(new Path(snapshotDir, regionName));
-  }
-
-  /**
-   * Get the snapshot recovered.edits file
+   * Iterate over the snapshot store files, restored.edits and logs
    *
-   * @param snapshotDir directory where the specific snapshot is stored
-   * @param regionName name of the region
-   * @param logfile name of the edit file
-   * @return full path of the log file for the specified region files.
+   * @param conf The current {@link Configuration} instance.
+   * @param fs {@link FileSystem}
+   * @param snapshotDir {@link Path} to the Snapshot directory
+   * @param visitor callback object to get the referenced files
+   * @throws IOException if an error occurred while scanning the directory
    */
-  public static Path getRecoveredEdits(Path snapshotDir, String regionName, String logfile) {
-    return new Path(getRecoveredEditsDir(snapshotDir, regionName), logfile);
+  public static void visitReferencedFiles(final Configuration conf, final FileSystem fs,
+      final Path snapshotDir, final SnapshotVisitor visitor)
+      throws IOException {
+    SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+    visitReferencedFiles(conf, fs, snapshotDir, desc, visitor);
   }
 
   /**
    * Iterate over the snapshot store files, restored.edits and logs
    *
+   * @param conf The current {@link Configuration} instance.
    * @param fs {@link FileSystem}
    * @param snapshotDir {@link Path} to the Snapshot directory
+   * @param desc the {@link SnapshotDescription} of the snapshot to verify
    * @param visitor callback object to get the referenced files
    * @throws IOException if an error occurred while scanning the directory
    */
-  public static void visitReferencedFiles(final FileSystem fs, final Path snapshotDir,
-      final FileVisitor visitor) throws IOException {
-    visitTableStoreFiles(fs, snapshotDir, visitor);
-    visitRecoveredEdits(fs, snapshotDir, visitor);
+  public static void visitReferencedFiles(final Configuration conf, final FileSystem fs,
+      final Path snapshotDir, final SnapshotDescription desc, final SnapshotVisitor visitor)
+      throws IOException {
+    visitTableStoreFiles(conf, fs, snapshotDir, desc, visitor);
     visitLogFiles(fs, snapshotDir, visitor);
   }
 
-  /**
+  /**©
    * Iterate over the snapshot store files
    *
+   * @param conf The current {@link Configuration} instance.
    * @param fs {@link FileSystem}
    * @param snapshotDir {@link Path} to the Snapshot directory
+   * @param desc the {@link SnapshotDescription} of the snapshot to verify
    * @param visitor callback object to get the store files
    * @throws IOException if an error occurred while scanning the directory
    */
-  public static void visitTableStoreFiles(final FileSystem fs, final Path snapshotDir,
-      final FSVisitor.StoreFileVisitor visitor) throws IOException {
-    FSVisitor.visitTableStoreFiles(fs, snapshotDir, visitor);
+  static void visitTableStoreFiles(final Configuration conf, final FileSystem fs,
+      final Path snapshotDir, final SnapshotDescription desc, final StoreFileVisitor visitor)
+      throws IOException {
+    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, desc);
+    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
+    if (regionManifests == null || regionManifests.size() == 0) {
+      LOG.debug("No manifest files present: " + snapshotDir);
+      return;
+    }
+
+    for (SnapshotRegionManifest regionManifest: regionManifests) {
+      visitRegionStoreFiles(regionManifest, visitor);
+    }
   }
 
   /**
    * Iterate over the snapshot store files in the specified region
    *
-   * @param fs {@link FileSystem}
-   * @param regionDir {@link Path} to the Snapshot region directory
+   * @param manifest snapshot manifest to inspect
    * @param visitor callback object to get the store files
    * @throws IOException if an error occurred while scanning the directory
    */
-  public static void visitRegionStoreFiles(final FileSystem fs, final Path regionDir,
-      final FSVisitor.StoreFileVisitor visitor) throws IOException {
-    FSVisitor.visitRegionStoreFiles(fs, regionDir, visitor);
-  }
-
-  /**
-   * Iterate over the snapshot recovered.edits
-   *
-   * @param fs {@link FileSystem}
-   * @param snapshotDir {@link Path} to the Snapshot directory
-   * @param visitor callback object to get the recovered.edits files
-   * @throws IOException if an error occurred while scanning the directory
-   */
-  public static void visitRecoveredEdits(final FileSystem fs, final Path snapshotDir,
-      final FSVisitor.RecoveredEditsVisitor visitor) throws IOException {
-    FSVisitor.visitTableRecoveredEdits(fs, snapshotDir, visitor);
+  static void visitRegionStoreFiles(final SnapshotRegionManifest manifest,
+      final StoreFileVisitor visitor) throws IOException {
+    HRegionInfo regionInfo = HRegionInfo.convert(manifest.getRegionInfo());
+    for (SnapshotRegionManifest.FamilyFiles familyFiles: manifest.getFamilyFilesList()) {
+      String familyName = familyFiles.getFamilyName().toStringUtf8();
+      for (SnapshotRegionManifest.StoreFile storeFile: familyFiles.getStoreFilesList()) {
+        visitor.storeFile(regionInfo, familyName, storeFile);
+      }
+    }
   }
 
   /**
@@ -165,85 +177,177 @@ public final class SnapshotReferenceUtil {
    */
   public static void verifySnapshot(final Configuration conf, final FileSystem fs,
       final Path snapshotDir, final SnapshotDescription snapshotDesc) throws IOException {
-    final TableName table = TableName.valueOf(snapshotDesc.getTable());
-    visitTableStoreFiles(fs, snapshotDir, new FSVisitor.StoreFileVisitor() {
-      public void storeFile (final String region, final String family, final String hfile)
-          throws IOException {
-        HFileLink link = HFileLink.create(conf, table, region, family, hfile);
-        try {
-          link.getFileStatus(fs);
-        } catch (FileNotFoundException e) {
-          throw new CorruptedSnapshotException("Corrupted snapshot '" + snapshotDesc + "'", e);
-        }
-      }
-    });
+    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
+    verifySnapshot(conf, fs, manifest);
   }
 
   /**
-   * Returns the set of region names available in the snapshot.
+   * Verify the validity of the snapshot
    *
+   * @param conf The current {@link Configuration} instance.
    * @param fs {@link FileSystem}
-   * @param snapshotDir {@link Path} to the Snapshot directory
+   * @param manifest snapshot manifest to inspect
+   * @throws CorruptedSnapshotException if the snapshot is corrupted
    * @throws IOException if an error occurred while scanning the directory
-   * @return the set of the regions contained in the snapshot
    */
-  public static Set<String> getSnapshotRegionNames(final FileSystem fs, final Path snapshotDir)
-      throws IOException {
-    FileStatus[] regionDirs = FSUtils.listStatus(fs, snapshotDir, new FSUtils.RegionDirFilter(fs));
-    if (regionDirs == null) return null;
+  public static void verifySnapshot(final Configuration conf, final FileSystem fs,
+      final SnapshotManifest manifest) throws IOException {
+    final SnapshotDescription snapshotDesc = manifest.getSnapshotDescription();
+    final Path snapshotDir = manifest.getSnapshotDir();
+    concurrentVisitReferencedFiles(conf, fs, manifest, new StoreFileVisitor() {
+      @Override
+      public void storeFile(final HRegionInfo regionInfo, final String family,
+          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
+        verifyStoreFile(conf, fs, snapshotDir, snapshotDesc, regionInfo, family, storeFile);
+      }
+    });
+  }
+
+  public static void concurrentVisitReferencedFiles(final Configuration conf, final FileSystem fs,
+      final SnapshotManifest manifest, final StoreFileVisitor visitor) throws IOException {
+    final SnapshotDescription snapshotDesc = manifest.getSnapshotDescription();
+    final Path snapshotDir = manifest.getSnapshotDir();
 
-    Set<String> regions = new HashSet<String>();
-    for (FileStatus regionDir: regionDirs) {
-      regions.add(regionDir.getPath().getName());
+    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
+    if (regionManifests == null || regionManifests.size() == 0) {
+      LOG.debug("No manifest files present: " + snapshotDir);
+      return;
+    }
+
+    ExecutorService exec = SnapshotManifest.createExecutor(conf, "VerifySnapshot");
+    final ExecutorCompletionService<Void> completionService =
+      new ExecutorCompletionService<Void>(exec);
+    try {
+      for (final SnapshotRegionManifest regionManifest: regionManifests) {
+        completionService.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws IOException {
+            visitRegionStoreFiles(regionManifest, visitor);
+            return null;
+          }
+        });
+      }
+      try {
+        for (int i = 0; i < regionManifests.size(); ++i) {
+          completionService.take().get();
+        }
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException(e.getMessage());
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof CorruptedSnapshotException) {
+          throw new CorruptedSnapshotException(e.getCause().getMessage(), snapshotDesc);
+        } else {
+          IOException ex = new IOException();
+          ex.initCause(e.getCause());
+          throw ex;
+        }
+      }
+    } finally {
+      exec.shutdown();
     }
-    return regions;
   }
 
   /**
-   * Get the list of hfiles for the specified snapshot region.
-   * NOTE: The current implementation keeps one empty file per HFile in the region.
-   * The file name matches the one in the original table, and by reconstructing
-   * the path you can quickly jump to the referenced file.
+   * Verify the validity of the snapshot store file
    *
+   * @param conf The current {@link Configuration} instance.
    * @param fs {@link FileSystem}
-   * @param snapshotRegionDir {@link Path} to the Snapshot region directory
-   * @return Map of hfiles per family, the key is the family name and values are hfile names
+   * @param snapshotDir {@link Path} to the Snapshot directory of the snapshot to verify
+   * @param snapshot the {@link SnapshotDescription} of the snapshot to verify
+   * @param regionInfo {@link HRegionInfo} of the region that contains the store file
+   * @param family family that contains the store file
+   * @param storeFile the store file to verify
+   * @throws CorruptedSnapshotException if the snapshot is corrupted
    * @throws IOException if an error occurred while scanning the directory
    */
-  public static Map<String, List<String>> getRegionHFileReferences(final FileSystem fs,
-      final Path snapshotRegionDir) throws IOException {
-    final Map<String, List<String>> familyFiles = new TreeMap<String, List<String>>();
+  private static void verifyStoreFile(final Configuration conf, final FileSystem fs,
+      final Path snapshotDir, final SnapshotDescription snapshot, final HRegionInfo regionInfo,
+      final String family, final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
+    TableName table = TableName.valueOf(snapshot.getTable());
+    String fileName = storeFile.getName();
 
-    visitRegionStoreFiles(fs, snapshotRegionDir,
-      new FSVisitor.StoreFileVisitor() {
-        public void storeFile (final String region, final String family, final String hfile)
-            throws IOException {
-          List<String> hfiles = familyFiles.get(family);
-          if (hfiles == null) {
-            hfiles = new LinkedList<String>();
-            familyFiles.put(family, hfiles);
-          }
-          hfiles.add(hfile);
-        }
-    });
+    Path refPath = null;
+    if (StoreFileInfo.isReference(fileName)) {
+      // If is a reference file check if the parent file is present in the snapshot
+      refPath = new Path(new Path(regionInfo.getEncodedName(), family), fileName);
+      refPath = StoreFileInfo.getReferredToFile(refPath);
+      String refRegion = refPath.getParent().getParent().getName();
+      refPath = HFileLink.createPath(table, refRegion, family, refPath.getName());
+      if (!new HFileLink(conf, refPath).exists(fs)) {
+        throw new CorruptedSnapshotException("Missing parent hfile for: " + fileName +
+          " path=" + refPath, snapshot);
+      }
+
+      if (storeFile.hasReference()) {
+        // We don't really need to look for the file on-disk
+        // we already have the Reference information embedded here.
+        return;
+      }
+    }
 
-    return familyFiles;
+    Path linkPath;
+    if (refPath != null && HFileLink.isHFileLink(refPath)) {
+      linkPath = new Path(family, refPath.getName());
+    } else if (HFileLink.isHFileLink(fileName)) {
+      linkPath = new Path(family, fileName);
+    } else {
+      linkPath = new Path(family, HFileLink.createHFileLinkName(
+        table, regionInfo.getEncodedName(), fileName));
+    }
+
+    // check if the linked file exists (in the archive, or in the table dir)
+    HFileLink link = new HFileLink(conf, linkPath);
+    try {
+      FileStatus fstat = link.getFileStatus(fs);
+      if (storeFile.hasFileSize() && storeFile.getFileSize() != fstat.getLen()) {
+        String msg = "hfile: " + fileName + " size does not match with the expected one. " +
+          " found=" + fstat.getLen() + " expected=" + storeFile.getFileSize();
+        LOG.error(msg);
+        throw new CorruptedSnapshotException(msg, snapshot);
+      }
+    } catch (FileNotFoundException e) {
+      String msg = "Can't find hfile: " + fileName + " in the real (" +
+          link.getOriginPath() + ") or archive (" + link.getArchivePath()
+          + ") directory for the primary table.";
+      LOG.error(msg);
+      throw new CorruptedSnapshotException(msg, snapshot);
+    }
   }
 
   /**
    * Returns the store file names in the snapshot.
    *
+   * @param conf The current {@link Configuration} instance.
+   * @param fs {@link FileSystem}
+   * @param snapshotDir {@link Path} to the Snapshot directory
+   * @throws IOException if an error occurred while scanning the directory
+   * @return the names of hfiles in the specified snaphot
+   */
+  public static Set<String> getHFileNames(final Configuration conf, final FileSystem fs,
+      final Path snapshotDir) throws IOException {
+    SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+    return getHFileNames(conf, fs, snapshotDir, desc);
+  }
+
+  /**
+   * Returns the store file names in the snapshot.
+   *
+   * @param conf The current {@link Configuration} instance.
    * @param fs {@link FileSystem}
    * @param snapshotDir {@link Path} to the Snapshot directory
+   * @param snapshotDesc the {@link SnapshotDescription} of the snapshot to inspect
    * @throws IOException if an error occurred while scanning the directory
    * @return the names of hfiles in the specified snaphot
    */
-  public static Set<String> getHFileNames(final FileSystem fs, final Path snapshotDir)
+  private static Set<String> getHFileNames(final Configuration conf, final FileSystem fs,
+      final Path snapshotDir, final SnapshotDescription snapshotDesc)
       throws IOException {
     final Set<String> names = new HashSet<String>();
-    visitTableStoreFiles(fs, snapshotDir, new FSVisitor.StoreFileVisitor() {
-      public void storeFile (final String region, final String family, final String hfile)
-          throws IOException {
+    visitTableStoreFiles(conf, fs, snapshotDir, snapshotDesc, new StoreFileVisitor() {
+      @Override
+      public void storeFile(final HRegionInfo regionInfo, final String family,
+            final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
+        String hfile = storeFile.getName();
         if (HFileLink.isHFileLink(hfile)) {
           names.add(HFileLink.getReferencedHFileName(hfile));
         } else {
@@ -266,6 +370,7 @@ public final class SnapshotReferenceUtil {
       throws IOException {
     final Set<String> names = new HashSet<String>();
     visitLogFiles(fs, snapshotDir, new FSVisitor.LogFileVisitor() {
+      @Override
       public void logFile (final String server, final String logfile) throws IOException {
         names.add(logfile);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e978a16/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotTask.java
deleted file mode 100644
index ede2d85..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotTask.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.snapshot;
-
-import java.util.concurrent.Callable;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.errorhandling.ForeignException;
-import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
-import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-
-/**
- * General snapshot operation taken on a regionserver
- */
-@InterfaceAudience.Private
-public abstract class SnapshotTask implements ForeignExceptionSnare, Callable<Void>{
-
-  protected final SnapshotDescription snapshot;
-  protected final ForeignExceptionDispatcher errorMonitor;
-
-  /**
-   * @param snapshot Description of the snapshot we are going to operate on
-   * @param monitor listener interested in failures to the snapshot caused by this operation
-   */
-  public SnapshotTask(SnapshotDescription snapshot, ForeignExceptionDispatcher monitor) {
-    assert monitor != null : "ForeignExceptionDispatcher must not be null!";
-    assert snapshot != null : "SnapshotDescription must not be null!";
-    this.snapshot = snapshot;
-    this.errorMonitor = monitor;
-  }
-
-  public void snapshotFailure(String message, Exception e) {
-    ForeignException ee = new ForeignException(message, e);
-    errorMonitor.receive(ee);
-  }
-
-  @Override
-  public void rethrowException() throws ForeignException {
-    this.errorMonitor.rethrowException();
-  }
-
-  @Override
-  public boolean hasException() {
-    return this.errorMonitor.hasException();
-  }
-
-  @Override
-  public ForeignException getException() {
-    return this.errorMonitor.getException();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e978a16/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/TableInfoCopyTask.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/TableInfoCopyTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/TableInfoCopyTask.java
deleted file mode 100644
index ec50b71..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/TableInfoCopyTask.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.snapshot;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.util.FSTableDescriptors;
-
-/**
- * Copy the table info into the snapshot directory
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class TableInfoCopyTask extends SnapshotTask {
-
-  public static final Log LOG = LogFactory.getLog(TableInfoCopyTask.class);
-  private final FileSystem fs;
-  private final Path rootDir;
-
-  /**
-   * Copy the table info for the given table into the snapshot
-   * @param monitor listen for errors while running the snapshot
-   * @param snapshot snapshot for which we are copying the table info
-   * @param fs {@link FileSystem} where the tableinfo is stored (and where the copy will be written)
-   * @param rootDir root of the {@link FileSystem} where the tableinfo is stored
-   */
-  public TableInfoCopyTask(ForeignExceptionDispatcher monitor,
-      SnapshotDescription snapshot, FileSystem fs, Path rootDir) {
-    super(snapshot, monitor);
-    this.rootDir = rootDir;
-    this.fs = fs;
-  }
-
-  @Override
-  public Void call() throws Exception {
-    LOG.debug("Running table info copy.");
-    this.rethrowException();
-    LOG.debug("Attempting to copy table info for snapshot:"
-        + ClientSnapshotDescriptionUtils.toString(this.snapshot));
-    // get the HTable descriptor
-    HTableDescriptor orig = FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir,
-        TableName.valueOf(this.snapshot.getTable()));
-    this.rethrowException();
-    // write a copy of descriptor to the snapshot directory
-    Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
-    new FSTableDescriptors(fs, rootDir)
-      .createTableDescriptorForTableDirectory(snapshotDir, orig, false);
-    LOG.debug("Finished copying tableinfo.");
-    return null;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e978a16/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/TakeSnapshotUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/TakeSnapshotUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/TakeSnapshotUtils.java
deleted file mode 100644
index fdc1834..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/TakeSnapshotUtils.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.snapshot;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
-import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * Utilities for useful when taking a snapshot
- */
-public class TakeSnapshotUtils {
-
-  private static final Log LOG = LogFactory.getLog(TakeSnapshotUtils.class);
-
-  private TakeSnapshotUtils() {
-    // private constructor for util class
-  }
-
-  /**
-   * Get the per-region snapshot description location.
-   * <p>
-   * Under the per-snapshot directory, specific files per-region are kept in a similar layout as per
-   * the current directory layout.
-   * @param desc description of the snapshot
-   * @param rootDir root directory for the hbase installation
-   * @param regionName encoded name of the region (see {@link HRegionInfo#encodeRegionName(byte[])})
-   * @return path to the per-region directory for the snapshot
-   */
-  public static Path getRegionSnapshotDirectory(SnapshotDescription desc, Path rootDir,
-      String regionName) {
-    Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
-    return HRegion.getRegionDir(snapshotDir, regionName);
-  }
-
-  /**
-   * Get the snapshot directory for each family to be added to the the snapshot
-   * @param snapshot description of the snapshot being take
-   * @param snapshotRegionDir directory in the snapshot where the region directory information
-   *          should be stored
-   * @param families families to be added (can be null)
-   * @return paths to the snapshot directory for each family, in the same order as the families
-   *         passed in
-   */
-  public static List<Path> getFamilySnapshotDirectories(SnapshotDescription snapshot,
-      Path snapshotRegionDir, FileStatus[] families) {
-    if (families == null || families.length == 0) return Collections.emptyList();
-
-    List<Path> familyDirs = new ArrayList<Path>(families.length);
-    for (FileStatus family : families) {
-      // build the reference directory name
-      familyDirs.add(new Path(snapshotRegionDir, family.getPath().getName()));
-    }
-    return familyDirs;
-  }
-
-  /**
-   * Create a snapshot timer for the master which notifies the monitor when an error occurs
-   * @param snapshot snapshot to monitor
-   * @param conf configuration to use when getting the max snapshot life
-   * @param monitor monitor to notify when the snapshot life expires
-   * @return the timer to use update to signal the start and end of the snapshot
-   */
-  public static TimeoutExceptionInjector getMasterTimerAndBindToMonitor(SnapshotDescription snapshot,
-      Configuration conf, ForeignExceptionListener monitor) {
-    long maxTime = SnapshotDescriptionUtils.getMaxMasterTimeout(conf, snapshot.getType(),
-      SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
-    return new TimeoutExceptionInjector(monitor, maxTime);
-  }
-
-  /**
-   * Verify that all the expected logs got referenced
-   * @param fs filesystem where the logs live
-   * @param logsDir original logs directory
-   * @param serverNames names of the servers that involved in the snapshot
-   * @param snapshot description of the snapshot being taken
-   * @param snapshotLogDir directory for logs in the snapshot
-   * @throws IOException
-   */
-  public static void verifyAllLogsGotReferenced(FileSystem fs, Path logsDir,
-      Set<String> serverNames, SnapshotDescription snapshot, Path snapshotLogDir)
-      throws IOException {
-    assertTrue(snapshot, "Logs directory doesn't exist in snapshot", fs.exists(logsDir));
-    // for each of the server log dirs, make sure it matches the main directory
-    Multimap<String, String> snapshotLogs = getMapOfServersAndLogs(fs, snapshotLogDir, serverNames);
-    Multimap<String, String> realLogs = getMapOfServersAndLogs(fs, logsDir, serverNames);
-    if (realLogs != null) {
-      assertNotNull(snapshot, "No server logs added to snapshot", snapshotLogs);
-    } else {
-      assertNull(snapshot, "Snapshotted server logs that don't exist", snapshotLogs);
-    }
-
-    // check the number of servers
-    Set<Entry<String, Collection<String>>> serverEntries = realLogs.asMap().entrySet();
-    Set<Entry<String, Collection<String>>> snapshotEntries = snapshotLogs.asMap().entrySet();
-    assertEquals(snapshot, "Not the same number of snapshot and original server logs directories",
-      serverEntries.size(), snapshotEntries.size());
-
-    // verify we snapshotted each of the log files
-    for (Entry<String, Collection<String>> serverLogs : serverEntries) {
-      // if the server is not the snapshot, skip checking its logs
-      if (!serverNames.contains(serverLogs.getKey())) continue;
-      Collection<String> snapshotServerLogs = snapshotLogs.get(serverLogs.getKey());
-      assertNotNull(snapshot, "Snapshots missing logs for server:" + serverLogs.getKey(),
-        snapshotServerLogs);
-
-      // check each of the log files
-      assertEquals(snapshot,
-        "Didn't reference all the log files for server:" + serverLogs.getKey(), serverLogs
-            .getValue().size(), snapshotServerLogs.size());
-      for (String log : serverLogs.getValue()) {
-        assertTrue(snapshot, "Snapshot logs didn't include " + log,
-          snapshotServerLogs.contains(log));
-      }
-    }
-  }
-
-  /**
-   * Verify one of a snapshot's region's recovered.edits, has been at the surface (file names,
-   * length), match the original directory.
-   * @param fs filesystem on which the snapshot had been taken
-   * @param rootDir full path to the root hbase directory
-   * @param regionInfo info for the region
-   * @param snapshot description of the snapshot that was taken
-   * @throws IOException if there is an unexpected error talking to the filesystem
-   */
-  public static void verifyRecoveredEdits(FileSystem fs, Path rootDir, HRegionInfo regionInfo,
-      SnapshotDescription snapshot) throws IOException {
-    Path regionDir = HRegion.getRegionDir(rootDir, regionInfo);
-    Path editsDir = HLogUtil.getRegionDirRecoveredEditsDir(regionDir);
-    Path snapshotRegionDir = TakeSnapshotUtils.getRegionSnapshotDirectory(snapshot, rootDir,
-      regionInfo.getEncodedName());
-    Path snapshotEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(snapshotRegionDir);
-
-    FileStatus[] edits = FSUtils.listStatus(fs, editsDir);
-    FileStatus[] snapshotEdits = FSUtils.listStatus(fs, snapshotEditsDir);
-    if (edits == null) {
-      assertNull(snapshot, "Snapshot has edits but table doesn't", snapshotEdits);
-      return;
-    }
-
-    assertNotNull(snapshot, "Table has edits, but snapshot doesn't", snapshotEdits);
-
-    // check each of the files
-    assertEquals(snapshot, "Not same number of edits in snapshot as table", edits.length,
-      snapshotEdits.length);
-
-    // make sure we have a file with the same name as the original
-    // it would be really expensive to verify the content matches the original
-    for (FileStatus edit : edits) {
-      for (FileStatus sEdit : snapshotEdits) {
-        if (sEdit.getPath().equals(edit.getPath())) {
-          assertEquals(snapshot, "Snapshot file" + sEdit.getPath()
-              + " length not equal to the original: " + edit.getPath(), edit.getLen(),
-            sEdit.getLen());
-          break;
-        }
-      }
-      assertTrue(snapshot, "No edit in snapshot with name:" + edit.getPath(), false);
-    }
-  }
-
-  private static void assertNull(SnapshotDescription snapshot, String msg, Object isNull)
-      throws CorruptedSnapshotException {
-    if (isNull != null) {
-      throw new CorruptedSnapshotException(msg + ", Expected " + isNull + " to be null.", snapshot);
-    }
-  }
-
-  private static void assertNotNull(SnapshotDescription snapshot, String msg, Object notNull)
-      throws CorruptedSnapshotException {
-    if (notNull == null) {
-      throw new CorruptedSnapshotException(msg + ", Expected object to not be null, but was null.",
-          snapshot);
-    }
-  }
-
-  private static void assertTrue(SnapshotDescription snapshot, String msg, boolean isTrue)
-      throws CorruptedSnapshotException {
-    if (!isTrue) {
-      throw new CorruptedSnapshotException(msg + ", Expected true, but was false", snapshot);
-    }
-  }
-
-  /**
-   * Assert that the expect matches the gotten amount
-   * @param msg message to add the to exception
-   * @param expected
-   * @param gotten
-   * @throws CorruptedSnapshotException thrown if the two elements don't match
-   */
-  private static void assertEquals(SnapshotDescription snapshot, String msg, int expected,
-      int gotten) throws CorruptedSnapshotException {
-    if (expected != gotten) {
-      throw new CorruptedSnapshotException(msg + ". Expected:" + expected + ", got:" + gotten,
-          snapshot);
-    }
-  }
-
-  /**
-   * Assert that the expect matches the gotten amount
-   * @param msg message to add the to exception
-   * @param expected
-   * @param gotten
-   * @throws CorruptedSnapshotException thrown if the two elements don't match
-   */
-  private static void assertEquals(SnapshotDescription snapshot, String msg, long expected,
-      long gotten) throws CorruptedSnapshotException {
-    if (expected != gotten) {
-      throw new CorruptedSnapshotException(msg + ". Expected:" + expected + ", got:" + gotten,
-          snapshot);
-    }
-  }
-
-  /**
-   * @param logdir
-   * @param toInclude list of servers to include. If empty or null, returns all servers
-   * @return maps of servers to all their log files. If there is no log directory, returns
-   *         <tt>null</tt>
-   */
-  private static Multimap<String, String> getMapOfServersAndLogs(FileSystem fs, Path logdir,
-      Collection<String> toInclude) throws IOException {
-    // create a path filter based on the passed directories to include
-    PathFilter filter = toInclude == null || toInclude.size() == 0 ? null
-        : new MatchesDirectoryNames(toInclude);
-
-    // get all the expected directories
-    FileStatus[] serverLogDirs = FSUtils.listStatus(fs, logdir, filter);
-    if (serverLogDirs == null) return null;
-
-    // map those into a multimap of servername -> [log files]
-    Multimap<String, String> map = HashMultimap.create();
-    for (FileStatus server : serverLogDirs) {
-      FileStatus[] serverLogs = FSUtils.listStatus(fs, server.getPath(), null);
-      if (serverLogs == null) continue;
-      for (FileStatus log : serverLogs) {
-        map.put(server.getPath().getName(), log.getPath().getName());
-      }
-    }
-    return map;
-  }
-
-  /**
-   * Path filter that only accepts paths where that have a {@link Path#getName()} that is contained
-   * in the specified collection.
-   */
-  private static class MatchesDirectoryNames implements PathFilter {
-
-    Collection<String> paths;
-
-    public MatchesDirectoryNames(Collection<String> dirNames) {
-      this.paths = dirNames;
-    }
-
-    @Override
-    public boolean accept(Path path) {
-      return paths.contains(path.getName());
-    }
-  }
-
-  /**
-   * Get the log directory for a specific snapshot
-   * @param snapshotDir directory where the specific snapshot will be store
-   * @param serverName name of the parent regionserver for the log files
-   * @return path to the log home directory for the archive files.
-   */
-  public static Path getSnapshotHLogsDir(Path snapshotDir, String serverName) {
-    return new Path(snapshotDir, HLogUtil.getHLogDirectoryName(serverName));
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e978a16/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java
index 75729b6..c771144 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java
@@ -150,7 +150,7 @@ public final class FSVisitor {
       final FSVisitor.RecoveredEditsVisitor visitor) throws IOException {
     FileStatus[] regions = FSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs));
     if (regions == null) {
-      LOG.info("No regions under directory:" + tableDir);
+      LOG.info("No recoveredEdits regions under directory:" + tableDir);
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e978a16/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
index b9084b9..0451121 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.util;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -55,6 +56,10 @@ public abstract class ModifyRegionUtils {
     void fillRegion(final HRegion region) throws IOException;
   }
 
+  public interface RegionEditTask {
+    void editRegion(final HRegionInfo region) throws IOException;
+  }
+
   /**
    * Create new set of regions on the specified file-system.
    * NOTE: that you should add the regions to hbase:meta after this operation.
@@ -106,10 +111,36 @@ public abstract class ModifyRegionUtils {
       final RegionFillTask task) throws IOException {
     if (newRegions == null) return null;
     int regionNumber = newRegions.length;
-    ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf,
+    ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf,
         "RegionOpenAndInitThread-" + hTableDescriptor.getTableName(), regionNumber);
-    CompletionService<HRegionInfo> completionService = new ExecutorCompletionService<HRegionInfo>(
-        regionOpenAndInitThreadPool);
+    try {
+      return createRegions(exec, conf, rootDir, tableDir, hTableDescriptor, newRegions, task);
+    } finally {
+      exec.shutdownNow();
+    }
+  }
+
+  /**
+   * Create new set of regions on the specified file-system.
+   * NOTE: that you should add the regions to hbase:meta after this operation.
+   *
+   * @param exec Thread Pool Executor
+   * @param conf {@link Configuration}
+   * @param rootDir Root directory for HBase instance
+   * @param tableDir table directory
+   * @param hTableDescriptor description of the table
+   * @param newRegions {@link HRegionInfo} that describes the regions to create
+   * @param task {@link RegionFillTask} custom code to populate region after creation
+   * @throws IOException
+   */
+  public static List<HRegionInfo> createRegions(final ThreadPoolExecutor exec,
+      final Configuration conf, final Path rootDir, final Path tableDir,
+      final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
+      final RegionFillTask task) throws IOException {
+    if (newRegions == null) return null;
+    int regionNumber = newRegions.length;
+    CompletionService<HRegionInfo> completionService =
+      new ExecutorCompletionService<HRegionInfo>(exec);
     List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
     for (final HRegionInfo newRegion : newRegions) {
       completionService.submit(new Callable<HRegionInfo>() {
@@ -131,8 +162,6 @@ public abstract class ModifyRegionUtils {
       throw new InterruptedIOException(e.getMessage());
     } catch (ExecutionException e) {
       throw new IOException(e);
-    } finally {
-      regionOpenAndInitThreadPool.shutdownNow();
     }
     return regionInfos;
   }
@@ -166,6 +195,41 @@ public abstract class ModifyRegionUtils {
     return region.getRegionInfo();
   }
 
+  /**
+   * Execute the task on the specified set of regions.
+   *
+   * @param exec Thread Pool Executor
+   * @param regions {@link HRegionInfo} that describes the regions to edit
+   * @param task {@link RegionFillTask} custom code to edit the region
+   * @throws IOException
+   */
+  public static void editRegions(final ThreadPoolExecutor exec,
+      final Collection<HRegionInfo> regions, final RegionEditTask task) throws IOException {
+    final ExecutorCompletionService<Void> completionService =
+      new ExecutorCompletionService<Void>(exec);
+    for (final HRegionInfo hri: regions) {
+      completionService.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          task.editRegion(hri);
+          return null;
+        }
+      });
+    }
+
+    try {
+      for (HRegionInfo hri: regions) {
+        completionService.take().get();
+      }
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException(e.getMessage());
+    } catch (ExecutionException e) {
+      IOException ex = new IOException();
+      ex.initCause(e.getCause());
+      throw ex;
+    }
+  }
+
   /*
    * used by createRegions() to get the thread pool executor based on the
    * "hbase.hregion.open.and.init.threads.max" property.

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e978a16/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromClient.java
index ef53ad2..75f1c10 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromClient.java
@@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,9 +37,9 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescriptio
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifestV1;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -182,15 +180,6 @@ public class TestSnapshotFromClient {
     HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME);
     UTIL.loadTable(table, TEST_FAM, false);
 
-    // get the name of all the regionservers hosting the snapshotted table
-    Set<String> snapshotServers = new HashSet<String>();
-    List<RegionServerThread> servers = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
-    for (RegionServerThread server : servers) {
-      if (server.getRegionServer().getOnlineRegions(TABLE_NAME).size() > 0) {
-        snapshotServers.add(server.getRegionServer().getServerName().toString());
-      }
-    }
-
     LOG.debug("FS state before disable:");
     FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
       FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
@@ -203,8 +192,16 @@ public class TestSnapshotFromClient {
       FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
 
     // take a snapshot of the disabled table
-    byte[] snapshot = Bytes.toBytes("offlineTableSnapshot");
-    admin.snapshot(snapshot, TABLE_NAME);
+    final String SNAPSHOT_NAME = "offlineTableSnapshot";
+    byte[] snapshot = Bytes.toBytes(SNAPSHOT_NAME);
+
+    SnapshotDescription desc = SnapshotDescription.newBuilder()
+      .setType(SnapshotDescription.Type.DISABLED)
+      .setTable(STRING_TABLE_NAME)
+      .setName(SNAPSHOT_NAME)
+      .setVersion(SnapshotManifestV1.DESCRIPTOR_VERSION)
+      .build();
+    admin.snapshot(desc);
     LOG.debug("Snapshot completed.");
 
     // make sure we have the snapshot
@@ -219,7 +216,7 @@ public class TestSnapshotFromClient {
       FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
 
     SnapshotTestingUtils.confirmSnapshotValid(snapshots.get(0), TABLE_NAME, TEST_FAM, rootDir,
-      admin, fs, false, new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), snapshotServers);
+      admin, fs);
 
     admin.deleteSnapshot(snapshot);
     snapshots = admin.listSnapshots();
@@ -263,15 +260,6 @@ public class TestSnapshotFromClient {
     // make sure we don't fail on listing snapshots
     SnapshotTestingUtils.assertNoSnapshots(admin);
 
-    // get the name of all the regionservers hosting the snapshotted table
-    Set<String> snapshotServers = new HashSet<String>();
-    List<RegionServerThread> servers = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
-    for (RegionServerThread server : servers) {
-      if (server.getRegionServer().getOnlineRegions(TABLE_NAME).size() > 0) {
-        snapshotServers.add(server.getRegionServer().getServerName().toString());
-      }
-    }
-
     LOG.debug("FS state before disable:");
     FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
       FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
@@ -299,8 +287,8 @@ public class TestSnapshotFromClient {
 
     List<byte[]> emptyCfs = Lists.newArrayList(TEST_FAM); // no file in the region
     List<byte[]> nonEmptyCfs = Lists.newArrayList();
-    SnapshotTestingUtils.confirmSnapshotValid(snapshots.get(0), TABLE_NAME, nonEmptyCfs, emptyCfs, rootDir,
-      admin, fs, false, new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), snapshotServers);
+    SnapshotTestingUtils.confirmSnapshotValid(snapshots.get(0), TABLE_NAME, nonEmptyCfs, emptyCfs,
+      rootDir, admin, fs);
 
     admin.deleteSnapshot(snapshot);
     snapshots = admin.listSnapshots();