You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/04 20:18:41 UTC

svn commit: r1584856 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: liyin
Date: Fri Apr  4 18:18:40 2014
New Revision: 1584856

URL: http://svn.apache.org/r1584856
Log:
[HBASE-9143] update memstore flush threads in online fashion

Author: khadkevich

Summary: The number of flush threads (hbase.regionserver.flusher.count) can be updated in an online fashion.

Test Plan: Run the org.apache.hadoop.hbase.regionserver.TestRegionServerOnlineConfigChange::testNumMemstoreFlushThreadsOnlineChange() test case.

Reviewers: rshroff, gauravm, liyintang

Reviewed By: gauravm

CC: sameet, ming, hbase-eng@, aaiyer

Differential Revision: https://phabricator.fb.com/D920263

Task ID: 2637178

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1584856&r1=1584855&r2=1584856&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Fri Apr  4 18:18:40 2014
@@ -424,6 +424,12 @@ public final class HConstants {
   public static final String HREGION_MEMSTORE_FLUSH_SIZE =
       "hbase.hregion.memstore.flush.size";
 
+  /** Parameter name for the number of memstore flush threads */
+  public static final String FLUSH_THREADS = "hbase.regionserver.flusher.count";
+
+  /** Default number of memstore flush threads */
+  public static final int DEFAULT_FLUSH_THREADS = 2;
+
   /** Conf key for enabling Per Column Family flushing of memstores */
   public static final String HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH =
       "hbase.hregion.memstore.percolumnfamilyflush.enabled";

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1584856&r1=1584855&r2=1584856&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Apr  4 18:18:40 2014
@@ -656,6 +656,7 @@ public class HRegionServer implements HR
     this.compactSplitThread = new CompactSplitThread(this);
     // Registering the compactSplitThread object with the ConfigurationManager.
     configurationManager.registerObserver(this.compactSplitThread);
+    configurationManager.registerObserver(this.cacheFlusher);
 
     // Log rolling thread
     int hlogCntPerServer = this.conf.getInt(HConstants.HLOG_CNT_PER_SERVER, 2);
@@ -1822,7 +1823,6 @@ public class HRegionServer implements HR
         abort("Uncaught exception in service thread " + t.getName(), e);
       }
     };
-    this.cacheFlusher.start(n, handler);
 
     // Initialize the hlog roller threads
     for (int i = 0; i < this.hlogRollers.length; i++) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1584856&r1=1584855&r2=1584856&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Fri Apr  4 18:18:40 2014
@@ -24,21 +24,22 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
 import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
-import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Future;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -52,17 +53,14 @@ import java.util.concurrent.locks.Reentr
  *
  * @see FlushRequester
  */
-class MemStoreFlusher implements FlushRequester {
+class MemStoreFlusher implements FlushRequester, ConfigurationObserver {
   static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
   // These two data members go together.  Any entry in the one must have
   // a corresponding entry in the other.
-  private final BlockingQueue<FlushQueueEntry> flushQueue =
-    new DelayQueue<FlushQueueEntry>();
   private final Map<HRegion, FlushQueueEntry> regionsInQueue =
     new HashMap<HRegion, FlushQueueEntry>();
 
   private final boolean perColumnFamilyFlushEnabled;
-  private final long threadWakeFrequency;
   private final HRegionServer server;
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
@@ -78,8 +76,9 @@ class MemStoreFlusher implements FlushRe
   private long blockingStoreFilesNumber;
   private long blockingWaitTime;
 
-  private FlushHandler[] flushHandlers = null;
   private int handlerCount;
+  private final ThreadPoolExecutor flushes;
+  Map<FlushQueueEntry, Future> futures = new HashMap<FlushQueueEntry, Future>();
 
   /**
    * @param conf
@@ -89,8 +88,6 @@ class MemStoreFlusher implements FlushRe
       final HRegionServer server) {
     super();
     this.server = server;
-    this.threadWakeFrequency =
-      conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
     long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
     this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,
       UPPER_KEY, conf);
@@ -114,13 +111,16 @@ class MemStoreFlusher implements FlushRe
             HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH,
             HConstants.DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH);
     // number of "memstore flusher" threads per region server
-    this.handlerCount = conf.getInt("hbase.regionserver.flusher.count", 2);
+    this.handlerCount = conf.getInt(HConstants.FLUSH_THREADS, HConstants.DEFAULT_FLUSH_THREADS);
 
     LOG.info("globalMemStoreLimit=" +
       StringUtils.humanReadableInt(this.globalMemStoreLimit) +
       ", globalMemStoreLimitLowMark=" +
       StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
       ", maxHeap=" + StringUtils.humanReadableInt(max));
+    this.flushes = new ThreadPoolExecutor(handlerCount, handlerCount,
+        60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+        new DaemonThreadFactory("flush-thread-"));
   }
 
   /**
@@ -147,102 +147,60 @@ class MemStoreFlusher implements FlushRe
     return (long)(max * limit);
   }
 
-  private class FlushHandler extends HasThread {
-
-    FlushHandler(String threadName) {
-      this.setDaemon(true);
-      this.setName(threadName);
+  public void request(HRegion r, boolean isSelective) {
+    synchronized (regionsInQueue) {
+      if (!regionsInQueue.containsKey(r)) {
+        // This entry has no delay so it will be added at the top of the flush
+        // queue.  It'll come out near immediately.
+        FlushQueueEntry fqe = new FlushQueueEntry(r, isSelective);
+        this.regionsInQueue.put(r, fqe);
+        executeFlushQueueEntry(fqe);
+      }
     }
+  }
 
-    @Override
-    public void run() {
-      while (!server.isStopRequestedAtStageTwo()) {
-        FlushQueueEntry fqe = null;
+  protected void executeFlushQueueEntry(final FlushQueueEntry fqe) {
+    Runnable runnable = new Runnable() {
+      @Override
+      public void run() {
         try {
-          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-          if (fqe == null) {
-            continue;
-          }
-          if (!flushRegion(fqe, getName())) {
+          String name = String.format("%s.cacheFlusher.%d", MemStoreFlusher.this.server.getRSThreadName(),
+                  MemStoreFlusher.this.flushes.getCorePoolSize() + 1);
+          if (!flushRegion(fqe, name)) {
             LOG.warn("Failed to flush " + fqe.region);
           }
-        } catch (InterruptedException ex) {
-          continue;
-        } catch (ConcurrentModificationException ex) {
-          continue;
         } catch (Exception ex) {
           LOG.error("Cache flush failed" +
-              (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
-              ex);
+                   (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
+                   ex
+          );
           server.checkFileSystem();
         }
       }
-      LOG.info(getName() + " exiting");
-    }
-  }
-
-  public void request(HRegion r) {
-    request(r, false);
-  }
-
-  public void request(HRegion r, boolean selectiveFlushRequest) {
-    synchronized (regionsInQueue) {
-      if (!regionsInQueue.containsKey(r)) {
-        // This entry has no delay so it will be added at the top of the flush
-        // queue.  It'll come out near immediately.
-        FlushQueueEntry fqe = new FlushQueueEntry(r, selectiveFlushRequest);
-        this.regionsInQueue.put(r, fqe);
-        this.flushQueue.add(fqe);
-      }
-    }
+    };
+    futures.put(fqe, this.flushes.submit(runnable));
   }
 
   /**
    * Only interrupt once it's done with a run through the work loop.
    */
   void interruptIfNecessary() {
-    lock.writeLock().lock();
-    try {
-      for (FlushHandler flushHandler : flushHandlers) {
-        if (flushHandler != null)
-          flushHandler.interrupt();
-      }
-    } finally {
-      lock.writeLock().unlock();
-    }
+    flushes.shutdown();
   }
 
-  /**
-   * Start the flusher threads.
-   *
-   * @param rsThreadName prefix for thread name (since there might be multiple
-   *                     region servers running within the same JVM.
-   * @param eh
-   */
-  void start(String rsThreadName, UncaughtExceptionHandler eh) {
-    flushHandlers = new FlushHandler[handlerCount];
-    for (int i = 0; i < flushHandlers.length; i++) {
-      flushHandlers[i] = new FlushHandler(rsThreadName + ".cacheFlusher." + i);
-      if (eh != null) {
-        flushHandlers[i].setUncaughtExceptionHandler(eh);
-      }
-      flushHandlers[i].start();
-    }
-  }
 
   boolean isAlive() {
-    for (FlushHandler flushHander : flushHandlers) {
-      if (flushHander != null && flushHander.isAlive()) {
-        return true;
-      }
-    }
-    return false;
+    return !flushes.isShutdown();
   }
 
   void join() {
-    for (FlushHandler flushHandler : flushHandlers) {
-      if (flushHandler != null) {
-        Threads.shutdown(flushHandler.getThread());
+    boolean done = false;
+    while (!done) {
+      try {
+        done = flushes.awaitTermination(60, TimeUnit.SECONDS);
+        LOG.debug("Waiting for flush thread to finish...");
+      } catch (InterruptedException ie) {
+        LOG.error("Interrupted waiting for flush thread to finish...");
       }
     }
   }
@@ -285,7 +243,7 @@ class MemStoreFlusher implements FlushRe
         }
         // Put back on the queue.  Have it come back out of the queue
         // after a delay of this.blockingWaitTime / 100 ms.
-        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
+        executeFlushQueueEntry(fqe.requeue(this.blockingWaitTime / 100));
         // Tell a lie, it's not flushed but it's ok
         return true;
       }
@@ -308,31 +266,40 @@ class MemStoreFlusher implements FlushRe
    * not flushed.
    */
   private boolean flushRegion(final HRegion region, String why,
-    final boolean emergencyFlush, boolean selectiveFlushRequest) {
+      final boolean emergencyFlush, boolean selectiveFlushRequest) {
 
     synchronized (this.regionsInQueue) {
       FlushQueueEntry fqe = this.regionsInQueue.remove(region);
       if (fqe != null && emergencyFlush) {
-        // Need to remove from region from delay queue.  When NOT an
-        // emergencyFlush, then item was removed via a flushQueue.poll.
-        flushQueue.remove(fqe);
-     }
-     lock.readLock().lock();
-    }
-    try {
-      if (region.flushcache(selectiveFlushRequest)) {
-        server.compactSplitThread.requestCompaction(region, why);
+        Future future = futures.get(fqe);
+        if (future != null) {
+          try {
+            future.get();
+            if (region.flushcache(selectiveFlushRequest)) {
+              server.compactSplitThread.requestCompaction(region, why);
+            }
+            server.getMetrics().addFlush(region.getRecentFlushInfo());
+          } catch (IOException ex) {
+            LOG.warn("Cache flush failed" +
+                            (region != null ? (" for region " +
+                                    Bytes.toString(region.getRegionName())) : ""),
+                    RemoteExceptionHandler.checkIOException(ex)
+            );
+            server.checkFileSystem();
+            return false;
+          } catch (InterruptedException e) {
+            LOG.warn("Flush failed" +
+                    (region != null ? (" for region " +
+                            Bytes.toString(region.getRegionName())) : ""));
+          } catch (ExecutionException e) {
+            LOG.warn("Flush failed" +
+                    (region != null ? (" for region " +
+                            Bytes.toString(region.getRegionName())) : ""));
+          } finally {
+            lock.readLock().unlock();
+          }
+        }
       }
-      server.getMetrics().addFlush(region.getRecentFlushInfo());
-    } catch (IOException ex) {
-      LOG.warn("Cache flush failed" +
-        (region != null ? (" for region " +
-        Bytes.toString(region.getRegionName())) : ""),
-        RemoteExceptionHandler.checkIOException(ex));
-      server.checkFileSystem();
-      return false;
-    } finally {
-      lock.readLock().unlock();
     }
     return true;
   }
@@ -477,4 +444,30 @@ class MemStoreFlusher implements FlushRe
         other.getDelay(TimeUnit.MILLISECONDS)).intValue();
     }
   }
+
+
+  @Override
+  public void notifyOnChange(Configuration newConf) {
+    // number of "memstore flusher" threads per region server
+    int handlerCount = newConf.getInt(HConstants.FLUSH_THREADS, HConstants.DEFAULT_FLUSH_THREADS);
+    if(this.handlerCount != handlerCount){
+      LOG.info("Changing the value of " + HConstants.FLUSH_THREADS +
+              " from " + this.handlerCount + " to " +
+              handlerCount);
+    }
+    this.flushes.setMaximumPoolSize(handlerCount);
+    this.flushes.setCorePoolSize(handlerCount);
+    this.handlerCount = handlerCount;
+  }
+
+  /**
+   * Helper method for tests to check whether the number of flush threads
+   * changes on the fly.
+   *
+   * @return the current core pool size of the flush thread pool
+   */
+  protected int getFlushThreadNum() {
+    return this.flushes.getCorePoolSize();
+  }
+
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java?rev=1584856&r1=1584855&r2=1584856&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java Fri Apr  4 18:18:40 2014
@@ -127,6 +127,23 @@ public class TestRegionServerOnlineConfi
                  rs1.enableServerSideProfilingForAllCalls.get());
   }
 
+
+  /**
+   * Check if the number of flush threads changes online
+   * @throws IOException
+   */
+  public void testNumMemstoreFlushThreadsOnlineChange() throws IOException {
+    assertTrue(rs1.cacheFlusher != null);
+    int newNumFlushThreads =
+            rs1.cacheFlusher.getFlushThreadNum() + 1;
+
+    conf.setInt(HConstants.FLUSH_THREADS, newNumFlushThreads);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+
+    assertEquals(newNumFlushThreads,
+            rs1.cacheFlusher.getFlushThreadNum());
+  }
+
   /**
    * Test that the configurations in the CompactionConfiguration class change
    * properly.