You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/07/14 00:41:22 UTC

svn commit: r1361429 - /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Author: mbautin
Date: Fri Jul 13 22:41:22 2012
New Revision: 1361429

URL: http://svn.apache.org/viewvc?rev=1361429&view=rev
Log:
[HBASE-6388] [89-fb] Parallelize region close and avoid deleting the HLog unless all regions close successfully.

Author: aaiyer

Summary:
Parallelize closing of regions to reduce the restart time.

Ensure that we (i) delete the HLogs and (ii) inform the master only if all the regions were successfully closed. If a region is unable to flush, deleting the HLogs could result in data loss.

Test Plan: deploy to dev cluster and check the times taken to close regions

Reviewers: pkhemani

Reviewed By: pkhemani

Differential Revision: https://phabricator.fb.com/D511446

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1361429&r1=1361428&r2=1361429&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Jul 13 22:41:22 2012
@@ -46,10 +46,15 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -781,35 +786,45 @@ public class HRegionServer implements HR
       }
       LOG.info("aborting server at: " + this.serverInfo.getServerName());
     } else {
-      ArrayList<HRegion> closedRegions = closeAllRegions();
-      try {
-        if (this.hlog != null) {
-          hlog.closeAndDelete();
-        }
-      } catch (Throwable e) {
-        LOG.error("Close and delete failed",
-          RemoteExceptionHandler.checkThrowable(e));
-      }
-      try {
-        HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
-        if (restartRequested) {
-          exitMsg[0] = REPORT_RESTARTING;
-        } else {
-          exitMsg[0] = REPORT_EXITING;
-        }
-        // Tell the master what regions we are/were serving
-        int i = 1;
-        for (HRegion region: closedRegions) {
-          exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE,
-              region.getRegionInfo());
+      Collection<HRegion> regionsToClose = this.onlineRegions.values();
+      ArrayList<HRegion> regionsClosed = closeAllRegions();
+      if (regionsToClose.size() == regionsClosed.size()) {
+        try {
+          if (this.hlog != null) {
+            hlog.closeAndDelete();
+          }
+        } catch (Throwable e) {
+          LOG.error("Close and delete failed",
+            RemoteExceptionHandler.checkThrowable(e));
         }
+        try {
+          HMsg[] exitMsg = new HMsg[regionsClosed.size() + 1];
+          if (restartRequested) {
+            exitMsg[0] = REPORT_RESTARTING;
+          } else {
+            exitMsg[0] = REPORT_EXITING;
+          }
+          // Tell the master what regions we are/were serving
+          int i = 1;
+          for (HRegion region: regionsClosed) {
+            exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE,
+                region.getRegionInfo());
+          }
 
-        LOG.info("telling master that region server is shutting down at: " +
-            serverInfo.getServerName());
-        hbaseMaster.regionServerReport(serverInfo, exitMsg, (HRegionInfo[])null);
-      } catch (Throwable e) {
-        LOG.warn("Failed to send exiting message to master: ",
-          RemoteExceptionHandler.checkThrowable(e));
+          LOG.info("telling master that region server is shutting down at: " +
+              serverInfo.getServerName());
+          hbaseMaster.regionServerReport(serverInfo, exitMsg, (HRegionInfo[])null);
+        } catch (Throwable e) {
+          LOG.warn("Failed to send exiting message to master: ",
+            RemoteExceptionHandler.checkThrowable(e));
+        }
+      }
+      else {
+        // if we don't inform the master, then the master is going to detect the expired
+        // znode and cause log splitting. We need this for the region that we failed to 
+        // close (in case there were unflushed edits).
+          LOG.info("Failed to close all regions"
+              + " -- skip informing master that we are shutting down ");
       }
       LOG.info("stopping server at: " + this.serverInfo.getServerName());
     }
@@ -2020,7 +2035,8 @@ public class HRegionServer implements HR
     }
   }
 
-  /** Called either when the master tells us to restart or from stop() */
+  /** Called either when the master tells us to restart or from stop()
+   * @throws Throwable */
   ArrayList<HRegion> closeAllRegions() {
     ArrayList<HRegion> regionsToClose = new ArrayList<HRegion>();
     this.lock.writeLock().lock();
@@ -2030,26 +2046,62 @@ public class HRegionServer implements HR
     } finally {
       this.lock.writeLock().unlock();
     }
-    // Close any outstanding scanners.  Means they'll get an UnknownScanner
+
+    // First, close any outstanding scanners.  Means they'll get an UnknownScanner
     // exception next time they come in.
     for (Map.Entry<String, InternalScanner> e: this.scanners.entrySet()) {
       try {
         e.getValue().close();
-      } catch (IOException ioe) {
-        LOG.warn("Closing scanner " + e.getKey(), ioe);
+      } catch (Exception ioe) {
+        LOG.warn("Closing scanner " , ioe);
       }
     }
-    for (HRegion region: regionsToClose) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("closing region " + Bytes.toString(region.getRegionName()));
-      }
+
+    // Then, we close the regions
+    ExecutorService closingPoolExecutor =
+      new ThreadPoolExecutor(1, Integer.MAX_VALUE,
+        60, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(),
+        new DaemonThreadFactory("regionserver-closing-"));
+
+    List<Future<Object>> futures =
+        new ArrayList<Future<Object>>(regionsToClose.size());
+
+    for (int i = 0; i < regionsToClose.size(); i++ ) {
+      futures.add(closingPoolExecutor.submit(createRegionCloseCallable(regionsToClose.get(i))));
+    }
+
+    ArrayList<HRegion> regionsClosed = new ArrayList<HRegion>();
+    for (int i = 0; i < futures.size(); i++ ) {
+      Future<Object> future = futures.get(i);
       try {
-        region.close(abortRequested);
-      } catch (Throwable e) {
-        cleanup(e, "Error closing " + Bytes.toString(region.getRegionName()));
+        future.get();
+        // add to regionsClosed only if we don't see an exception.
+        regionsClosed.add(regionsToClose.get(i));
+      } catch (Throwable e1) {
+        if (e1 instanceof ExecutionException) e1 = e1.getCause();
+        LOG.error("Error closingRegion " + regionsToClose.get(i), e1);
       }
     }
-    return regionsToClose;
+
+    return regionsClosed;
+  }
+
+  private Callable<Object> createRegionCloseCallable(final HRegion region) {
+    return new Callable<Object>() {
+      public Object call() throws IOException {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("closing region " + Bytes.toString(region.getRegionName()));
+        }
+        try {
+          region.close(abortRequested);
+        } catch (IOException e) {
+          cleanup(e, "Error closing " + Bytes.toString(region.getRegionName()));
+          throw e;
+        }
+        return null;
+      }
+    };
   }
 
   /*