You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/04 20:18:41 UTC
svn commit: r1584856 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/
Author: liyin
Date: Fri Apr 4 18:18:40 2014
New Revision: 1584856
URL: http://svn.apache.org/r1584856
Log:
[HBASE-9143] update memstore flush threads in online fashion
Author: khadkevich
Summary: The number of flush threads (hbase.regionserver.flusher.count) can be updated in an online fashion.
Test Plan: Run org.apache.hadoop.hbase.regionserver.TestRegionServerOnlineConfigChange::testNumMemstoreFlushThreadsOnlineChange() testcase.
Reviewers: rshroff, gauravm, liyintang
Reviewed By: gauravm
CC: sameet, ming, hbase-eng@, aaiyer
Differential Revision: https://phabricator.fb.com/D920263
Task ID: 2637178
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1584856&r1=1584855&r2=1584856&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Fri Apr 4 18:18:40 2014
@@ -424,6 +424,12 @@ public final class HConstants {
public static final String HREGION_MEMSTORE_FLUSH_SIZE =
"hbase.hregion.memstore.flush.size";
+ /** Parameter name for the number of flush threads */
+ public static final String FLUSH_THREADS = "hbase.regionserver.flusher.count";
+
+ /** Default number of flush threads */
+ public static final int DEFAULT_FLUSH_THREADS = 2;
+
/** Conf key for enabling Per Column Family flushing of memstores */
public static final String HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH =
"hbase.hregion.memstore.percolumnfamilyflush.enabled";
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1584856&r1=1584855&r2=1584856&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Apr 4 18:18:40 2014
@@ -656,6 +656,7 @@ public class HRegionServer implements HR
this.compactSplitThread = new CompactSplitThread(this);
// Registering the compactSplitThread object with the ConfigurationManager.
configurationManager.registerObserver(this.compactSplitThread);
+ configurationManager.registerObserver(this.cacheFlusher);
// Log rolling thread
int hlogCntPerServer = this.conf.getInt(HConstants.HLOG_CNT_PER_SERVER, 2);
@@ -1822,7 +1823,6 @@ public class HRegionServer implements HR
abort("Uncaught exception in service thread " + t.getName(), e);
}
};
- this.cacheFlusher.start(n, handler);
// Initialize the hlog roller threads
for (int i = 0; i < this.hlogRollers.length; i++) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1584856&r1=1584855&r2=1584856&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Fri Apr 4 18:18:40 2014
@@ -24,21 +24,22 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
-import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Future;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -52,17 +53,14 @@ import java.util.concurrent.locks.Reentr
*
* @see FlushRequester
*/
-class MemStoreFlusher implements FlushRequester {
+class MemStoreFlusher implements FlushRequester, ConfigurationObserver {
static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
// These two data members go together. Any entry in the one must have
// a corresponding entry in the other.
- private final BlockingQueue<FlushQueueEntry> flushQueue =
- new DelayQueue<FlushQueueEntry>();
private final Map<HRegion, FlushQueueEntry> regionsInQueue =
new HashMap<HRegion, FlushQueueEntry>();
private final boolean perColumnFamilyFlushEnabled;
- private final long threadWakeFrequency;
private final HRegionServer server;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -78,8 +76,9 @@ class MemStoreFlusher implements FlushRe
private long blockingStoreFilesNumber;
private long blockingWaitTime;
- private FlushHandler[] flushHandlers = null;
private int handlerCount;
+ private final ThreadPoolExecutor flushes;
+ Map<FlushQueueEntry, Future> futures = new HashMap<FlushQueueEntry, Future>();
/**
* @param conf
@@ -89,8 +88,6 @@ class MemStoreFlusher implements FlushRe
final HRegionServer server) {
super();
this.server = server;
- this.threadWakeFrequency =
- conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,
UPPER_KEY, conf);
@@ -114,13 +111,16 @@ class MemStoreFlusher implements FlushRe
HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH,
HConstants.DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH);
// number of "memstore flusher" threads per region server
- this.handlerCount = conf.getInt("hbase.regionserver.flusher.count", 2);
+ this.handlerCount = conf.getInt(HConstants.FLUSH_THREADS, HConstants.DEFAULT_FLUSH_THREADS);
LOG.info("globalMemStoreLimit=" +
StringUtils.humanReadableInt(this.globalMemStoreLimit) +
", globalMemStoreLimitLowMark=" +
StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
", maxHeap=" + StringUtils.humanReadableInt(max));
+ this.flushes = new ThreadPoolExecutor(handlerCount, handlerCount,
+ 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+ new DaemonThreadFactory("flush-thread-"));
}
/**
@@ -147,102 +147,60 @@ class MemStoreFlusher implements FlushRe
return (long)(max * limit);
}
- private class FlushHandler extends HasThread {
-
- FlushHandler(String threadName) {
- this.setDaemon(true);
- this.setName(threadName);
+ public void request(HRegion r, boolean isSelective) {
+ synchronized (regionsInQueue) {
+ if (!regionsInQueue.containsKey(r)) {
+ // This entry has no delay so it will be added at the top of the flush
+ // queue. It'll come out near immediately.
+ FlushQueueEntry fqe = new FlushQueueEntry(r, isSelective);
+ this.regionsInQueue.put(r, fqe);
+ executeFlushQueueEntry(fqe);
+ }
}
+ }
- @Override
- public void run() {
- while (!server.isStopRequestedAtStageTwo()) {
- FlushQueueEntry fqe = null;
+ protected void executeFlushQueueEntry(final FlushQueueEntry fqe) {
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
try {
- fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
- if (fqe == null) {
- continue;
- }
- if (!flushRegion(fqe, getName())) {
+ String name = String.format("%s.cacheFlusher.%d", MemStoreFlusher.this.server.getRSThreadName(),
+ MemStoreFlusher.this.flushes.getCorePoolSize() + 1);
+ if (!flushRegion(fqe, name)) {
LOG.warn("Failed to flush " + fqe.region);
}
- } catch (InterruptedException ex) {
- continue;
- } catch (ConcurrentModificationException ex) {
- continue;
} catch (Exception ex) {
LOG.error("Cache flush failed" +
- (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
- ex);
+ (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
+ ex
+ );
server.checkFileSystem();
}
}
- LOG.info(getName() + " exiting");
- }
- }
-
- public void request(HRegion r) {
- request(r, false);
- }
-
- public void request(HRegion r, boolean selectiveFlushRequest) {
- synchronized (regionsInQueue) {
- if (!regionsInQueue.containsKey(r)) {
- // This entry has no delay so it will be added at the top of the flush
- // queue. It'll come out near immediately.
- FlushQueueEntry fqe = new FlushQueueEntry(r, selectiveFlushRequest);
- this.regionsInQueue.put(r, fqe);
- this.flushQueue.add(fqe);
- }
- }
+ };
+ futures.put(fqe, this.flushes.submit(runnable));
}
/**
* Only interrupt once it's done with a run through the work loop.
*/
void interruptIfNecessary() {
- lock.writeLock().lock();
- try {
- for (FlushHandler flushHandler : flushHandlers) {
- if (flushHandler != null)
- flushHandler.interrupt();
- }
- } finally {
- lock.writeLock().unlock();
- }
+ flushes.shutdown();
}
- /**
- * Start the flusher threads.
- *
- * @param rsThreadName prefix for thread name (since there might be multiple
- * region servers running within the same JVM.
- * @param eh
- */
- void start(String rsThreadName, UncaughtExceptionHandler eh) {
- flushHandlers = new FlushHandler[handlerCount];
- for (int i = 0; i < flushHandlers.length; i++) {
- flushHandlers[i] = new FlushHandler(rsThreadName + ".cacheFlusher." + i);
- if (eh != null) {
- flushHandlers[i].setUncaughtExceptionHandler(eh);
- }
- flushHandlers[i].start();
- }
- }
boolean isAlive() {
- for (FlushHandler flushHander : flushHandlers) {
- if (flushHander != null && flushHander.isAlive()) {
- return true;
- }
- }
- return false;
+ return !flushes.isShutdown();
}
void join() {
- for (FlushHandler flushHandler : flushHandlers) {
- if (flushHandler != null) {
- Threads.shutdown(flushHandler.getThread());
+ boolean done = false;
+ while (!done) {
+ try {
+ done = flushes.awaitTermination(60, TimeUnit.SECONDS);
+ LOG.debug("Waiting for flush thread to finish...");
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted waiting for flush thread to finish...");
}
}
}
@@ -285,7 +243,7 @@ class MemStoreFlusher implements FlushRe
}
// Put back on the queue. Have it come back out of the queue
// after a delay of this.blockingWaitTime / 100 ms.
- this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
+ executeFlushQueueEntry(fqe.requeue(this.blockingWaitTime / 100));
// Tell a lie, it's not flushed but it's ok
return true;
}
@@ -308,31 +266,40 @@ class MemStoreFlusher implements FlushRe
* not flushed.
*/
private boolean flushRegion(final HRegion region, String why,
- final boolean emergencyFlush, boolean selectiveFlushRequest) {
+ final boolean emergencyFlush, boolean selectiveFlushRequest) {
synchronized (this.regionsInQueue) {
FlushQueueEntry fqe = this.regionsInQueue.remove(region);
if (fqe != null && emergencyFlush) {
- // Need to remove from region from delay queue. When NOT an
- // emergencyFlush, then item was removed via a flushQueue.poll.
- flushQueue.remove(fqe);
- }
- lock.readLock().lock();
- }
- try {
- if (region.flushcache(selectiveFlushRequest)) {
- server.compactSplitThread.requestCompaction(region, why);
+ Future future = futures.get(fqe);
+ if (future != null) {
+ try {
+ future.get();
+ if (region.flushcache(selectiveFlushRequest)) {
+ server.compactSplitThread.requestCompaction(region, why);
+ }
+ server.getMetrics().addFlush(region.getRecentFlushInfo());
+ } catch (IOException ex) {
+ LOG.warn("Cache flush failed" +
+ (region != null ? (" for region " +
+ Bytes.toString(region.getRegionName())) : ""),
+ RemoteExceptionHandler.checkIOException(ex)
+ );
+ server.checkFileSystem();
+ return false;
+ } catch (InterruptedException e) {
+ LOG.warn("Flush failed" +
+ (region != null ? (" for region " +
+ Bytes.toString(region.getRegionName())) : ""));
+ } catch (ExecutionException e) {
+ LOG.warn("Flush failed" +
+ (region != null ? (" for region " +
+ Bytes.toString(region.getRegionName())) : ""));
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
}
- server.getMetrics().addFlush(region.getRecentFlushInfo());
- } catch (IOException ex) {
- LOG.warn("Cache flush failed" +
- (region != null ? (" for region " +
- Bytes.toString(region.getRegionName())) : ""),
- RemoteExceptionHandler.checkIOException(ex));
- server.checkFileSystem();
- return false;
- } finally {
- lock.readLock().unlock();
}
return true;
}
@@ -477,4 +444,30 @@ class MemStoreFlusher implements FlushRe
other.getDelay(TimeUnit.MILLISECONDS)).intValue();
}
}
+
+
+ @Override
+ public void notifyOnChange(Configuration newConf) {
+ // number of "memstore flusher" threads per region server
+ int handlerCount = newConf.getInt(HConstants.FLUSH_THREADS, HConstants.DEFAULT_FLUSH_THREADS);
+ if(this.handlerCount != handlerCount){
+ LOG.info("Changing the value of " + HConstants.FLUSH_THREADS +
+ " from " + this.handlerCount + " to " +
+ handlerCount);
+ }
+ this.flushes.setMaximumPoolSize(handlerCount);
+ this.flushes.setCorePoolSize(handlerCount);
+ this.handlerCount = handlerCount;
+ }
+
+ /**
+ * Helper method for tests to check whether the number of flush threads
+ * changes on the fly.
+ *
+ * @return the current core pool size of the flush thread pool
+ */
+ protected int getFlushThreadNum() {
+ return this.flushes.getCorePoolSize();
+ }
+
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java?rev=1584856&r1=1584855&r2=1584856&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java Fri Apr 4 18:18:40 2014
@@ -127,6 +127,23 @@ public class TestRegionServerOnlineConfi
rs1.enableServerSideProfilingForAllCalls.get());
}
+
+ /**
+ * Check if the number of flush threads changes online
+ * @throws IOException
+ */
+ public void testNumMemstoreFlushThreadsOnlineChange() throws IOException {
+ assertTrue(rs1.cacheFlusher != null);
+ int newNumFlushThreads =
+ rs1.cacheFlusher.getFlushThreadNum() + 1;
+
+ conf.setInt(HConstants.FLUSH_THREADS, newNumFlushThreads);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+
+ assertEquals(newNumFlushThreads,
+ rs1.cacheFlusher.getFlushThreadNum());
+ }
+
/**
* Test that the configurations in the CompactionConfiguration class change
* properly.