You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2007/09/15 17:14:55 UTC
svn commit: r575928 [1/2] - in /lucene/hadoop/trunk/src/contrib/hbase: ./
bin/ src/java/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/
Author: stack
Date: Sat Sep 15 08:14:53 2007
New Revision: 575928
URL: http://svn.apache.org/viewvc?rev=575928&view=rev
Log:
HADOOP-1813 OOME makes zombie of region server
Modified:
lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
lucene/hadoop/trunk/src/contrib/hbase/bin/hbase
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/RemoteExceptionHandler.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=575928&r1=575927&r2=575928&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sat Sep 15 08:14:53 2007
@@ -38,6 +38,7 @@
(Ning Li via Stack)
HADOOP-1800 output should default utf8 encoding
HADOOP-1801 When hdfs is yanked out from under hbase, hbase should go down gracefully
+ HADOOP-1813 OOME makes zombie of region server
HADOOP-1814 TestCleanRegionServerExit fails too often on Hudson
HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
HADOOP-1832 listTables() returns duplicate tables
Modified: lucene/hadoop/trunk/src/contrib/hbase/bin/hbase
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/bin/hbase?rev=575928&r1=575927&r2=575928&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/bin/hbase (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/bin/hbase Sat Sep 15 08:14:53 2007
@@ -206,7 +206,11 @@
CLASS=$COMMAND
fi
-
+# Have JVM dump heap if we run out of memory. Files will be in 'launch directory'
+# and are named like the following: java_pid21612.hprof. Apparently it doesn't
+# 'cost' to have this flag enabled. It's a 1.6 flag only. See:
+# http://blogs.sun.com/alanb/entry/outofmemoryerror_looks_a_bit_better
+HBASE_OPTS="$HBASE_OPTS -XX:+HeapDumpOnOutOfMemoryError"
HBASE_OPTS="$HBASE_OPTS -Dhadoop.log.dir=$HADOOP_LOG_DIR"
HBASE_OPTS="$HBASE_OPTS -Dhadoop.log.file=$HADOOP_LOGFILE"
HBASE_OPTS="$HBASE_OPTS -Dhadoop.home.dir=$HADOOP_HOME"
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=575928&r1=575927&r2=575928&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Sat Sep 15 08:14:53 2007
@@ -219,7 +219,6 @@
// cache-flush. Otherwise, the log sequence number for
// the CACHEFLUSH operation will appear in a "newer" log file
// than it should.
-
while(insideCacheFlush) {
try {
wait();
@@ -402,14 +401,14 @@
* @see #completeCacheFlush(Text, Text, long)
*/
synchronized long startCacheFlush() {
- while (insideCacheFlush) {
+ while (this.insideCacheFlush) {
try {
wait();
} catch (InterruptedException ie) {
// continue
}
}
- insideCacheFlush = true;
+ this.insideCacheFlush = true;
notifyAll();
return obtainSeqNum();
}
@@ -427,7 +426,7 @@
return;
}
- if(! insideCacheFlush) {
+ if (!this.insideCacheFlush) {
throw new IOException("Impossible situation: inside " +
"completeCacheFlush(), but 'insideCacheFlush' flag is false");
}
@@ -442,6 +441,16 @@
regionToLastFlush.put(regionName, logSeqId);
insideCacheFlush = false;
+ notifyAll();
+ }
+
+ /**
+ * Abort a cache flush.
+ * This method will clear waits on {@link #insideCacheFlush} but if this
+ * method is called, we are losing data. TODO: Fix.
+ */
+ synchronized void abort() {
+ this.insideCacheFlush = false;
notifyAll();
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=575928&r1=575927&r2=575928&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Sat Sep 15 08:14:53 2007
@@ -21,6 +21,8 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Constructor;
+import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -36,6 +38,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -57,6 +60,8 @@
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
@@ -64,33 +69,33 @@
* HMaster is the "master server" for a HBase.
* There is only one HMaster for a single HBase deployment.
*/
-public class HMaster implements HConstants, HMasterInterface,
-HMasterRegionInterface, Runnable {
+public class HMaster extends Thread implements HConstants, HMasterInterface,
+HMasterRegionInterface {
+ static final Log LOG = LogFactory.getLog(HMaster.class.getName());
- /** {@inheritDoc} */
public long getProtocolVersion(String protocol,
- @SuppressWarnings("unused") long clientVersion) throws IOException {
-
+ @SuppressWarnings("unused") long clientVersion)
+ throws IOException {
if (protocol.equals(HMasterInterface.class.getName())) {
return HMasterInterface.versionID;
-
} else if (protocol.equals(HMasterRegionInterface.class.getName())) {
return HMasterRegionInterface.versionID;
-
} else {
throw new IOException("Unknown protocol to name node: " + protocol);
}
}
- static final Log LOG = LogFactory.getLog(HMaster.class.getName());
-
- volatile boolean closed;
+ // We start out with closed flag on. Using AtomicBoolean rather than
+ // plain boolean because we want to pass a reference to supporting threads
+ // started here in HMaster rather than have them have to know about the
+ // hosting class
+ volatile AtomicBoolean closed = new AtomicBoolean(true);
volatile boolean fsOk;
Path dir;
Configuration conf;
FileSystem fs;
Random rand;
- long threadWakeFrequency;
+ int threadWakeFrequency;
int numRetries;
long maxRegionOpenTime;
@@ -102,12 +107,15 @@
HConnection connection;
- long metaRescanInterval;
+ int metaRescanInterval;
final AtomicReference<HServerAddress> rootRegionLocation =
new AtomicReference<HServerAddress>();
Lock splitLogLock = new ReentrantLock();
+
+ // A Sleeper that sleeps for threadWakeFrequency
+ protected Sleeper sleeper;
/**
* Base HRegion scanner class. Holds utilty common to <code>ROOT</code> and
@@ -156,31 +164,28 @@
* <p>A <code>META</code> region is not 'online' until it has been scanned
* once.
*/
- abstract class BaseScanner implements Runnable {
+ abstract class BaseScanner extends Chore {
protected boolean rootRegion;
protected final Text tableName;
protected abstract void initialScan();
protected abstract void maintenanceScan();
- BaseScanner(final Text tableName) {
- super();
+ BaseScanner(final Text tableName, final int period,
+ final AtomicBoolean stop) {
+ super(period, stop);
this.tableName = tableName;
this.rootRegion = tableName.equals(ROOT_TABLE_NAME);
}
-
- /** {@inheritDoc} */
- public void run() {
+
+ @Override
+ protected void initialChore() {
initialScan();
- while (!closed) {
- try {
- Thread.sleep(metaRescanInterval);
- } catch (InterruptedException e) {
- continue;
- }
- maintenanceScan();
- }
- LOG.info(this.getClass().getSimpleName() + " exiting");
+ }
+
+ @Override
+ protected void chore() {
+ maintenanceScan();
}
/**
@@ -228,7 +233,6 @@
// Note Region has been assigned.
checkAssigned(info, serverName, startCode);
-
if (isSplitParent(info)) {
splitParents.put(info, results);
}
@@ -237,11 +241,9 @@
if (rootRegion) {
numberOfMetaRegions.set(numberOfRegionsFound);
}
-
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-
if (e instanceof UnknownScannerException) {
// Reset scannerId so we do not try closing a scanner the other side
// has lost account of: prevents duplicated stack trace out of the
@@ -250,18 +252,14 @@
}
}
throw e;
-
} finally {
try {
if (scannerId != -1L && regionServer != null) {
regionServer.close(scannerId);
}
} catch (IOException e) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- }
- LOG.error("Closing scanner", e);
+ LOG.error("Closing scanner",
+ RemoteExceptionHandler.checkIOException(e));
}
}
@@ -468,18 +466,17 @@
class RootScanner extends BaseScanner {
/** Constructor */
public RootScanner() {
- super(HConstants.ROOT_TABLE_NAME);
+ super(HConstants.ROOT_TABLE_NAME, metaRescanInterval, closed);
}
private void scanRoot() {
int tries = 0;
- while (!closed && tries < numRetries) {
+ while (!closed.get() && tries < numRetries) {
synchronized (rootRegionLocation) {
- while(!closed && rootRegionLocation.get() == null) {
+ while(!closed.get() && rootRegionLocation.get() == null) {
// rootRegionLocation will be filled in when we get an 'open region'
// regionServerReport message from the HRegionServer that has been
// allocated the ROOT region below.
-
try {
rootRegionLocation.wait();
} catch (InterruptedException e) {
@@ -487,7 +484,7 @@
}
}
}
- if (closed) {
+ if (closed.get()) {
continue;
}
@@ -499,24 +496,17 @@
}
break;
} catch (IOException e) {
- if (e instanceof RemoteException) {
- try {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- } catch (IOException ex) {
- e = ex;
- }
- }
+ e = RemoteExceptionHandler.checkIOException(e);
tries += 1;
if (tries == 1) {
LOG.warn("Scan ROOT region", e);
} else {
LOG.error("Scan ROOT region", e);
-
if (tries == numRetries - 1) {
// We ran out of tries. Make sure the file system is still available
-
- checkFileSystem();
+ if (checkFileSystem()) {
+ continue; // Avoid sleeping.
+ }
}
}
} catch (Exception e) {
@@ -524,16 +514,7 @@
// at least log it rather than go out silently.
LOG.error("Unexpected exception", e);
}
-
- if (!closed) {
- // sleep before retry
-
- try {
- Thread.sleep(threadWakeFrequency);
- } catch (InterruptedException e) {
- // continue
- }
- }
+ sleeper.sleep();
}
}
@@ -549,8 +530,7 @@
}
}
- private RootScanner rootScanner;
- private Thread rootScannerThread;
+ private RootScanner rootScannerThread;
Integer rootScannerLock = new Integer(0);
@SuppressWarnings("unchecked")
@@ -643,20 +623,17 @@
class MetaScanner extends BaseScanner {
/** Constructor */
public MetaScanner() {
- super(HConstants.META_TABLE_NAME);
+ super(HConstants.META_TABLE_NAME, metaRescanInterval, closed);
}
private void scanOneMetaRegion(MetaRegion region) {
int tries = 0;
- while (!closed && tries < numRetries) {
- while (!closed && !rootScanned && rootRegionLocation.get() == null) {
- try {
- Thread.sleep(threadWakeFrequency);
- } catch (InterruptedException e) {
- // continue
- }
+ while (!closed.get() && tries < numRetries) {
+ while (!closed.get() && !rootScanned &&
+ rootRegionLocation.get() == null) {
+ sleeper.sleep();
}
- if (closed) {
+ if (closed.get()) {
continue;
}
@@ -668,24 +645,23 @@
}
break;
} catch (IOException e) {
- if (e instanceof RemoteException) {
- try {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- } catch (IOException ex) {
- e = ex;
- }
- }
+ e = RemoteExceptionHandler.checkIOException(e);
tries += 1;
if (tries == 1) {
LOG.warn("Scan one META region", e);
} else {
LOG.error("Scan one META region", e);
-
if (tries == numRetries - 1) {
- // We ran out of tries. Make sure the file system is still available
-
- checkFileSystem();
+ // We ran out of tries. Make sure the file system is still
+ // available
+ if (checkFileSystem()) {
+ // If filesystem is OK, is the exception a ConnectException?
+ // If so, mark the server as down. No point scanning either
+ // if no server to put meta region on.
+ if (e instanceof ConnectException) {
+ LOG.debug("Region hosting server is gone.");
+ }
+ }
}
}
} catch (Exception e) {
@@ -693,21 +669,14 @@
// at least log it rather than go out silently.
LOG.error("Unexpected exception", e);
}
- if (!closed) {
- // sleep before retry
- try {
- Thread.sleep(threadWakeFrequency);
- } catch (InterruptedException e) {
- //continue
- }
- }
+ sleeper.sleep();
}
}
@Override
protected void initialScan() {
MetaRegion region = null;
- while (!closed && region == null && !metaRegionsScanned()) {
+ while (!closed.get() && region == null && !metaRegionsScanned()) {
try {
region =
metaRegionsToScan.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
@@ -752,10 +721,9 @@
* been scanned.
*/
synchronized boolean waitForMetaRegionsOrClose() {
- while (!closed) {
+ while (!closed.get()) {
if (rootScanned &&
numberOfMetaRegions.get() == onlineMetaRegions.size()) {
-
break;
}
@@ -765,12 +733,11 @@
// continue
}
}
- return closed;
+ return closed.get();
}
}
- MetaScanner metaScanner;
- private Thread metaScannerThread;
+ MetaScanner metaScannerThread;
Integer metaScannerLock = new Integer(0);
/**
@@ -840,16 +807,14 @@
/**
* Build the HMaster
- * @param dir - base directory
- * @param address - server address and port number
- * @param conf - configuration
+ * @param dir base directory
+ * @param address server address and port number
+ * @param conf configuration
*
* @throws IOException
*/
public HMaster(Path dir, HServerAddress address, Configuration conf)
- throws IOException {
-
- this.closed = true;
+ throws IOException {
this.fsOk = true;
this.dir = dir;
this.conf = conf;
@@ -861,9 +826,7 @@
LOG.info("Root region dir: " + rootRegionDir.toString());
try {
-
// Make sure the root directory exists!
-
if(! fs.exists(dir)) {
fs.mkdirs(dir);
}
@@ -887,9 +850,7 @@
meta.getLog().closeAndDelete();
} catch (IOException e) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
- }
+ e = RemoteExceptionHandler.checkIOException(e);
LOG.error("bootstrap", e);
throw e;
}
@@ -900,7 +861,7 @@
throw e;
}
- this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
+ this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000);
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
this.maxRegionOpenTime =
conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
@@ -908,8 +869,8 @@
this.msgQueue = new LinkedBlockingQueue<PendingOperation>();
this.serverLeases = new Leases(
- conf.getLong("hbase.master.lease.period", 30 * 1000),
- conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));
+ conf.getInt("hbase.master.lease.period", 30 * 1000),
+ conf.getInt("hbase.master.lease.thread.wakefrequency", 15 * 1000));
this.server = RPC.getServer(this, address.getBindAddress(),
address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
@@ -923,13 +884,12 @@
this.connection = HConnectionManager.getConnection(conf);
this.metaRescanInterval =
- conf.getLong("hbase.master.meta.thread.rescanfrequency", 60 * 1000);
+ conf.getInt("hbase.master.meta.thread.rescanfrequency", 60 * 1000);
// The root region
this.rootScanned = false;
- this.rootScanner = new RootScanner();
- this.rootScannerThread = new Thread(rootScanner, "HMaster.rootScanner");
+ this.rootScannerThread = new RootScanner();
// Scans the meta table
@@ -941,8 +901,7 @@
this.initialMetaScanComplete = false;
- this.metaScanner = new MetaScanner();
- this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner");
+ this.metaScannerThread = new MetaScanner();
this.unassignedRegions =
Collections.synchronizedSortedMap(new TreeMap<Text, HRegionInfo>());
@@ -973,8 +932,10 @@
this.loadToServers = new TreeMap<HServerLoad, Set<String>>();
this.serversToLoad = new HashMap<String, HServerLoad>();
+ this.sleeper = new Sleeper(this.threadWakeFrequency, this.closed);
+
// We're almost open for business
- this.closed = false;
+ this.closed.set(false);
LOG.info("HMaster initialized on " + this.address.toString());
}
@@ -988,7 +949,7 @@
if (fsOk) {
if (!FSUtils.isFileSystemAvailable(fs)) {
LOG.fatal("Shutting down HBase cluster: file system not available");
- closed = true;
+ closed.set(true);
fsOk = false;
}
}
@@ -1002,108 +963,76 @@
/** Main processing loop */
public void run() {
- Thread.currentThread().setName("HMaster");
- try {
- // Start things up
- this.serverLeases.start();
- this.rootScannerThread.start();
- this.metaScannerThread.start();
-
- // Start the server last so everything else is running before we start
- // receiving requests
-
- this.server.start();
-
- } catch (IOException e) {
- if (e instanceof RemoteException) {
- try {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-
- } catch (IOException ex) {
- LOG.warn("thread start", ex);
- }
- }
-
- // Something happened during startup. Shut things down.
-
- this.closed = true;
- LOG.error("Failed startup", e);
- }
-
+ final String threadName = "HMaster";
+ Thread.currentThread().setName(threadName);
+ startAllServices();
/*
* Main processing loop
*/
-
- for (PendingOperation op = null; !closed; ) {
- try {
- op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-
- } catch (InterruptedException e) {
- // continue
- }
- if (op == null || closed) {
- continue;
- }
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Main processing loop: " + op.toString());
+ try {
+ for (PendingOperation op = null; !closed.get(); ) {
+ try {
+ op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // continue
}
-
- if (!op.process()) {
- // Operation would have blocked because not all meta regions are
- // online. This could cause a deadlock, because this thread is waiting
- // for the missing meta region(s) to come back online, but since it
- // is waiting, it cannot process the meta region online operation it
- // is waiting for. So put this operation back on the queue for now.
-
- if (msgQueue.size() == 0) {
- // The queue is currently empty so wait for a while to see if what
- // we need comes in first
+ if (op == null || closed.get()) {
+ continue;
+ }
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Main processing loop: " + op.toString());
+ }
+ if (!op.process()) {
+ // Operation would have blocked because not all meta regions are
+ // online. This could cause a deadlock, because this thread is waiting
+ // for the missing meta region(s) to come back online, but since it
+ // is waiting, it cannot process the meta region online operation it
+ // is waiting for. So put this operation back on the queue for now.
+ if (msgQueue.size() == 0) {
+ // The queue is currently empty so wait for a while to see if what
+ // we need comes in first
+ sleeper.sleep();
+ }
try {
- Thread.sleep(threadWakeFrequency);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Put " + op.toString() + " back on queue");
+ }
+ msgQueue.put(op);
} catch (InterruptedException e) {
- // continue
+ throw new RuntimeException("Putting into msgQueue was interrupted.", e);
}
}
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Put " + op.toString() + " back on queue");
+ } catch (Exception ex) {
+ if (ex instanceof RemoteException) {
+ try {
+ ex = RemoteExceptionHandler.decodeRemoteException(
+ (RemoteException)ex);
+ } catch (IOException e) {
+ ex = e;
+ LOG.warn("main processing loop: " + op.toString(), e);
}
+ }
+ if (!checkFileSystem()) {
+ break;
+ }
+ LOG.warn("Processing pending operations: " + op.toString(), ex);
+ try {
msgQueue.put(op);
} catch (InterruptedException e) {
throw new RuntimeException("Putting into msgQueue was interrupted.", e);
}
}
-
- } catch (Exception ex) {
- if (ex instanceof RemoteException) {
- try {
- ex = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) ex);
-
- } catch (IOException e) {
- ex = e;
- LOG.warn("main processing loop: " + op.toString(), e);
- }
- }
- if (!checkFileSystem()) {
- break;
- }
- LOG.warn("Processing pending operations: " + op.toString(), ex);
- try {
- msgQueue.put(op);
- } catch (InterruptedException e) {
- throw new RuntimeException("Putting into msgQueue was interrupted.", e);
- }
}
+ } catch (Throwable t) {
+ LOG.fatal("Unhandled exception", t);
}
letRegionServersShutdown();
/*
* Clean up and close up shop
*/
-
synchronized(rootScannerLock) {
rootScannerThread.interrupt(); // Wake root scanner
}
@@ -1136,6 +1065,40 @@
LOG.info("HMaster main thread exiting");
}
+
+ /*
+ * Start up all services. If any of these threads gets an unhandled exception
+ * then they just die with a logged message. This should be fine because
+ * in general, we do not expect the master to get such unhandled exceptions
+ * as OOMEs; it should be lightly loaded. See what HRegionServer does if we
+ * need to install an unexpected exception handler.
+ */
+ private void startAllServices() {
+ String threadName = Thread.currentThread().getName();
+ try {
+ Threads.setDaemonThreadRunning(this.rootScannerThread,
+ threadName + ".rootScanner");
+ Threads.setDaemonThreadRunning(this.metaScannerThread,
+ threadName + ".metaScanner");
+ // Leases are not the same as Chore threads. Set name differently.
+ this.serverLeases.setName(threadName + ".leaseChecker");
+ this.serverLeases.start();
+ // Start the server last so everything else is running before we start
+ // receiving requests.
+ this.server.start();
+ } catch (IOException e) {
+ if (e instanceof RemoteException) {
+ try {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ } catch (IOException ex) {
+ LOG.warn("thread start", ex);
+ }
+ }
+ // Something happened during startup. Shut things down.
+ this.closed.set(true);
+ LOG.error("Failed startup", e);
+ }
+ }
/*
* Wait on regionservers to report in
@@ -1192,7 +1155,7 @@
}
serversToServerInfo.notifyAll();
}
- if (storedInfo != null && !closed) {
+ if (storedInfo != null && !closed.get()) {
try {
msgQueue.put(new PendingServerShutdown(storedInfo));
} catch (InterruptedException e) {
@@ -1215,7 +1178,7 @@
loadToServers.put(load, servers);
}
- if (!closed) {
+ if (!closed.get()) {
long serverLabel = getServerLabel(s);
serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s));
}
@@ -1247,7 +1210,7 @@
// Get all the regions the server was serving reassigned
// (if we are not shutting down).
- if (!closed) {
+ if (!closed.get()) {
for (int i = 1; i < msgs.length; i++) {
HRegionInfo info = msgs[i].getRegionInfo();
@@ -1269,7 +1232,7 @@
return new HMsg[0];
}
- if (closed) {
+ if (closed.get()) {
// Tell server to shut down if we are shutting down. This should
// happen after check of MSG_REPORT_EXITING above, since region server
// will send us one of these messages after it gets MSG_REGIONSERVER_STOP
@@ -1361,13 +1324,11 @@
if (info != null) {
// Only cancel lease and update load information once.
// This method can be called a couple of times during shutdown.
-
LOG.info("Cancelling lease for " + serverName);
serverLeases.cancelLease(serverLabel, serverLabel);
leaseCancelled = true;
// update load information
-
HServerLoad load = serversToLoad.remove(serverName);
if (load != null) {
Set<String> servers = loadToServers.get(load);
@@ -1763,17 +1724,11 @@
try {
while (true) {
MapWritable values = null;
-
try {
values = server.next(scannerId);
-
} catch (IOException e) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
-
- }
- LOG.error("Shutdown scanning of meta region", e);
+ LOG.error("Shutdown scanning of meta region",
+ RemoteExceptionHandler.checkIOException(e));
break;
}
@@ -1805,20 +1760,16 @@
// Check server name. If null, be conservative and treat as though
// region had been on shutdown server (could be null because we
// missed edits in hlog because hdfs does not do write-append).
-
String serverName;
try {
serverName = Writables.bytesToString(results.get(COL_SERVER));
-
} catch(UnsupportedEncodingException e) {
LOG.error("Server name", e);
break;
}
if (serverName.length() > 0 &&
deadServerName.compareTo(serverName) != 0) {
-
// This isn't the server you're looking for - move along
-
if (LOG.isDebugEnabled()) {
LOG.debug("Server name " + serverName + " is not same as " +
deadServerName + ": Passing");
@@ -1827,12 +1778,10 @@
}
// Bingo! Found it.
-
HRegionInfo info = null;
try {
info = (HRegionInfo) Writables.getWritable(
results.get(COL_REGIONINFO), new HRegionInfo());
-
} catch (IOException e) {
LOG.error("Read fields", e);
break;
@@ -1857,56 +1806,42 @@
killList.put(deadServerName, regionsToKill);
unassignedRegions.remove(info.regionName);
assignAttempts.remove(info.regionName);
-
if (regionsToDelete.contains(info.regionName)) {
// Delete this region
-
regionsToDelete.remove(info.regionName);
todo.deleteRegion = true;
-
} else {
// Mark region offline
-
todo.regionOffline = true;
}
}
} else {
// Get region reassigned
-
regions.put(info.regionName, info);
// If it was pending, remove.
// Otherwise will obstruct its getting reassigned.
-
pendingRegions.remove(info.getRegionName());
}
}
-
} finally {
if(scannerId != -1L) {
try {
server.close(scannerId);
-
} catch (IOException e) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- }
- LOG.error("Closing scanner", e);
+ LOG.error("Closing scanner",
+ RemoteExceptionHandler.checkIOException(e));
}
}
}
// Remove server from root/meta entries
-
for (ToDoEntry e: toDoList) {
BatchUpdate b = new BatchUpdate(rand.nextLong());
long lockid = b.startUpdate(e.row);
-
if (e.deleteRegion) {
b.delete(lockid, COL_REGIONINFO);
-
} else if (e.regionOffline) {
e.info.offLine = true;
b.put(lockid, COL_REGIONINFO, Writables.getBytes(e.info));
@@ -1917,11 +1852,9 @@
}
// Get regions reassigned
-
for (Map.Entry<Text, HRegionInfo> e: regions.entrySet()) {
Text region = e.getKey();
HRegionInfo regionInfo = e.getValue();
-
unassignedRegions.put(region, regionInfo);
assignAttempts.put(region, Long.valueOf(0L));
}
@@ -1937,20 +1870,17 @@
if (!logSplit) {
// Process the old log file
-
StringBuilder dirName = new StringBuilder("log_");
dirName.append(deadServer.getBindAddress());
dirName.append("_");
dirName.append(deadServer.getPort());
Path logdir = new Path(dir, dirName.toString());
-
if (fs.exists(logdir)) {
if (!splitLogLock.tryLock()) {
return false;
}
try {
HLog.splitLog(dir, logdir, fs, conf);
-
} finally {
splitLogLock.unlock();
}
@@ -1978,7 +1908,7 @@
HRegionInterface server = null;
long scannerId = -1L;
for (int tries = 0; tries < numRetries; tries ++) {
- if (closed) {
+ if (closed.get()) {
return true;
}
if (rootRegionLocation.get() == null || !rootScanned) {
@@ -2007,11 +1937,7 @@
} catch (IOException e) {
if (tries == numRetries - 1) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- }
- throw e;
+ throw RemoteExceptionHandler.checkIOException(e);
}
}
}
@@ -2025,7 +1951,7 @@
for (int tries = 0; tries < numRetries; tries++) {
try {
- if (closed) {
+ if (closed.get()) {
return true;
}
if (!rootScanned ||
@@ -2072,11 +1998,7 @@
} catch (IOException e) {
if (tries == numRetries - 1) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- }
- throw e;
+ throw RemoteExceptionHandler.checkIOException(e);
}
}
}
@@ -2123,7 +2045,7 @@
@Override
boolean process() throws IOException {
for (int tries = 0; tries < numRetries; tries++) {
- if (closed) {
+ if (closed.get()) {
return true;
}
LOG.info("region closed: " + regionInfo.regionName);
@@ -2191,11 +2113,7 @@
} catch (IOException e) {
if (tries == numRetries - 1) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- }
- throw e;
+ throw RemoteExceptionHandler.checkIOException(e);
}
continue;
}
@@ -2210,12 +2128,8 @@
} else if (deleteRegion) {
try {
HRegion.deleteRegion(fs, dir, regionInfo.regionName);
-
} catch (IOException e) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- }
+ e = RemoteExceptionHandler.checkIOException(e);
LOG.error("failed delete region " + regionInfo.regionName, e);
throw e;
}
@@ -2262,7 +2176,7 @@
@Override
boolean process() throws IOException {
for (int tries = 0; tries < numRetries; tries++) {
- if (closed) {
+ if (closed.get()) {
return true;
}
LOG.info(region.getRegionName() + " open on " +
@@ -2354,11 +2268,7 @@
} catch (IOException e) {
if (tries == numRetries - 1) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- }
- throw e;
+ throw RemoteExceptionHandler.checkIOException(e);
}
}
}
@@ -2372,7 +2282,7 @@
/** {@inheritDoc} */
public boolean isMasterRunning() {
- return !closed;
+ return !closed.get();
}
/** {@inheritDoc} */
@@ -2380,7 +2290,7 @@
TimerTask tt = new TimerTask() {
@Override
public void run() {
- closed = true;
+ closed.set(true);
synchronized(msgQueue) {
msgQueue.clear(); // Empty the queue
msgQueue.notifyAll(); // Wake main thread
@@ -2404,8 +2314,7 @@
try {
// We can not access meta regions if they have not already been
// assigned and scanned. If we timeout waiting, just shutdown.
-
- if (metaScanner.waitForMetaRegionsOrClose()) {
+ if (this.metaScannerThread.waitForMetaRegionsOrClose()) {
break;
}
createTable(newRegion);
@@ -2414,11 +2323,7 @@
} catch (IOException e) {
if (tries == numRetries - 1) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- }
- throw e;
+ throw RemoteExceptionHandler.checkIOException(e);
}
}
}
@@ -2559,7 +2464,7 @@
// We can not access any meta region if they have not already been
// assigned and scanned.
- if (metaScanner.waitForMetaRegionsOrClose()) {
+ if (metaScannerThread.waitForMetaRegionsOrClose()) {
throw new MasterNotRunningException(); // We're shutting down. Forget it.
}
@@ -2657,12 +2562,8 @@
if (scannerId != -1L) {
try {
server.close(scannerId);
-
} catch (IOException e) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- }
+ e = RemoteExceptionHandler.checkIOException(e);
LOG.error("", e);
}
}
@@ -2682,14 +2583,8 @@
} catch (IOException e) {
if (tries == numRetries - 1) {
// No retries left
-
checkFileSystem();
-
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- }
- throw e;
+ throw RemoteExceptionHandler.checkIOException(e);
}
continue;
}
@@ -2869,11 +2764,8 @@
HRegion.deleteRegion(fs, dir, i.regionName);
} catch (IOException e) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- }
- LOG.error("failed to delete region " + i.regionName, e);
+ LOG.error("failed to delete region " + i.regionName,
+ RemoteExceptionHandler.checkIOException(e));
}
}
super.postProcessMeta(m, server);
@@ -2983,13 +2875,10 @@
/** {@inheritDoc} */
public void leaseExpired() {
LOG.info(server + " lease expired");
-
// Remove the server from the known servers list and update load info
-
HServerInfo info;
synchronized (serversToServerInfo) {
info = serversToServerInfo.remove(server);
-
if (info != null) {
String serverName = info.getServerAddress().toString();
HServerLoad load = serversToLoad.remove(serverName);
@@ -3015,8 +2904,8 @@
// continue. We used to throw a RuntimeException here but on exit
// this put is often interrupted. For now, just log these interrupts
// rather than throw an exception
- LOG.warn("MsgQueue.put was interrupted (If we are exiting, this msg " +
- "can be ignored");
+ LOG.debug("MsgQueue.put was interrupted (If we are exiting, this " +
+ "msg can be ignored)");
}
}
}
@@ -3031,11 +2920,8 @@
System.exit(0);
}
- /**
- * Main program
- * @param args
- */
- public static void main(String [] args) {
+ protected static void doMain(String [] args,
+ Class<? extends HMaster> masterClass) {
if (args.length < 1) {
printUsageAndExit();
}
@@ -3054,7 +2940,10 @@
if (cmd.equals("start")) {
try {
- (new Thread(new HMaster(conf))).start();
+ Constructor<? extends HMaster> c =
+ masterClass.getConstructor(Configuration.class);
+ HMaster master = c.newInstance(conf);
+ master.start();
} catch (Throwable t) {
LOG.error( "Can not start master", t);
System.exit(-1);
@@ -3076,5 +2965,13 @@
// Print out usage if we get to here.
printUsageAndExit();
}
+ }
+
+ /**
+ * Main program
+ * @param args
+ */
+ public static void main(String [] args) {
+ doMain(args, HMaster.class);
}
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=575928&r1=575927&r2=575928&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Sat Sep 15 08:14:53 2007
@@ -834,37 +834,45 @@
// When execution returns from snapshotMemcacheForLog() with a non-NULL
// value, the HMemcache will have a snapshot object stored that must be
// explicitly cleaned up using a call to deleteSnapshot().
+ //
HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
if(retval == null || retval.memcacheSnapshot == null) {
LOG.debug("Finished memcache flush; empty snapshot");
return;
}
- long logCacheFlushId = retval.sequenceId;
- if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshotted memcache for region " +
- this.regionInfo.regionName + " with sequence id " + retval.sequenceId +
- " and entries " + retval.memcacheSnapshot.size());
- }
-
- // A. Flush memcache to all the HStores.
- // Keep running vector of all store files that includes both old and the
- // just-made new flush store file.
- for(HStore hstore: stores.values()) {
- hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
- }
+ try {
+ long logCacheFlushId = retval.sequenceId;
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Snapshotted memcache for region " +
+ this.regionInfo.regionName + " with sequence id " +
+ retval.sequenceId + " and entries " +
+ retval.memcacheSnapshot.size());
+ }
- // B. Write a FLUSHCACHE-COMPLETE message to the log.
- // This tells future readers that the HStores were emitted correctly,
- // and that all updates to the log for this regionName that have lower
- // log-sequence-ids can be safely ignored.
-
- log.completeCacheFlush(this.regionInfo.regionName,
- regionInfo.tableDesc.getName(), logCacheFlushId);
+ // A. Flush memcache to all the HStores.
+ // Keep running vector of all store files that includes both old and the
+ // just-made new flush store file.
+ for(HStore hstore: stores.values()) {
+ hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
+ }
- // C. Delete the now-irrelevant memcache snapshot; its contents have been
- // dumped to disk-based HStores.
- memcache.deleteSnapshot();
+ // B. Write a FLUSHCACHE-COMPLETE message to the log.
+ // This tells future readers that the HStores were emitted correctly,
+ // and that all updates to the log for this regionName that have lower
+ // log-sequence-ids can be safely ignored.
+ log.completeCacheFlush(this.regionInfo.regionName,
+ regionInfo.tableDesc.getName(), logCacheFlushId);
+ } catch (IOException e) {
+ LOG.fatal("Interrupted while flushing. Edits lost. FIX! HADOOP-1903", e);
+ log.abort();
+ throw e;
+ } finally {
+ // C. Delete the now-irrelevant memcache snapshot; its contents have been
+ // dumped to disk-based HStores.
+ memcache.deleteSnapshot();
+ }
+
// D. Finally notify anyone waiting on memcache to clear:
// e.g. checkResources().
synchronized(this) {
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=575928&r1=575927&r2=575928&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Sat Sep 15 08:14:53 2007
@@ -20,11 +20,14 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
@@ -33,6 +36,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -41,44 +45,43 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchOperation;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.BatchOperation;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Writables;
-
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
*/
public class HRegionServer implements HConstants, HRegionInterface, Runnable {
+ static final Log LOG = LogFactory.getLog(HRegionServer.class);
- /** {@inheritDoc} */
public long getProtocolVersion(final String protocol,
@SuppressWarnings("unused") final long clientVersion)
- throws IOException {
-
+ throws IOException {
if (protocol.equals(HRegionInterface.class.getName())) {
return HRegionInterface.versionID;
}
throw new IOException("Unknown protocol to name node: " + protocol);
}
-
- static final Log LOG = LogFactory.getLog(HRegionServer.class);
// Set when a report to the master comes back with a message asking us to
// shutdown. Also set by call to stop when debugging or running unit tests
- // of HRegionServer in isolation.
- protected volatile boolean stopRequested;
+ // of HRegionServer in isolation. We use AtomicBoolean rather than
+ // plain boolean so we can pass a reference to Chore threads. Otherwise,
+ // Chore threads need to know about the hosting class.
+ protected AtomicBoolean stopRequested = new AtomicBoolean(false);
// Go down hard. Used if file system becomes unavailable and also in
// debugging and unit tests.
@@ -94,29 +97,55 @@
// region name -> HRegion
protected final SortedMap<Text, HRegion> onlineRegions;
- protected final Map<Text, HRegion> retiringRegions = new HashMap<Text, HRegion>();
-
+ protected final Map<Text, HRegion> retiringRegions =
+ new HashMap<Text, HRegion>();
+
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Vector<HMsg> outboundMsgs;
int numRetries;
- protected final long threadWakeFrequency;
- private final long msgInterval;
+ protected final int threadWakeFrequency;
+ private final int msgInterval;
+
+ // File paths
+ private FileSystem fs;
+
+ // Remote HMaster
+ private HMasterRegionInterface hbaseMaster;
+
+ // Server to handle client requests. Default access so can be accessed by
+ // unit tests.
+ Server server;
+
+ // Leases
+ private Leases leases;
+
+ // Request counter
+ private AtomicInteger requestCount;
+ // A sleeper that sleeps for msgInterval.
+ private final Sleeper sleeper;
+
// Check to see if regions should be split
- protected final long splitOrCompactCheckFrequency;
- private final SplitOrCompactChecker splitOrCompactChecker;
private final Thread splitOrCompactCheckerThread;
+ // Needed at shutdown. On way out, if can get this lock then we are not in
+ // middle of a split or compaction: i.e. splits/compactions cannot be
+ // interrupted.
protected final Integer splitOrCompactLock = new Integer(0);
- /** Runs periodically to determine if regions need to be compacted or split */
- class SplitOrCompactChecker implements Runnable, RegionUnavailableListener {
+ /**
+ * Runs periodically to determine if regions need to be compacted or split
+ */
+ class SplitOrCompactChecker extends Chore
+ implements RegionUnavailableListener {
private HTable root = null;
private HTable meta = null;
-
- /**
- * {@inheritDoc}
- */
+
+ public SplitOrCompactChecker(final AtomicBoolean stop) {
+ super(conf.getInt("hbase.regionserver.thread.splitcompactcheckfrequency",
+ 30 * 1000), stop);
+ }
+
public void closing(final Text regionName) {
lock.writeLock().lock();
try {
@@ -132,9 +161,6 @@
}
}
- /**
- * {@inheritDoc}
- */
public void closed(final Text regionName) {
lock.writeLock().lock();
try {
@@ -146,87 +172,58 @@
lock.writeLock().unlock();
}
}
-
+
/**
- * {@inheritDoc}
+ * Scan for splits or compactions to run. Run any we find.
*/
- public void run() {
- while (!stopRequested) {
- long startTime = System.currentTimeMillis();
- synchronized (splitOrCompactLock) { // Don't interrupt us while we're working
- // Grab a list of regions to check
- ArrayList<HRegion> regionsToCheck = new ArrayList<HRegion>();
- lock.readLock().lock();
- try {
- regionsToCheck.addAll(onlineRegions.values());
- } finally {
- lock.readLock().unlock();
- }
- for(HRegion cur: regionsToCheck) {
- if(cur.isClosed()) {
- // Skip if closed
- continue;
- }
- try {
- if (cur.needsCompaction()) {
- cur.compactStores();
- }
- // After compaction, it probably needs splitting. May also need
- // splitting just because one of the memcache flushes was big.
- Text midKey = new Text();
- if (cur.needsSplit(midKey)) {
- split(cur, midKey);
- }
- } catch(IOException e) {
- //TODO: What happens if this fails? Are we toast?
- LOG.error("Split or compaction failed", e);
- if (!checkFileSystem()) {
- break;
- }
- }
+ protected void chore() {
+ // Don't interrupt us while we're working
+ synchronized (splitOrCompactLock) {
+ checkForSplitsOrCompactions();
+ }
+ }
+
+ private void checkForSplitsOrCompactions() {
+ // Grab a list of regions to check
+ List<HRegion> nonClosedRegionsToCheck = getRegionsToCheck();
+ for(HRegion cur: nonClosedRegionsToCheck) {
+ try {
+ if (cur.needsCompaction()) {
+ cur.compactStores();
}
- }
-
- if (stopRequested) {
- continue;
- }
-
- // Sleep
- long waitTime = splitOrCompactCheckFrequency -
- (System.currentTimeMillis() - startTime);
- if (waitTime > 0) {
- try {
- Thread.sleep(waitTime);
- } catch(InterruptedException iex) {
- // continue
+ // After compaction, it probably needs splitting. May also need
+ // splitting just because one of the memcache flushes was big.
+ Text midKey = new Text();
+ if (cur.needsSplit(midKey)) {
+ split(cur, midKey);
+ }
+ } catch(IOException e) {
+ //TODO: What happens if this fails? Are we toast?
+ LOG.error("Split or compaction failed", e);
+ if (!checkFileSystem()) {
+ break;
}
}
}
- LOG.info("splitOrCompactChecker exiting");
}
private void split(final HRegion region, final Text midKey)
- throws IOException {
-
+ throws IOException {
final HRegionInfo oldRegionInfo = region.getRegionInfo();
final HRegion[] newRegions = region.closeAndSplit(midKey, this);
// When a region is split, the META table needs to be updated if we're
// splitting a 'normal' region, and the ROOT table needs to be
// updated if we are splitting a META region.
-
HTable t = null;
if (region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME)) {
// We need to update the root region
-
- if (root == null) {
- root = new HTable(conf, ROOT_TABLE_NAME);
+ if (this.root == null) {
+ this.root = new HTable(conf, ROOT_TABLE_NAME);
}
t = root;
-
} else {
// For normal regions we need to update the meta region
-
if (meta == null) {
meta = new HTable(conf, META_TABLE_NAME);
}
@@ -234,184 +231,114 @@
}
LOG.info("Updating " + t.getTableName() + " with region split info");
- // Remove old region from META
+ // Mark old region as offline and split in META.
// NOTE: there is no need for retry logic here. HTable does it for us.
-
long lockid = t.startUpdate(oldRegionInfo.getRegionName());
oldRegionInfo.offLine = true;
oldRegionInfo.split = true;
t.put(lockid, COL_REGIONINFO, Writables.getBytes(oldRegionInfo));
-
t.put(lockid, COL_SPLITA, Writables.getBytes(
- newRegions[0].getRegionInfo()));
-
+ newRegions[0].getRegionInfo()));
t.put(lockid, COL_SPLITB, Writables.getBytes(
- newRegions[1].getRegionInfo()));
+ newRegions[1].getRegionInfo()));
t.commit(lockid);
// Add new regions to META
-
for (int i = 0; i < newRegions.length; i++) {
lockid = t.startUpdate(newRegions[i].getRegionName());
-
t.put(lockid, COL_REGIONINFO, Writables.getBytes(
- newRegions[i].getRegionInfo()));
-
+ newRegions[i].getRegionInfo()));
t.commit(lockid);
}
// Now tell the master about the new regions
-
if (LOG.isDebugEnabled()) {
LOG.debug("Reporting region split to master");
}
reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
newRegions[1].getRegionInfo());
-
LOG.info("region split, META update, and report to master all" +
" successful. Old region=" + oldRegionInfo.getRegionName() +
", new regions: " + newRegions[0].getRegionName() + ", " +
newRegions[1].getRegionName());
// Do not serve the new regions. Let the Master assign them.
-
}
}
// Cache flushing
- private final Flusher cacheFlusher;
private final Thread cacheFlusherThread;
+ // Needed during shutdown so we send an interrupt after completion of a
+ // flush, not in the midst.
protected final Integer cacheFlusherLock = new Integer(0);
/* Runs periodically to flush memcache.
*/
- class Flusher implements Runnable {
- /**
- * {@inheritDoc}
- */
- public void run() {
- while(! stopRequested) {
- long startTime = System.currentTimeMillis();
-
- synchronized(cacheFlusherLock) {
- // Grab a list of items to flush
- ArrayList<HRegion> toFlush = new ArrayList<HRegion>();
- lock.readLock().lock();
- try {
- toFlush.addAll(onlineRegions.values());
- } finally {
- lock.readLock().unlock();
- }
-
- // Flush them, if necessary
- for(HRegion cur: toFlush) {
- if(cur.isClosed()) { // Skip if closed
- continue;
- }
-
- try {
- cur.optionallyFlush();
- } catch (IOException iex) {
- if (iex instanceof RemoteException) {
- try {
- iex = RemoteExceptionHandler.decodeRemoteException((RemoteException) iex);
- } catch (IOException x) {
- iex = x;
- }
- }
- LOG.error("Cache flush failed", iex);
- if (!checkFileSystem()) {
- break;
- }
- }
- }
- }
-
- // Sleep
- long waitTime = stopRequested? 0:
- threadWakeFrequency - (System.currentTimeMillis() - startTime);
- if(waitTime > 0) {
- try {
- Thread.sleep(waitTime);
- } catch(InterruptedException iex) {
- // continue
+ class Flusher extends Chore {
+ public Flusher(final int period, final AtomicBoolean stop) {
+ super(period, stop);
+ }
+
+ protected void chore() {
+ synchronized(cacheFlusherLock) {
+ checkForFlushesToRun();
+ }
+ }
+
+ private void checkForFlushesToRun() {
+ // Grab a list of items to flush
+ List<HRegion> nonClosedRegionsToFlush = getRegionsToCheck();
+ // Flush them, if necessary
+ for(HRegion cur: nonClosedRegionsToFlush) {
+ try {
+ cur.optionallyFlush();
+ } catch (IOException iex) {
+ LOG.error("Cache flush failed",
+ RemoteExceptionHandler.checkIOException(iex));
+ if (!checkFileSystem()) {
+ break;
}
}
}
- LOG.info("cacheFlusher exiting");
}
}
- // File paths
-
- FileSystem fs;
-
- // Logging
-
+ // HLog and HLog roller.
protected final HLog log;
- private final LogRoller logRoller;
private final Thread logRollerThread;
protected final Integer logRollerLock = new Integer(0);
- /** Runs periodically to determine if the log should be rolled */
- class LogRoller implements Runnable {
- private int maxLogEntries =
+ /** Runs periodically to determine if the HLog should be rolled */
+ class LogRoller extends Chore {
+ private int MAXLOGENTRIES =
conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
- /**
- * {@inheritDoc}
- */
- public void run() {
- while(!stopRequested) {
- synchronized(logRollerLock) {
- // If the number of log entries is high enough, roll the log. This
- // is a very fast operation, but should not be done too frequently.
- int nEntries = log.getNumEntries();
- if(nEntries > this.maxLogEntries) {
- try {
- LOG.info("Rolling hlog. Number of entries: " + nEntries);
- log.rollWriter();
- } catch (IOException iex) {
- if (iex instanceof RemoteException) {
- try {
- iex = RemoteExceptionHandler.
- decodeRemoteException((RemoteException) iex);
- } catch (IOException x) {
- iex = x;
- }
- }
- LOG.error("Log rolling failed", iex);
- if (!checkFileSystem()) {
- break;
- }
- }
- }
- }
- if(!stopRequested) {
- try {
- Thread.sleep(threadWakeFrequency);
- } catch(InterruptedException iex) {
- // continue
- }
+ public LogRoller(final int period, final AtomicBoolean stop) {
+ super(period, stop);
+ }
+
+ protected void chore() {
+ synchronized(logRollerLock) {
+ checkForLogRoll();
+ }
+ }
+
+ private void checkForLogRoll() {
+ // If the number of log entries is high enough, roll the log. This
+ // is a very fast operation, but should not be done too frequently.
+ int nEntries = log.getNumEntries();
+ if(nEntries > this.MAXLOGENTRIES) {
+ try {
+ LOG.info("Rolling hlog. Number of entries: " + nEntries);
+ log.rollWriter();
+ } catch (IOException iex) {
+ LOG.error("Log rolling failed",
+ RemoteExceptionHandler.checkIOException(iex));
+ checkFileSystem();
}
}
- LOG.info("logRoller exiting");
}
}
-
- // Remote HMaster
-
- private HMasterRegionInterface hbaseMaster;
-
- // Server
-
- private Server server;
-
- // Leases
- private Leases leases;
-
- // Request counter
- private AtomicInteger requestCount;
/**
* Starts a HRegionServer at the default location
@@ -433,10 +360,8 @@
* @throws IOException
*/
public HRegionServer(Path rootDir, HServerAddress address,
- Configuration conf) throws IOException {
-
- // Basic setup
- this.stopRequested = false;
+ Configuration conf)
+ throws IOException {
this.abortRequested = false;
this.fsOk = true;
this.rootDir = rootDir;
@@ -450,25 +375,22 @@
// Config'ed params
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
- this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
- this.msgInterval = conf.getLong("hbase.regionserver.msginterval",
- 3 * 1000);
- this.splitOrCompactCheckFrequency =
- conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
- 30 * 1000);
-
- // Cache flushing
- this.cacheFlusher = new Flusher();
- this.cacheFlusherThread = new Thread(cacheFlusher);
-
- // Check regions to see if they need to be split
- this.splitOrCompactChecker = new SplitOrCompactChecker();
- this.splitOrCompactCheckerThread = new Thread(splitOrCompactChecker);
+ this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000);
+ this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
+
+ // Cache flushing chore thread.
+ this.cacheFlusherThread =
+ new Flusher(this.threadWakeFrequency, stopRequested);
+
+ // Check regions to see if they need to be split or compacted chore thread
+ this.splitOrCompactCheckerThread =
+ new SplitOrCompactChecker(this.stopRequested);
- // Process requests from Master
+ // Task thread to process requests from Master
this.toDo = new LinkedBlockingQueue<ToDoEntry>();
this.worker = new Worker();
this.workerThread = new Thread(worker);
+ this.sleeper = new Sleeper(this.msgInterval, this.stopRequested);
try {
// Server to handle client requests
@@ -499,8 +421,8 @@
}
this.log = new HLog(fs, logdir, conf);
- this.logRoller = new LogRoller();
- this.logRollerThread = new Thread(logRoller);
+ this.logRollerThread =
+ new LogRoller(this.threadWakeFrequency, stopRequested);
// Remote HMaster
this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
@@ -508,11 +430,8 @@
new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
conf);
} catch (IOException e) {
- this.stopRequested = true;
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
- }
- throw e;
+ this.stopRequested.set(true);
+ throw RemoteExceptionHandler.checkIOException(e);
}
}
@@ -527,14 +446,15 @@
* <p>FOR DEBUGGING ONLY
*/
synchronized void stop() {
- stopRequested = true;
+ stopRequested.set(true);
notifyAll(); // Wakes run() if it is sleeping
}
/**
* Cause the server to exit without closing the regions it is serving, the
* log it is using and without notifying the master.
- * <p>FOR DEBUGGING ONLY
+ * Used in unit testing and on catastrophic events such as HDFS being yanked
+ * out from under hbase or an OOME.
*/
synchronized void abort() {
abortRequested = true;
@@ -574,198 +494,138 @@
LOG.info("HRegionServer stopped at: " +
serverInfo.getServerAddress().toString());
}
-
+
/**
* The HRegionServer sticks in this loop until closed. It repeatedly checks
* in with the HMaster, sending heartbeats & reports, and receiving HRegion
* load/unload instructions.
*/
public void run() {
+ startAllServices();
- // Threads
-
- String threadName = Thread.currentThread().getName();
-
- workerThread.setName(threadName + ".worker");
- workerThread.start();
- cacheFlusherThread.setName(threadName + ".cacheFlusher");
- cacheFlusherThread.start();
- splitOrCompactCheckerThread.setName(threadName + ".splitOrCompactChecker");
- splitOrCompactCheckerThread.start();
- logRollerThread.setName(threadName + ".logRoller");
- logRollerThread.start();
- leases = new Leases(conf.getLong("hbase.regionserver.lease.period",
- 3 * 60 * 1000), threadWakeFrequency);
- leases.start();
-
- // Server
-
+ // Set below if HMaster asked us to stop.
boolean masterRequestedStop = false;
+
try {
- this.server.start();
- LOG.info("HRegionServer started at: " +
- serverInfo.getServerAddress().toString());
- } catch(IOException e) {
- stopRequested = true;
- if (e instanceof RemoteException) {
+ while(!stopRequested.get()) {
+ long lastMsg = 0;
try {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-
- } catch (IOException ex) {
- e = ex;
+ reportForDuty();
+ } catch(IOException e) {
+ this.sleeper.sleep(lastMsg);
+ continue;
}
- }
- LOG.error("", e);
- }
-
- while(! stopRequested) {
- long lastMsg = 0;
- long waitTime;
- // Let the master know we're here
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Telling master we are up");
- }
- requestCount.set(0);
- serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
- hbaseMaster.regionServerStartup(serverInfo);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Done telling master we are up");
- }
- } catch(IOException e) {
- waitTime = stopRequested ? 0
- : msgInterval - (System.currentTimeMillis() - lastMsg);
- if(waitTime > 0) {
- synchronized (this) {
- try {
- wait(waitTime);
- } catch (InterruptedException e1) {
- // Go back up to the while test if stop has been requested.
+ // Now ask master what it wants us to do and tell it what we have done
+ for (int tries = 0; !stopRequested.get();) {
+ if ((System.currentTimeMillis() - lastMsg) >= msgInterval) {
+ HMsg outboundArray[] = null;
+ synchronized(outboundMsgs) {
+ outboundArray =
+ this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]);
+ this.outboundMsgs.clear();
}
- }
- }
- continue;
- }
-
- // Now ask master what it wants us to do and tell it what we have done.
- while (!stopRequested) {
- if ((System.currentTimeMillis() - lastMsg) >= msgInterval) {
- HMsg outboundArray[] = null;
- synchronized(outboundMsgs) {
- outboundArray = outboundMsgs.toArray(new HMsg[outboundMsgs.size()]);
- outboundMsgs.clear();
- }
-
- try {
- serverInfo.setLoad(new HServerLoad(requestCount.get(),
- onlineRegions.size()));
- requestCount.set(0);
-
- HMsg msgs[] =
- hbaseMaster.regionServerReport(serverInfo, outboundArray);
- lastMsg = System.currentTimeMillis();
-
- // Queue up the HMaster's instruction stream for processing
-
- boolean restart = false;
- for(int i = 0; i < msgs.length && !stopRequested && !restart; i++) {
- switch(msgs[i].getMsg()) {
-
- case HMsg.MSG_CALL_SERVER_STARTUP:
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got call server startup message");
- }
- if (fsOk) {
- closeAllRegions();
- restart = true;
- }
- break;
- case HMsg.MSG_REGIONSERVER_STOP:
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got regionserver stop message");
- }
- masterRequestedStop = true;
- stopRequested = true;
- break;
+ try {
+ this.serverInfo.setLoad(new HServerLoad(requestCount.get(),
+ onlineRegions.size()));
+ this.requestCount.set(0);
+ HMsg msgs[] =
+ this.hbaseMaster.regionServerReport(serverInfo, outboundArray);
+ lastMsg = System.currentTimeMillis();
+ // Queue up the HMaster's instruction stream for processing
+ boolean restart = false;
+ for(int i = 0; i < msgs.length && !stopRequested.get() &&
+ !restart; i++) {
+ switch(msgs[i].getMsg()) {
+
+ case HMsg.MSG_CALL_SERVER_STARTUP:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got call server startup message");
+ }
+ // We get the MSG_CALL_SERVER_STARTUP on startup but we can also
+ // get it when the master is panicking because for instance
+ // the HDFS has been yanked out from under it. Be wary of
+ // this message.
+ if (checkFileSystem()) {
+ closeAllRegions();
+ restart = true;
+ }
+
+ break;
- default:
- if (fsOk) {
- try {
- toDo.put(new ToDoEntry(msgs[i]));
- } catch (InterruptedException e) {
- throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+ case HMsg.MSG_REGIONSERVER_STOP:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got regionserver stop message");
+ }
+ masterRequestedStop = true;
+ stopRequested.set(true);
+ break;
+
+ default:
+ if (fsOk) {
+ try {
+ toDo.put(new ToDoEntry(msgs[i]));
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Putting into msgQueue was " +
+ "interrupted.", e);
+ }
}
}
}
- }
-
- if(restart || stopRequested) {
- toDo.clear();
- break;
- }
-
- } catch (IOException e) {
- if (e instanceof RemoteException) {
- try {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-
- } catch (IOException ex) {
- e = ex;
+ if (restart || this.stopRequested.get()) {
+ toDo.clear();
+ break;
+ }
+ // Reset tries count if we had a successful transaction.
+ tries = 0;
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ if(tries < this.numRetries) {
+ LOG.warn("", e);
+ tries++;
+ } else {
+ LOG.error("Exceeded max retries: " + this.numRetries, e);
+ if (!checkFileSystem()) {
+ continue;
+ }
+ // Something seriously wrong. Shutdown.
+ stop();
}
}
- LOG.error("", e);
- }
- }
-
- waitTime = stopRequested ? 0
- : msgInterval - (System.currentTimeMillis() - lastMsg);
- if (waitTime > 0) {
- synchronized (this) {
- try {
- wait(waitTime);
- } catch(InterruptedException iex) {
- // On interrupt we go around to the while test of stopRequested
- }
- }
+ } // while (!stopRequested.get())
+ this.sleeper.sleep(lastMsg);
}
}
+ } catch (Throwable t) {
+ LOG.fatal("Unhandled exception. Aborting...", t);
+ abort();
}
leases.closeAfterLeasesExpire();
this.worker.stop();
this.server.stop();
// Send interrupts to wake up threads if sleeping so they notice shutdown.
-
+ // TODO: Should we check they are alive? If OOME could have exited already
synchronized(logRollerLock) {
this.logRollerThread.interrupt();
}
-
synchronized(cacheFlusherLock) {
this.cacheFlusherThread.interrupt();
}
-
synchronized(splitOrCompactLock) {
this.splitOrCompactCheckerThread.interrupt();
}
if (abortRequested) {
- if (fsOk) {
+ if (this.fsOk) {
// Only try to clean up if the file system is available
-
try {
log.close();
LOG.info("On abort, closed hlog");
} catch (IOException e) {
- if (e instanceof RemoteException) {
- try {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
- } catch (IOException ex) {
- e = ex;
- }
- }
- LOG.error("Unable to close log in abort", e);
+ LOG.error("Unable to close log in abort",
+ RemoteExceptionHandler.checkIOException(e));
}
closeAllRegions(); // Don't leave any open file handles
}
@@ -776,18 +636,10 @@
try {
log.closeAndDelete();
} catch (IOException e) {
- if (e instanceof RemoteException) {
- try {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-
- } catch (IOException ex) {
- e = ex;
- }
- }
- LOG.error("", e);
+ LOG.error("", RemoteExceptionHandler.checkIOException(e));
}
try {
- if (!masterRequestedStop) {
+ if (!masterRequestedStop && closedRegions != null) {
HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
exitMsg[0] = new HMsg(HMsg.MSG_REPORT_EXITING);
// Tell the master what regions we are/were serving
@@ -802,15 +654,7 @@
hbaseMaster.regionServerReport(serverInfo, exitMsg);
}
} catch (IOException e) {
- if (e instanceof RemoteException) {
- try {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-
- } catch (IOException ex) {
- e = ex;
- }
- }
- LOG.warn("", e);
+ LOG.warn("", RemoteExceptionHandler.checkIOException(e));
}
LOG.info("stopping server at: " +
serverInfo.getServerAddress().toString());
@@ -820,6 +664,73 @@
LOG.info("main thread exiting");
}
+ /*
+ * Start Chore Threads, Server, Worker and lease checker threads. Install an
+ * UncaughtExceptionHandler that calls abort of RegionServer if we get
+ * an unhandled exception. We cannot set the handler on all threads.
+ * Server's internal Listener thread is off limits. If Server hits an OOME,
+ * it waits a while then retries. Meantime, a flush or a compaction that
+ * tries to run should trigger the same critical condition and the shutdown
+ * will run. On its way out, this server will shut down Server. Leases is
+ * sort of in between: its internal thread inherits from Chore but keeps
+ * its own internal stop mechanism, so it needs to be stopped by this
+ * hosting server.
+ */
+ private void startAllServices() {
+ String n = Thread.currentThread().getName();
+ UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
+ public void uncaughtException(Thread t, Throwable e) {
+ abort();
+ LOG.fatal("Set stop flag in " + t.getName(), e);
+ }
+ };
+ Threads.setDaemonThreadRunning(this.cacheFlusherThread, n + ".cacheFlusher",
+ handler);
+ Threads.setDaemonThreadRunning(this.splitOrCompactCheckerThread,
+ n + ".splitOrCompactChecker", handler);
+ Threads.setDaemonThreadRunning(this.logRollerThread, n + ".logRoller",
+ handler);
+ // Worker is not the same as the above threads in that it does not
+ // inherit from Chore. Set an UncaughtExceptionHandler on it in case it's
+ // the one to see an OOME, etc., first. The handler will set the stop
+ // flag.
+ Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
+ // Leases is not a Thread. Internally it runs a daemon thread. If it gets
+ // an unhandled exception, it will just exit.
+ this.leases = new Leases(
+ conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
+ this.threadWakeFrequency);
+ this.leases.setName(n + ".leaseChecker");
+ this.leases.start();
+ // Start Server. This service is like leases in that it internally runs
+ // a thread.
+ try {
+ this.server.start();
+ LOG.info("HRegionServer started at: " +
+ serverInfo.getServerAddress().toString());
+ } catch(IOException e) {
+ this.stopRequested.set(true);
+ LOG.fatal("Failed start Server",
+ RemoteExceptionHandler.checkIOException(e));
+ }
+ }
+
+ /*
+ * Let the master know we're here
+ * @throws IOException
+ */
+ private void reportForDuty() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Telling master we are up");
+ }
+ this.requestCount.set(0);
+ this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
+ this.hbaseMaster.regionServerStartup(serverInfo);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Done telling master we are up");
+ }
+ }
+
/** Add to the outbound message buffer */
private void reportOpen(HRegion region) {
synchronized(outboundMsgs) {
@@ -877,17 +788,15 @@
}
}
- /**
- * {@inheritDoc}
- */
public void run() {
- for(ToDoEntry e = null; !stopRequested; ) {
+ try {
+ for(ToDoEntry e = null; !stopRequested.get(); ) {
try {
e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
// continue
}
- if(e == null || stopRequested) {
+ if(e == null || stopRequested.get()) {
continue;
}
try {
@@ -895,15 +804,18 @@
switch(e.msg.getMsg()) {
- case HMsg.MSG_REGION_OPEN: // Open a region
+ case HMsg.MSG_REGION_OPEN:
+ // Open a region
openRegion(e.msg.getRegionInfo());
break;
- case HMsg.MSG_REGION_CLOSE: // Close a region
+ case HMsg.MSG_REGION_CLOSE:
+ // Close a region
closeRegion(e.msg.getRegionInfo(), true);
break;
- case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: // Close a region, don't reply
+ case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT:
+ // Close a region, don't reply
closeRegion(e.msg.getRegionInfo(), false);
break;
@@ -913,14 +825,7 @@
+ e.msg.toString());
}
} catch (IOException ie) {
- if (ie instanceof RemoteException) {
- try {
- ie = RemoteExceptionHandler.decodeRemoteException((RemoteException) ie);
-
- } catch (IOException x) {
- ie = x;
- }
- }
+ ie = RemoteExceptionHandler.checkIOException(ie);
if(e.tries < numRetries) {
LOG.warn(ie);
e.tries++;
@@ -937,7 +842,11 @@
}
}
}
- LOG.info("worker thread exiting");
+ } catch(Throwable t) {
+ LOG.fatal("Unhandled exception", t);
+ } finally {
+ LOG.info("worker thread exiting");
+ }
}
}
@@ -991,15 +900,8 @@
try {
region.close(abortRequested);
} catch (IOException e) {
- if (e instanceof RemoteException) {
- try {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-
- } catch (IOException x) {
- e = x;
- }
- }
- LOG.error("error closing region " + region.getRegionName(), e);
+ LOG.error("error closing region " + region.getRegionName(),
+ RemoteExceptionHandler.checkIOException(e));
}
}
return regionsToClose;
@@ -1188,14 +1090,7 @@
leases.createLease(scannerId, scannerId, new ScannerListener(scannerName));
return scannerId;
} catch (IOException e) {
- if (e instanceof RemoteException) {
- try {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
- } catch (IOException x) {
- e = x;
- }
- }
- LOG.error("", e);
+ LOG.error("", RemoteExceptionHandler.checkIOException(e));
checkFileSystem();
throw e;
}
@@ -1344,16 +1239,39 @@
*
* @return false if file system is not available
*/
- protected boolean checkFileSystem() {
- if (fsOk) {
+ protected synchronized boolean checkFileSystem() {
+ if (this.fsOk) {
if (!FSUtils.isFileSystemAvailable(fs)) {
LOG.fatal("Shutting down HRegionServer: file system not available");
- abortRequested = true;
- stopRequested = true;
+ this.abortRequested = true;
+ this.stopRequested.set(true);
fsOk = false;
}
}
- return fsOk;
+ return this.fsOk;
+ }
+
+ /**
+ * @return Returns list of non-closed regions hosted on this server. If no
+ * regions to check, returns an empty list.
+ */
+ protected List<HRegion> getRegionsToCheck() {
+ ArrayList<HRegion> regionsToCheck = new ArrayList<HRegion>();
+ lock.readLock().lock();
+ try {
+ regionsToCheck.addAll(this.onlineRegions.values());
+ } finally {
+ lock.readLock().unlock();
+ }
+ // Purge closed regions.
+ for (final ListIterator<HRegion> i = regionsToCheck.listIterator();
+ i.hasNext();) {
+ HRegion r = i.next();
+ if (r.isClosed()) {
+ i.remove();
+ }
+ }
+ return regionsToCheck;
}
//
@@ -1374,13 +1292,15 @@
}
/**
+ * Body of main: processes the command-line args and, on "start", runs an instance of regionServerClass.
* @param args
+ * @param regionServerClass HRegionServer to instantiate.
*/
- public static void main(String [] args) {
+ protected static void doMain(final String [] args,
+ final Class<? extends HRegionServer> regionServerClass) {
if (args.length < 1) {
printUsageAndExit();
}
-
Configuration conf = new HBaseConfiguration();
// Process command-line args. TODO: Better cmd-line processing
@@ -1394,7 +1314,13 @@
if (cmd.equals("start")) {
try {
- (new Thread(new HRegionServer(conf))).start();
+
+ Constructor<? extends HRegionServer> c =
+ regionServerClass.getConstructor(Configuration.class);
+ HRegionServer hrs = c.newInstance(conf);
+ Thread t = new Thread(hrs);
+ t.setName("regionserver" + hrs.server.getListenerAddress());
+ t.start();
} catch (Throwable t) {
LOG.error( "Can not start region server because "+
StringUtils.stringifyException(t) );
@@ -1412,4 +1338,11 @@
printUsageAndExit();
}
}
-}
+
+ /**
+ * @param args
+ */
+ public static void main(String [] args) {
+ doMain(args, HRegionServer.class);
+ }
+}
\ No newline at end of file