You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2014/05/20 11:47:01 UTC

svn commit: r1596168 - in /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase: master/snapshot/ regionserver/ snapshot/ util/

Author: mbertozzi
Date: Tue May 20 09:47:01 2014
New Revision: 1596168

URL: http://svn.apache.org/r1596168
Log:
HBASE-11185 Parallelize Snapshot operations

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java Tue May 20 09:47:01 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,6 +37,9 @@ import org.apache.hadoop.hbase.master.Ma
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.zookeeper.KeeperException;
 
@@ -88,8 +92,17 @@ public class DisabledTableSnapshotHandle
           + ClientSnapshotDescriptionUtils.toString(snapshot);
       LOG.info(msg);
       status.setStatus(msg);
-      for (HRegionInfo regionInfo: regions) {
-        snapshotDisabledRegion(regionInfo);
+
+      ThreadPoolExecutor exec = SnapshotManifest.createExecutor(conf, "DisabledTableSnapshot");
+      try {
+        ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
+          @Override
+          public void editRegion(final HRegionInfo regionInfo) throws IOException {
+            snapshotManifest.addRegion(FSUtils.getTableDir(rootDir, snapshotTable), regionInfo);
+          }
+        });
+      } finally {
+        exec.shutdown();
       }
     } catch (Exception e) {
       // make sure we capture the exception to propagate back to the client later

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java Tue May 20 09:47:01 2014
@@ -129,7 +129,7 @@ public final class MasterSnapshotVerifie
 
   /**
    * Check that the table descriptor for the snapshot is a valid table descriptor
-   * @param snapshotDir snapshot directory to check
+   * @param manifest snapshot manifest to inspect
    */
   private void verifyTableInfo(final SnapshotManifest manifest) throws IOException {
     HTableDescriptor htd = manifest.getTableDescriptor();
@@ -145,7 +145,7 @@ public final class MasterSnapshotVerifie
 
   /**
    * Check that all the regions in the snapshot are valid, and accounted for.
-   * @param snapshotDir snapshot directory to check
+   * @param manifest snapshot manifest to inspect
    * @throws IOException if we can't reach hbase:meta or read the files from the FS
    */
   private void verifyRegions(final SnapshotManifest manifest) throws IOException {
@@ -167,6 +167,7 @@ public final class MasterSnapshotVerifie
       LOG.error(errorMsg);
     }
 
+    // Verify HRegionInfo
     for (HRegionInfo region : regions) {
       SnapshotRegionManifest regionManifest = regionManifests.get(region.getEncodedName());
       if (regionManifest == null) {
@@ -177,20 +178,23 @@ public final class MasterSnapshotVerifie
         continue;
       }
 
-      verifyRegion(fs, manifest.getSnapshotDir(), region, regionManifest);
+      verifyRegionInfo(region, regionManifest);
     }
+
     if (!errorMsg.isEmpty()) {
       throw new CorruptedSnapshotException(errorMsg);
     }
+
+    // Verify Snapshot HFiles
+    SnapshotReferenceUtil.verifySnapshot(services.getConfiguration(), fs, manifest);
   }
 
   /**
-   * Verify that the region (regioninfo, hfiles) are valid
-   * @param fs the FileSystem instance
-   * @param snapshotDir snapshot directory to check
+   * Verify that the regionInfo is valid
    * @param region the region to check
+   * @param manifest snapshot manifest to inspect
    */
-  private void verifyRegion(final FileSystem fs, final Path snapshotDir, final HRegionInfo region,
+  private void verifyRegionInfo(final HRegionInfo region,
       final SnapshotRegionManifest manifest) throws IOException {
     HRegionInfo manifestRegionInfo = HRegionInfo.convert(manifest.getRegionInfo());
     if (!region.equals(manifestRegionInfo)) {
@@ -198,16 +202,5 @@ public final class MasterSnapshotVerifie
                    "doesn't match expected region:" + region;
       throw new CorruptedSnapshotException(msg, snapshot);
     }
-
-    // make sure we have all the expected store files
-    SnapshotReferenceUtil.visitRegionStoreFiles(manifest,
-        new SnapshotReferenceUtil.StoreFileVisitor() {
-      @Override
-      public void storeFile(final HRegionInfo regionInfo, final String family,
-          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
-        SnapshotReferenceUtil.verifyStoreFile(services.getConfiguration(), fs, snapshotDir,
-          snapshot, region, family, storeFile);
-      }
-    });
   }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java Tue May 20 09:47:01 2014
@@ -223,6 +223,20 @@ public class StoreFileInfo {
    */
   public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs)
       throws IOException {
+    FileStatus status = getReferencedFileStatus(fs);
+    if (this.reference != null) {
+      return computeRefFileHDFSBlockDistribution(fs, reference, status);
+    } else {
+      return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+    }
+  }
+
+  /**
+   * Get the {@link FileStatus} of the file referenced by this StoreFileInfo
+   * @param fs The current file system to use.
+   * @return The {@link FileStatus} of the file referenced by this StoreFileInfo
+   */
+  public FileStatus getReferencedFileStatus(final FileSystem fs) throws IOException {
     FileStatus status;
     if (this.reference != null) {
       if (this.link != null) {
@@ -233,7 +247,6 @@ public class StoreFileInfo {
         Path referencePath = getReferredToFile(this.getPath());
         status = fs.getFileStatus(referencePath);
       }
-      return computeRefFileHDFSBlockDistribution(fs, reference, status);
     } else {
       if (this.link != null) {
         // HFileLink
@@ -241,8 +254,8 @@ public class StoreFileInfo {
       } else {
         status = this.fileStatus;
       }
-      return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
     }
+    return status;
   }
 
   /** @return The {@link Path} of the file */

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java Tue May 20 09:47:01 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.snapshot;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -30,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -153,6 +155,15 @@ public class RestoreSnapshotHelper {
    * @return the set of regions touched by the restore operation
    */
   public RestoreMetaChanges restoreHdfsRegions() throws IOException {
+    ThreadPoolExecutor exec = SnapshotManifest.createExecutor(conf, "RestoreSnapshot");
+    try {
+      return restoreHdfsRegions(exec);
+    } finally {
+      exec.shutdown();
+    }
+  }
+
+  private RestoreMetaChanges restoreHdfsRegions(final ThreadPoolExecutor exec) throws IOException {
     LOG.debug("starting restore");
 
     Map<String, SnapshotRegionManifest> regionManifests = snapshotManifest.getRegionManifestsMap();
@@ -187,13 +198,13 @@ public class RestoreSnapshotHelper {
       // Restore regions using the snapshot data
       monitor.rethrowException();
       status.setStatus("Restoring table regions...");
-      restoreHdfsRegions(regionManifests, metaChanges.getRegionsToRestore());
+      restoreHdfsRegions(exec, regionManifests, metaChanges.getRegionsToRestore());
       status.setStatus("Finished restoring all table regions.");
 
       // Remove regions from the current table
       monitor.rethrowException();
       status.setStatus("Starting to delete excess regions from table");
-      removeHdfsRegions(metaChanges.getRegionsToRemove());
+      removeHdfsRegions(exec, metaChanges.getRegionsToRemove());
       status.setStatus("Finished deleting excess regions from table.");
     }
 
@@ -210,7 +221,7 @@ public class RestoreSnapshotHelper {
       // Create new regions cloning from the snapshot
       monitor.rethrowException();
       status.setStatus("Cloning regions...");
-      HRegionInfo[] clonedRegions = cloneHdfsRegions(regionManifests, regionsToAdd);
+      HRegionInfo[] clonedRegions = cloneHdfsRegions(exec, regionManifests, regionsToAdd);
       metaChanges.setNewRegions(clonedRegions);
       status.setStatus("Finished cloning regions.");
     }
@@ -345,23 +356,30 @@ public class RestoreSnapshotHelper {
   /**
    * Remove specified regions from the file-system, using the archiver.
    */
-  private void removeHdfsRegions(final List<HRegionInfo> regions) throws IOException {
-    if (regions != null && regions.size() > 0) {
-      for (HRegionInfo hri: regions) {
+  private void removeHdfsRegions(final ThreadPoolExecutor exec, final List<HRegionInfo> regions)
+      throws IOException {
+    if (regions == null || regions.size() == 0) return;
+    ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
+      @Override
+      public void editRegion(final HRegionInfo hri) throws IOException {
         HFileArchiver.archiveRegion(conf, fs, hri);
       }
-    }
+    });
   }
 
   /**
    * Restore specified regions by restoring content to the snapshot state.
    */
-  private void restoreHdfsRegions(final Map<String, SnapshotRegionManifest> regionManifests,
+  private void restoreHdfsRegions(final ThreadPoolExecutor exec,
+      final Map<String, SnapshotRegionManifest> regionManifests,
       final List<HRegionInfo> regions) throws IOException {
     if (regions == null || regions.size() == 0) return;
-    for (HRegionInfo hri: regions) {
-      restoreRegion(hri, regionManifests.get(hri.getEncodedName()));
-    }
+    ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
+      @Override
+      public void editRegion(final HRegionInfo hri) throws IOException {
+        restoreRegion(hri, regionManifests.get(hri.getEncodedName()));
+      }
+    });
   }
 
   private Map<String, List<SnapshotRegionManifest.StoreFile>> getRegionHFileReferences(
@@ -465,7 +483,8 @@ public class RestoreSnapshotHelper {
    * Clone specified regions. For each region create a new region
    * and create a HFileLink for each hfile.
    */
-  private HRegionInfo[] cloneHdfsRegions(final Map<String, SnapshotRegionManifest> regionManifests,
+  private HRegionInfo[] cloneHdfsRegions(final ThreadPoolExecutor exec,
+      final Map<String, SnapshotRegionManifest> regionManifests,
       final List<HRegionInfo> regions) throws IOException {
     if (regions == null || regions.size() == 0) return null;
 
@@ -490,7 +509,7 @@ public class RestoreSnapshotHelper {
     }
 
     // create the regions on disk
-    ModifyRegionUtils.createRegions(conf, rootDir, tableDir,
+    ModifyRegionUtils.createRegions(exec, conf, rootDir, tableDir,
       tableDesc, clonedRegionsInfo, new ModifyRegionUtils.RegionFillTask() {
         @Override
         public void fillRegion(final HRegion region) throws IOException {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java Tue May 20 09:47:01 2014
@@ -25,6 +25,8 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -99,14 +101,14 @@ public final class SnapshotInfo extends 
       }
     }
 
-    private int hfileArchiveCount = 0;
-    private int hfilesMissing = 0;
-    private int hfilesCount = 0;
-    private int logsMissing = 0;
-    private int logsCount = 0;
-    private long hfileArchiveSize = 0;
-    private long hfileSize = 0;
-    private long logSize = 0;
+    private AtomicInteger hfileArchiveCount = new AtomicInteger();
+    private AtomicInteger hfilesMissing = new AtomicInteger();
+    private AtomicInteger hfilesCount = new AtomicInteger();
+    private AtomicInteger logsMissing = new AtomicInteger();
+    private AtomicInteger logsCount = new AtomicInteger();
+    private AtomicLong hfileArchiveSize = new AtomicLong();
+    private AtomicLong hfileSize = new AtomicLong();
+    private AtomicLong logSize = new AtomicLong();
 
     private final SnapshotDescription snapshot;
     private final TableName snapshotTable;
@@ -128,57 +130,57 @@ public final class SnapshotInfo extends 
 
     /** @return true if the snapshot is corrupted */
     public boolean isSnapshotCorrupted() {
-      return hfilesMissing > 0 || logsMissing > 0;
+      return hfilesMissing.get() > 0 || logsMissing.get() > 0;
     }
 
     /** @return the number of available store files */
     public int getStoreFilesCount() {
-      return hfilesCount + hfileArchiveCount;
+      return hfilesCount.get() + hfileArchiveCount.get();
     }
 
     /** @return the number of available store files in the archive */
     public int getArchivedStoreFilesCount() {
-      return hfileArchiveCount;
+      return hfileArchiveCount.get();
     }
 
     /** @return the number of available log files */
     public int getLogsCount() {
-      return logsCount;
+      return logsCount.get();
     }
 
     /** @return the number of missing store files */
     public int getMissingStoreFilesCount() {
-      return hfilesMissing;
+      return hfilesMissing.get();
     }
 
     /** @return the number of missing log files */
     public int getMissingLogsCount() {
-      return logsMissing;
+      return logsMissing.get();
     }
 
     /** @return the total size of the store files referenced by the snapshot */
     public long getStoreFilesSize() {
-      return hfileSize + hfileArchiveSize;
+      return hfileSize.get() + hfileArchiveSize.get();
     }
 
     /** @return the total size of the store files shared */
     public long getSharedStoreFilesSize() {
-      return hfileSize;
+      return hfileSize.get();
     }
 
     /** @return the total size of the store files in the archive */
     public long getArchivedStoreFileSize() {
-      return hfileArchiveSize;
+      return hfileArchiveSize.get();
     }
 
     /** @return the percentage of the shared store files */
     public float getSharedStoreFilePercentage() {
-      return ((float)hfileSize / (hfileSize + hfileArchiveSize)) * 100;
+      return ((float)hfileSize.get() / (hfileSize.get() + hfileArchiveSize.get())) * 100;
     }
 
     /** @return the total log size */
     public long getLogsSize() {
-      return logSize;
+      return logSize.get();
     }
 
     /**
@@ -197,15 +199,15 @@ public final class SnapshotInfo extends 
       try {
         if ((inArchive = fs.exists(link.getArchivePath()))) {
           size = fs.getFileStatus(link.getArchivePath()).getLen();
-          hfileArchiveSize += size;
-          hfileArchiveCount++;
+          hfileArchiveSize.addAndGet(size);
+          hfileArchiveCount.incrementAndGet();
         } else {
           size = link.getFileStatus(fs).getLen();
-          hfileSize += size;
-          hfilesCount++;
+          hfileSize.addAndGet(size);
+          hfilesCount.incrementAndGet();
         }
       } catch (FileNotFoundException e) {
-        hfilesMissing++;
+        hfilesMissing.incrementAndGet();
       }
       return new FileInfo(inArchive, size);
     }
@@ -221,10 +223,10 @@ public final class SnapshotInfo extends 
       long size = -1;
       try {
         size = logLink.getFileStatus(fs).getLen();
-        logSize += size;
-        logsCount++;
+        logSize.addAndGet(size);
+        logsCount.incrementAndGet();
       } catch (FileNotFoundException e) {
-        logsMissing++;
+        logsMissing.incrementAndGet();
       }
       return new FileInfo(false, size);
     }
@@ -368,8 +370,8 @@ public final class SnapshotInfo extends 
     final SnapshotDescription snapshotDesc = snapshotManifest.getSnapshotDescription();
     final String table = snapshotDesc.getTable();
     final SnapshotStats stats = new SnapshotStats(this.getConf(), this.fs, snapshotDesc);
-    SnapshotReferenceUtil.visitReferencedFiles(getConf(), fs,
-      snapshotManifest.getSnapshotDir(), snapshotDesc, new SnapshotReferenceUtil.SnapshotVisitor() {
+    SnapshotReferenceUtil.concurrentVisitReferencedFiles(getConf(), fs, snapshotManifest,
+      new SnapshotReferenceUtil.SnapshotVisitor() {
         @Override
         public void storeFile(final HRegionInfo regionInfo, final String family,
             final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
@@ -448,8 +450,9 @@ public final class SnapshotInfo extends 
     Path rootDir = FSUtils.getRootDir(conf);
     FileSystem fs = FileSystem.get(rootDir.toUri(), conf);
     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
+    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshot);
     final SnapshotStats stats = new SnapshotStats(conf, fs, snapshot);
-    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshot,
+    SnapshotReferenceUtil.concurrentVisitReferencedFiles(conf, fs, manifest,
       new SnapshotReferenceUtil.SnapshotVisitor() {
         @Override
         public void storeFile(final HRegionInfo regionInfo, final String family,

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java Tue May 20 09:47:01 2014
@@ -442,8 +442,8 @@ public class SnapshotManifest {
     return createExecutor(conf, name);
   }
 
-  static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
-    int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 4);
+  public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
+    int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
               Threads.getNamedThreadFactory(name));
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java Tue May 20 09:47:01 2014
@@ -103,14 +103,15 @@ public class SnapshotManifestV2 {
     }
 
     public void storeFile(final SnapshotRegionManifest.Builder region,
-        final SnapshotRegionManifest.FamilyFiles.Builder family, final StoreFileInfo storeFile) {
+        final SnapshotRegionManifest.FamilyFiles.Builder family, final StoreFileInfo storeFile)
+        throws IOException {
       SnapshotRegionManifest.StoreFile.Builder sfManifest =
             SnapshotRegionManifest.StoreFile.newBuilder();
       sfManifest.setName(storeFile.getPath().getName());
       if (storeFile.isReference()) {
         sfManifest.setReference(storeFile.getReference().convert());
       }
-      sfManifest.setFileSize(storeFile.getFileStatus().getLen());
+      sfManifest.setFileSize(storeFile.getReferencedFileStatus(fs).getLen());
       family.addStoreFiles(sfManifest.build());
     }
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java Tue May 20 09:47:01 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.snapshot;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.HashSet;
@@ -32,6 +33,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -107,7 +109,7 @@ public final class SnapshotReferenceUtil
     visitLogFiles(fs, snapshotDir, visitor);
   }
 
-  /**
+  /**
    * Iterate over the snapshot store files
    *
    * @param conf The current {@link Configuration} instance.
@@ -117,7 +119,7 @@ public final class SnapshotReferenceUtil
    * @param visitor callback object to get the store files
    * @throws IOException if an error occurred while scanning the directory
    */
-  public static void visitTableStoreFiles(final Configuration conf, final FileSystem fs,
+  static void visitTableStoreFiles(final Configuration conf, final FileSystem fs,
       final Path snapshotDir, final SnapshotDescription desc, final StoreFileVisitor visitor)
       throws IOException {
     SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, desc);
@@ -139,7 +141,7 @@ public final class SnapshotReferenceUtil
    * @param visitor callback object to get the store files
    * @throws IOException if an error occurred while scanning the directory
    */
-  public static void visitRegionStoreFiles(final SnapshotRegionManifest manifest,
+  static void visitRegionStoreFiles(final SnapshotRegionManifest manifest,
       final StoreFileVisitor visitor) throws IOException {
     HRegionInfo regionInfo = HRegionInfo.convert(manifest.getRegionInfo());
     for (SnapshotRegionManifest.FamilyFiles familyFiles: manifest.getFamilyFilesList()) {
@@ -192,6 +194,19 @@ public final class SnapshotReferenceUtil
       final SnapshotManifest manifest) throws IOException {
     final SnapshotDescription snapshotDesc = manifest.getSnapshotDescription();
     final Path snapshotDir = manifest.getSnapshotDir();
+    concurrentVisitReferencedFiles(conf, fs, manifest, new StoreFileVisitor() {
+      @Override
+      public void storeFile(final HRegionInfo regionInfo, final String family,
+          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
+        verifyStoreFile(conf, fs, snapshotDir, snapshotDesc, regionInfo, family, storeFile);
+      }
+    });
+  }
+
+  public static void concurrentVisitReferencedFiles(final Configuration conf, final FileSystem fs,
+      final SnapshotManifest manifest, final StoreFileVisitor visitor) throws IOException {
+    final SnapshotDescription snapshotDesc = manifest.getSnapshotDescription();
+    final Path snapshotDir = manifest.getSnapshotDir();
 
     List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
     if (regionManifests == null || regionManifests.size() == 0) {
@@ -207,13 +222,7 @@ public final class SnapshotReferenceUtil
         completionService.submit(new Callable<Void>() {
           @Override
           public Void call() throws IOException {
-            visitRegionStoreFiles(regionManifest, new StoreFileVisitor() {
-              @Override
-              public void storeFile(final HRegionInfo regionInfo, final String family,
-                  final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
-                verifyStoreFile(conf, fs, snapshotDir, snapshotDesc, regionInfo, family, storeFile);
-              }
-            });
+            visitRegionStoreFiles(regionManifest, visitor);
             return null;
           }
         });
@@ -251,19 +260,28 @@ public final class SnapshotReferenceUtil
    * @throws CorruptedSnapshotException if the snapshot is corrupted
    * @throws IOException if an error occurred while scanning the directory
    */
-  public static void verifyStoreFile(final Configuration conf, final FileSystem fs,
+  private static void verifyStoreFile(final Configuration conf, final FileSystem fs,
       final Path snapshotDir, final SnapshotDescription snapshot, final HRegionInfo regionInfo,
       final String family, final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
+    TableName table = TableName.valueOf(snapshot.getTable());
     String fileName = storeFile.getName();
 
     Path refPath = null;
     if (StoreFileInfo.isReference(fileName)) {
       // If is a reference file check if the parent file is present in the snapshot
-      Path snapshotHFilePath = new Path(new Path(
-          new Path(snapshotDir, regionInfo.getEncodedName()), family), fileName);
-      refPath = StoreFileInfo.getReferredToFile(snapshotHFilePath);
-      if (!fs.exists(refPath)) {
-        throw new CorruptedSnapshotException("Missing parent hfile for: " + fileName, snapshot);
+      refPath = new Path(new Path(regionInfo.getEncodedName(), family), fileName);
+      refPath = StoreFileInfo.getReferredToFile(refPath);
+      String refRegion = refPath.getParent().getParent().getName();
+      refPath = HFileLink.createPath(table, refRegion, family, refPath.getName());
+      if (!new HFileLink(conf, refPath).exists(fs)) {
+        throw new CorruptedSnapshotException("Missing parent hfile for: " + fileName +
+          " path=" + refPath, snapshot);
+      }
+
+      if (storeFile.hasReference()) {
+        // We don't really need to look for the file on-disk
+        // we already have the Reference information embedded here.
+        return;
       }
     }
 
@@ -274,15 +292,25 @@ public final class SnapshotReferenceUtil
       linkPath = new Path(family, fileName);
     } else {
       linkPath = new Path(family, HFileLink.createHFileLinkName(
-        TableName.valueOf(snapshot.getTable()), regionInfo.getEncodedName(), fileName));
+        table, regionInfo.getEncodedName(), fileName));
     }
 
     // check if the linked file exists (in the archive, or in the table dir)
     HFileLink link = new HFileLink(conf, linkPath);
-    if (!link.exists(fs)) {
-      throw new CorruptedSnapshotException("Can't find hfile: " + fileName
-          + " in the real (" + link.getOriginPath() + ") or archive (" + link.getArchivePath()
-          + ") directory for the primary table.", snapshot);
+    try {
+      FileStatus fstat = link.getFileStatus(fs);
+      if (storeFile.hasFileSize() && storeFile.getFileSize() != fstat.getLen()) {
+        String msg = "hfile: " + fileName + " size does not match with the expected one. " +
+          " found=" + fstat.getLen() + " expected=" + storeFile.getFileSize();
+        LOG.error(msg);
+        throw new CorruptedSnapshotException(msg, snapshot);
+      }
+    } catch (FileNotFoundException e) {
+      String msg = "Can't find hfile: " + fileName + " in the real (" +
+          link.getOriginPath() + ") or archive (" + link.getArchivePath()
+          + ") directory for the primary table.";
+      LOG.error(msg);
+      throw new CorruptedSnapshotException(msg, snapshot);
     }
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java Tue May 20 09:47:01 2014
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.util;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -56,6 +57,10 @@ public abstract class ModifyRegionUtils 
     void fillRegion(final HRegion region) throws IOException;
   }
 
+  public interface RegionEditTask {  // Per-region callback invoked by editRegions().
+    void editRegion(final HRegionInfo region) throws IOException;  // may run on a pool thread
+  }
+
   /**
    * Create new set of regions on the specified file-system.
    * NOTE: that you should add the regions to hbase:meta after this operation.
@@ -107,10 +112,36 @@ public abstract class ModifyRegionUtils 
       final RegionFillTask task) throws IOException {
     if (newRegions == null) return null;
     int regionNumber = newRegions.length;
-    ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf,
+    ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf,
         "RegionOpenAndInitThread-" + hTableDescriptor.getTableName(), regionNumber);
-    CompletionService<HRegionInfo> completionService = new ExecutorCompletionService<HRegionInfo>(
-        regionOpenAndInitThreadPool);
+    try {
+      return createRegions(exec, conf, rootDir, tableDir, hTableDescriptor, newRegions, task);
+    } finally {
+      exec.shutdownNow();
+    }
+  }
+
+  /**
+   * Create new set of regions on the specified file-system.
+   * NOTE: that you should add the regions to hbase:meta after this operation.
+   *
+   * @param exec Thread Pool Executor
+   * @param conf {@link Configuration}
+   * @param rootDir Root directory for HBase instance
+   * @param tableDir table directory
+   * @param hTableDescriptor description of the table
+   * @param newRegions {@link HRegionInfo} that describes the regions to create
+   * @param task {@link RegionFillTask} custom code to populate region after creation
+   * @throws IOException
+   */
+  public static List<HRegionInfo> createRegions(final ThreadPoolExecutor exec,
+      final Configuration conf, final Path rootDir, final Path tableDir,
+      final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
+      final RegionFillTask task) throws IOException {
+    if (newRegions == null) return null;
+    int regionNumber = newRegions.length;
+    CompletionService<HRegionInfo> completionService =
+      new ExecutorCompletionService<HRegionInfo>(exec);
     List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
     for (final HRegionInfo newRegion : newRegions) {
       completionService.submit(new Callable<HRegionInfo>() {
@@ -132,8 +163,6 @@ public abstract class ModifyRegionUtils 
       throw new InterruptedIOException(e.getMessage());
     } catch (ExecutionException e) {
       throw new IOException(e);
-    } finally {
-      regionOpenAndInitThreadPool.shutdownNow();
     }
     return regionInfos;
   }
@@ -167,6 +196,41 @@ public abstract class ModifyRegionUtils 
     return region.getRegionInfo();
   }
 
+  /**
+   * Execute the task on the specified set of regions using the given executor.
+   * Waits for the tasks; on the first failure observed, throws without cancelling the rest.
+   * @param exec Thread Pool Executor that runs the tasks (not shut down by this method)
+   * @param regions {@link HRegionInfo} that describes the regions to edit
+   * @param task {@link RegionEditTask} custom code to edit the region
+   * @throws IOException if a task fails (wrapping its exception) or the wait is interrupted
+   */
+  public static void editRegions(final ThreadPoolExecutor exec,
+      final Collection<HRegionInfo> regions, final RegionEditTask task) throws IOException {
+    final ExecutorCompletionService<Void> completionService =
+      new ExecutorCompletionService<Void>(exec);
+    int submitted = 0;
+    for (final HRegionInfo hri: regions) {
+      completionService.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          task.editRegion(hri);
+          return null;
+        }
+      });
+      submitted++;
+    }
+    try {
+      for (int i = 0; i < submitted; i++) {
+        completionService.take().get();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();  // restore the interrupt flag for callers
+      throw (InterruptedIOException) new InterruptedIOException(e.getMessage()).initCause(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    }
+  }
+
   /*
    * used by createRegions() to get the thread pool executor based on the
    * "hbase.hregion.open.and.init.threads.max" property.