You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2014/01/23 19:10:52 UTC

svn commit: r1560774 - /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java

Author: jmhsieh
Date: Thu Jan 23 18:10:52 2014
New Revision: 1560774

URL: http://svn.apache.org/r1560774
Log:
HBASE-10401 [hbck] perform overlap group merges in parallel

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1560774&r1=1560773&r2=1560774&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Thu Jan 23 18:10:52 2014
@@ -183,7 +183,8 @@ public class HBaseFsck extends Configure
   private HConnection connection;
   private HBaseAdmin admin;
   private HTable meta;
-  protected ExecutorService executor; // threads to retrieve data from regionservers
+  // threads to do ||izable tasks: retrieve data from regionservers, handle overlapping regions
+  protected ExecutorService executor;
   private long startMillis = System.currentTimeMillis();
   private HFileCorruptionChecker hfcc;
   private int retcode = 0;
@@ -2006,8 +2007,8 @@ public class HBaseFsck extends Configure
    */
   public int mergeRegionDirs(Path targetRegionDir, HbckInfo contained) throws IOException {
     int fileMoves = 0;
-
-    LOG.debug("Contained region dir after close and pause");
+    String thread = Thread.currentThread().getName();
+    LOG.debug("[" + thread + "] Contained region dir after close and pause");
     debugLsr(contained.getHdfsRegionDir());
 
     // rename the contained into the container.
@@ -2019,8 +2020,8 @@ public class HBaseFsck extends Configure
       // region we are attempting to merge in is not present!  Since this is a merge, there is
       // no harm skipping this region if it does not exist.
       if (!fs.exists(contained.getHdfsRegionDir())) {
-        LOG.warn("HDFS region dir " + contained.getHdfsRegionDir() + " is missing. " +
-            "Assuming already sidelined or moved.");
+        LOG.warn("[" + thread + "] HDFS region dir " + contained.getHdfsRegionDir() 
+            + " is missing. Assuming already sidelined or moved.");
       } else {
         sidelineRegionDir(fs, contained);
       }
@@ -2029,7 +2030,8 @@ public class HBaseFsck extends Configure
 
     if (dirs == null) {
       if (!fs.exists(contained.getHdfsRegionDir())) {
-        LOG.warn("HDFS region dir " + contained.getHdfsRegionDir() + " already sidelined.");
+        LOG.warn("[" + thread + "] HDFS region dir " + contained.getHdfsRegionDir() 
+            + " already sidelined.");
       } else {
         sidelineRegionDir(fs, contained);
       }
@@ -2050,7 +2052,7 @@ public class HBaseFsck extends Configure
         continue;
       }
 
-      LOG.info("Moving files from " + src + " into containing region " + dst);
+      LOG.info("[" + thread + "] Moving files from " + src + " into containing region " + dst);
       // FileSystem.rename is inconsistent with directories -- if the
       // dst (foo/a) exists and is a dir, and the src (foo/b) is a dir,
       // it moves the src into the dst dir resulting in (foo/a/b).  If
@@ -2061,19 +2063,37 @@ public class HBaseFsck extends Configure
           fileMoves++;
         }
       }
-      LOG.debug("Sideline directory contents:");
+      LOG.debug("[" + thread + "] Sideline directory contents:");
       debugLsr(targetRegionDir);
     }
 
     // if all success.
     sidelineRegionDir(fs, contained);
-    LOG.info("Sidelined region dir "+ contained.getHdfsRegionDir() + " into " +
+    LOG.info("[" + thread + "] Sidelined region dir "+ contained.getHdfsRegionDir() + " into " +
         getSidelineDir());
     debugLsr(contained.getHdfsRegionDir());
 
     return fileMoves;
   }
 
+
+  static class WorkItemOverlapMerge implements Callable<Void> {
+    private TableIntegrityErrorHandler handler;
+    Collection<HbckInfo> overlapgroup;
+    
+    WorkItemOverlapMerge(Collection<HbckInfo> overlapgroup, TableIntegrityErrorHandler handler) {
+      this.handler = handler;
+      this.overlapgroup = overlapgroup;
+    }
+    
+    @Override
+    public Void call() throws Exception {
+      handler.handleOverlapGroup(overlapgroup);
+      return null;
+    }
+  };
+  
+  
   /**
    * Maintain information about a particular table.
    */
@@ -2302,6 +2322,8 @@ public class HBaseFsck extends Configure
        * Cases:
        * - Clean regions that overlap
        * - Only .oldlogs regions (can't find start/stop range, or figure out)
+       * 
+       * This is basically threadsafe, except for the fixer increment in mergeOverlaps.
        */
       @Override
       public void handleOverlapGroup(Collection<HbckInfo> overlap)
@@ -2329,7 +2351,8 @@ public class HBaseFsck extends Configure
 
       void mergeOverlaps(Collection<HbckInfo> overlap)
           throws IOException {
-        LOG.info("== Merging regions into one region: "
+        String thread = Thread.currentThread().getName();
+        LOG.info("== [" + thread + "] Merging regions into one region: "
           + Joiner.on(",").join(overlap));
         // get the min / max range and close all concerned regions
         Pair<byte[], byte[]> range = null;
@@ -2347,25 +2370,25 @@ public class HBaseFsck extends Configure
             }
           }
           // need to close files so delete can happen.
-          LOG.debug("Closing region before moving data around: " +  hi);
-          LOG.debug("Contained region dir before close");
+          LOG.debug("[" + thread + "] Closing region before moving data around: " +  hi);
+          LOG.debug("[" + thread + "] Contained region dir before close");
           debugLsr(hi.getHdfsRegionDir());
           try {
-            LOG.info("Closing region: " + hi);
+            LOG.info("[" + thread + "] Closing region: " + hi);
             closeRegion(hi);
           } catch (IOException ioe) {
-            LOG.warn("Was unable to close region " + hi
+            LOG.warn("[" + thread + "] Was unable to close region " + hi
               + ".  Just continuing... ", ioe);
           } catch (InterruptedException e) {
-            LOG.warn("Was unable to close region " + hi
+            LOG.warn("[" + thread + "] Was unable to close region " + hi
               + ".  Just continuing... ", e);
           }
 
           try {
-            LOG.info("Offlining region: " + hi);
+            LOG.info("[" + thread + "] Offlining region: " + hi);
             offline(hi.getRegionName());
           } catch (IOException ioe) {
-            LOG.warn("Unable to offline region from master: " + hi
+            LOG.warn("[" + thread + "] Unable to offline region from master: " + hi
               + ".  Just continuing... ", ioe);
           }
         }
@@ -2376,7 +2399,7 @@ public class HBaseFsck extends Configure
         HRegionInfo newRegion = new HRegionInfo(htd.getTableName(), range.getFirst(),
             range.getSecond());
         HRegion region = HBaseFsckRepair.createHDFSRegionDir(conf, newRegion, htd);
-        LOG.info("Created new empty container region: " +
+        LOG.info("[" + thread + "] Created new empty container region: " +
             newRegion + " to contain regions: " + Joiner.on(",").join(overlap));
         debugLsr(region.getRegionFileSystem().getRegionDir());
 
@@ -2384,7 +2407,7 @@ public class HBaseFsck extends Configure
         boolean didFix= false;
         Path target = region.getRegionFileSystem().getRegionDir();
         for (HbckInfo contained : overlap) {
-          LOG.info("Merging " + contained  + " into " + target );
+          LOG.info("[" + thread + "] Merging " + contained  + " into " + target );
           int merges = mergeRegionDirs(target, contained);
           if (merges > 0) {
             didFix = true;
@@ -2540,8 +2563,20 @@ public class HBaseFsck extends Configure
         handler.handleRegionEndKeyNotEmpty(prevKey);
       }
 
-      for (Collection<HbckInfo> overlap : overlapGroups.asMap().values()) {
-        handler.handleOverlapGroup(overlap);
+      // TODO fold this into the TableIntegrityHandler
+      if (getConf().getBoolean("hbasefsck.overlap.merge.parallel", true)) {
+        LOG.info("Handling overlap merges in parallel. set hbasefsck.overlap.merge.parallel to" +
+            " false to run serially.");
+        boolean ok = handleOverlapsParallel(handler, prevKey);
+        if (!ok) {
+          return false;
+        }
+      } else {
+        LOG.info("Handling overlap merges serially.  set hbasefsck.overlap.merge.parallel to" +
+            " true to run in parallel.");
+        for (Collection<HbckInfo> overlap : overlapGroups.asMap().values()) {
+          handler.handleOverlapGroup(overlap);
+        }
       }
 
       if (details) {
@@ -2565,6 +2600,38 @@ public class HBaseFsck extends Configure
       return errors.getErrorList().size() == originalErrorsCount;
     }
 
+    private boolean handleOverlapsParallel(TableIntegrityErrorHandler handler, byte[] prevKey)
+        throws IOException {
+      // we parallelize overlap handler for the case we have lots of groups to fix.  We can
+      // safely assume each group is independent. 
+      List<WorkItemOverlapMerge> merges = new ArrayList<WorkItemOverlapMerge>(overlapGroups.size());
+      List<Future<Void>> rets;
+      for (Collection<HbckInfo> overlap : overlapGroups.asMap().values()) {
+        // 
+        merges.add(new WorkItemOverlapMerge(overlap, handler));
+      }
+      try {
+        rets = executor.invokeAll(merges);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        LOG.error("Overlap merges were interrupted", e);
+        return false;
+      }
+      for(int i=0; i<merges.size(); i++) {
+        WorkItemOverlapMerge work = merges.get(i);
+        Future<Void> f = rets.get(i);
+        try {
+          f.get();
+        } catch(ExecutionException e) {
+          LOG.warn("Failed to merge overlap group" + work, e.getCause());
+        } catch (InterruptedException e) {
+          LOG.error("Waiting for overlap merges was interrupted", e);
+          return false;
+        }
+      }
+      return true;
+    }
+
     /**
      * This dumps data in a visually reasonable way for visual debugging
      *