You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/07/14 00:41:22 UTC
svn commit: r1361429 -
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Author: mbautin
Date: Fri Jul 13 22:41:22 2012
New Revision: 1361429
URL: http://svn.apache.org/viewvc?rev=1361429&view=rev
Log:
[HBASE-6388] [89-fb] parallelize close and avoid deleting HLog, unless successful.
Author: aaiyer
Summary:
Parallelize region close to reduce restart time.
Ensure that we (i) delete HLogs and (ii) inform the master only if all regions were closed successfully. If a region was unable to flush, deleting the HLogs could have resulted in data loss.
Test Plan: deploy to dev cluster and check the times taken to close regions
Reviewers: pkhemani
Reviewed By: pkhemani
Differential Revision: https://phabricator.fb.com/D511446
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1361429&r1=1361428&r2=1361429&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Jul 13 22:41:22 2012
@@ -46,10 +46,15 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -781,35 +786,45 @@ public class HRegionServer implements HR
}
LOG.info("aborting server at: " + this.serverInfo.getServerName());
} else {
- ArrayList<HRegion> closedRegions = closeAllRegions();
- try {
- if (this.hlog != null) {
- hlog.closeAndDelete();
- }
- } catch (Throwable e) {
- LOG.error("Close and delete failed",
- RemoteExceptionHandler.checkThrowable(e));
- }
- try {
- HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
- if (restartRequested) {
- exitMsg[0] = REPORT_RESTARTING;
- } else {
- exitMsg[0] = REPORT_EXITING;
- }
- // Tell the master what regions we are/were serving
- int i = 1;
- for (HRegion region: closedRegions) {
- exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE,
- region.getRegionInfo());
+ // Snapshot the count before closing. onlineRegions.values() is a live view
+ // (assuming onlineRegions is a java.util.Map), and closeAllRegions() may
+ // remove entries from the map before it returns. Any mismatch here errs
+ // toward keeping the HLogs and letting the master split them.
+ int numRegionsToClose = this.onlineRegions.size();
+ ArrayList<HRegion> regionsClosed = closeAllRegions();
+ if (numRegionsToClose == regionsClosed.size()) {
+ try {
+ if (this.hlog != null) {
+ hlog.closeAndDelete();
+ }
+ } catch (Throwable e) {
+ LOG.error("Close and delete failed",
+ RemoteExceptionHandler.checkThrowable(e));
}
+ try {
+ HMsg[] exitMsg = new HMsg[regionsClosed.size() + 1];
+ if (restartRequested) {
+ exitMsg[0] = REPORT_RESTARTING;
+ } else {
+ exitMsg[0] = REPORT_EXITING;
+ }
+ // Tell the master what regions we are/were serving
+ int i = 1;
+ for (HRegion region: regionsClosed) {
+ exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE,
+ region.getRegionInfo());
+ }
- LOG.info("telling master that region server is shutting down at: " +
- serverInfo.getServerName());
- hbaseMaster.regionServerReport(serverInfo, exitMsg, (HRegionInfo[])null);
- } catch (Throwable e) {
- LOG.warn("Failed to send exiting message to master: ",
- RemoteExceptionHandler.checkThrowable(e));
+ LOG.info("telling master that region server is shutting down at: " +
+ serverInfo.getServerName());
+ hbaseMaster.regionServerReport(serverInfo, exitMsg, (HRegionInfo[])null);
+ } catch (Throwable e) {
+ LOG.warn("Failed to send exiting message to master: ",
+ RemoteExceptionHandler.checkThrowable(e));
+ }
+ }
+ else {
+ // if we don't inform the master, then the master is going to detect the expired
+ // znode and cause log splitting. We need this for the region that we failed to
+ // close (in case there were unflushed edits).
+ LOG.info("Failed to close all regions"
+ + " -- skip informing master that we are shutting down ");
}
LOG.info("stopping server at: " + this.serverInfo.getServerName());
}
@@ -2020,7 +2035,8 @@ public class HRegionServer implements HR
}
}
- /** Called either when the master tells us to restart or from stop() */
+ /** Called either when the master tells us to restart or from stop().
+ * @return the regions whose close completed without an exception */
ArrayList<HRegion> closeAllRegions() {
ArrayList<HRegion> regionsToClose = new ArrayList<HRegion>();
this.lock.writeLock().lock();
@@ -2030,26 +2046,62 @@ public class HRegionServer implements HR
} finally {
this.lock.writeLock().unlock();
}
- // Close any outstanding scanners. Means they'll get an UnknownScanner
+
+ // First, close any outstanding scanners. Means they'll get an UnknownScanner
// exception next time they come in.
for (Map.Entry<String, InternalScanner> e: this.scanners.entrySet()) {
try {
e.getValue().close();
- } catch (IOException ioe) {
- LOG.warn("Closing scanner " + e.getKey(), ioe);
+ } catch (Exception ex) {
+ // Best effort: a scanner that fails to close must not stop region close,
+ // but keep its id in the log so the failure can be traced.
+ LOG.warn("Closing scanner " + e.getKey(), ex);
}
}
- for (HRegion region: regionsToClose) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("closing region " + Bytes.toString(region.getRegionName()));
- }
+
+ // Then, we close the regions
+ ExecutorService closingPoolExecutor =
+ new ThreadPoolExecutor(1, Integer.MAX_VALUE,
+ 60, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(),
+ new DaemonThreadFactory("regionserver-closing-"));
+
+ List<Future<Object>> futures =
+ new ArrayList<Future<Object>>(regionsToClose.size());
+
+ for (int i = 0; i < regionsToClose.size(); i++ ) {
+ futures.add(closingPoolExecutor.submit(createRegionCloseCallable(regionsToClose.get(i))));
+ }
+
+ ArrayList<HRegion> regionsClosed = new ArrayList<HRegion>();
+ for (int i = 0; i < futures.size(); i++ ) {
+ Future<Object> future = futures.get(i);
try {
- region.close(abortRequested);
- } catch (Throwable e) {
- cleanup(e, "Error closing " + Bytes.toString(region.getRegionName()));
+ future.get();
+ // add to regionsClosed only if we don't see an exception.
+ regionsClosed.add(regionsToClose.get(i));
+ } catch (Throwable e1) {
+ if (e1 instanceof ExecutionException) e1 = e1.getCause();
+ LOG.error("Error closingRegion " + regionsToClose.get(i), e1);
}
}
- return regionsToClose;
+
+ // Release the closing pool. Its core thread (corePoolSize = 1) would
+ // otherwise stay alive after this call. shutdown() lets any task that is
+ // still running finish; it does not interrupt it.
+ closingPoolExecutor.shutdown();
+ return regionsClosed;
+ }
+
+ private Callable<Object> createRegionCloseCallable(final HRegion region) {
+ return new Callable<Object>() {
+ public Object call() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("closing region " + Bytes.toString(region.getRegionName()));
+ }
+ try {
+ region.close(abortRequested);
+ } catch (IOException e) {
+ cleanup(e, "Error closing " + Bytes.toString(region.getRegionName()));
+ throw e;
+ }
+ return null;
+ }
+ };
}
/*