You are viewing a plain-text version of this content; the hyperlink to its canonical version did not survive the plain-text conversion.
Posted to commits@hbase.apache.org by md...@apache.org on 2018/03/26 17:49:16 UTC

hbase git commit: HBASE-20095 Redesign single instance pool in CleanerChore

Repository: hbase
Updated Branches:
  refs/heads/master 6a5c14b22 -> 83fa0ad9e


HBASE-20095 Redesign single instance pool in CleanerChore


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/83fa0ad9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/83fa0ad9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/83fa0ad9

Branch: refs/heads/master
Commit: 83fa0ad9edcf952a574176b314f3b8c131aa2075
Parents: 6a5c14b
Author: Reid Chan <re...@outlook.com>
Authored: Mon Mar 26 11:39:30 2018 +0800
Committer: Mike Drob <md...@apache.org>
Committed: Mon Mar 26 12:48:31 2018 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/master/HMaster.java |   3 +
 .../hbase/master/cleaner/CleanerChore.java      | 144 +++++++++++++------
 .../TestZooKeeperTableArchiveClient.java        |   3 +
 .../hbase/master/cleaner/TestCleanerChore.java  |   6 +
 .../hbase/master/cleaner/TestHFileCleaner.java  |   1 +
 .../master/cleaner/TestHFileLinkCleaner.java    |   1 +
 .../hbase/master/cleaner/TestLogsCleaner.java   |   1 +
 7 files changed, 118 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/83fa0ad9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 0dc5aa3..f5bd0de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -107,6 +107,7 @@ import org.apache.hadoop.hbase.master.balancer.BalancerChore;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
+import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
@@ -1146,6 +1147,8 @@ public class HMaster extends HRegionServer implements MasterServices {
    this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
    startProcedureExecutor();
 
+    // Initial cleaner chore
+    CleanerChore.initChorePool(conf);
    // Start log cleaner thread
    int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
    this.logCleaner =

http://git-wip-us.apache.org/repos/asf/hbase/blob/83fa0ad9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index 46f6217..312bcce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
@@ -51,7 +53,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
  * @param <T> Cleaner delegate class that is dynamically loaded from configuration
  */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
-    justification="TODO: Fix. It is wonky have static pool initialized from instance")
+    justification="Static pool will be only updated once.")
 @InterfaceAudience.Private
 public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     implements ConfigurationObserver {
@@ -68,19 +70,93 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
   private static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
 
+  private static class DirScanPool {
+    int size;
+    ForkJoinPool pool;
+    int cleanerLatch;
+    AtomicBoolean reconfigNotification;
+
+    DirScanPool(Configuration conf) {
+      String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
+      size = calculatePoolSize(poolSize);
+      // poolSize may be 0 or 0.0 from a careless configuration,
+      // double check to make sure.
+      size = size == 0 ? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : size;
+      pool = new ForkJoinPool(size);
+      LOG.info("Cleaner pool size is {}", size);
+      reconfigNotification = new AtomicBoolean(false);
+      cleanerLatch = 0;
+    }
+
+    /**
+     * Checks if pool can be updated immediately.
+     * @param conf configuration
+     * @return true if pool can be updated immediately, false otherwise
+     */
+    synchronized boolean canUpdateImmediately(Configuration conf) {
+      int newSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
+      if (newSize == size) {
+        LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize);
+        return false;
+      }
+      size = newSize;
+      if (pool.getPoolSize() == 0) {
+        // chore has no working thread.
+        return true;
+      }
+      // Chore is working, update it later.
+      reconfigNotification.set(true);
+      return false;
+    }
+
+    /**
+     * Update pool with new size.
+     */
+    synchronized void updatePool(long timeout) {
+      while (cleanerLatch != 0) {
+        try {
+          wait(timeout);
+        } catch (InterruptedException ie) {
+          // It's ok to ignore
+        }
+        break;
+      }
+      pool.shutdownNow();
+      LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
+      pool = new ForkJoinPool(size);
+    }
+
+    synchronized void latchCountUp() {
+      cleanerLatch++;
+    }
+
+    synchronized void latchCountDown() {
+      cleanerLatch--;
+      notifyAll();
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    synchronized void submit(ForkJoinTask task) {
+      pool.submit(task);
+    }
+  }
   // It may be waste resources for each cleaner chore own its pool,
   // so let's make pool for all cleaner chores.
-  private static volatile ForkJoinPool CHOREPOOL;
-  private static volatile int CHOREPOOLSIZE;
+  private static volatile DirScanPool POOL;
 
   protected final FileSystem fs;
   private final Path oldFileDir;
   private final Configuration conf;
   protected final Map<String, Object> params;
   private final AtomicBoolean enabled = new AtomicBoolean(true);
-  private final AtomicBoolean reconfig = new AtomicBoolean(false);
   protected List<T> cleanersChain;
 
+  public static void initChorePool(Configuration conf) {
+    if (POOL == null) {
+      POOL = new DirScanPool(conf);
+    }
+  }
+
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
                       FileSystem fs, Path oldFileDir, String confKey) {
     this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
@@ -99,21 +175,14 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
       FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) {
     super(name, s, sleepPeriod);
+
+    Preconditions.checkNotNull(POOL, "Chore's pool isn't initialized, please call"
+      + "CleanerChore.initChorePool(Configuration) before new a cleaner chore.");
     this.fs = fs;
     this.oldFileDir = oldFileDir;
     this.conf = conf;
     this.params = params;
     initCleanerChain(confKey);
-
-    if (CHOREPOOL == null) {
-      String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
-      CHOREPOOLSIZE = calculatePoolSize(poolSize);
-      // poolSize may be 0 or 0.0 from a careless configuration,
-      // double check to make sure.
-      CHOREPOOLSIZE = CHOREPOOLSIZE == 0? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE): CHOREPOOLSIZE;
-      this.CHOREPOOL = new ForkJoinPool(CHOREPOOLSIZE);
-      LOG.info("Cleaner pool size is {}", CHOREPOOLSIZE);
-    }
   }
 
   /**
@@ -174,25 +243,10 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
 
   @Override
   public void onConfigurationChange(Configuration conf) {
-    int updatedSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
-    if (updatedSize == CHOREPOOLSIZE) {
-      LOG.trace("Size from configuration is same as previous={}, no need to update.", updatedSize);
-      return;
+    if (POOL.canUpdateImmediately(conf)) {
+      // Can immediately update, no need to wait.
+      POOL.updatePool(0);
     }
-    CHOREPOOLSIZE = updatedSize;
-    if (CHOREPOOL.getPoolSize() == 0) {
-      // Chore does not work now, update it directly.
-      updateChorePoolSize(updatedSize);
-      return;
-    }
-    // Chore is working, update it after chore finished.
-    reconfig.set(true);
-  }
-
-  private void updateChorePoolSize(int updatedSize) {
-    CHOREPOOL.shutdownNow();
-    LOG.info("Update chore's pool size from {} to {}", CHOREPOOL.getParallelism(), updatedSize);
-    CHOREPOOL = new ForkJoinPool(updatedSize);
   }
 
   /**
@@ -221,14 +275,22 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   @Override
   protected void chore() {
     if (getEnabled()) {
-      if (runCleaner()) {
-        LOG.debug("Cleaned all WALs under {}",  oldFileDir);
-      } else {
-        LOG.warn("WALs outstanding under {}", oldFileDir);
+      try {
+        POOL.latchCountUp();
+        if (runCleaner()) {
+          LOG.debug("Cleaned all WALs under {}", oldFileDir);
+        } else {
+          LOG.warn("WALs outstanding under {}", oldFileDir);
+        }
+      } finally {
+        POOL.latchCountDown();
       }
-      // After each clean chore, checks if receives reconfigure notification while cleaning
-      if (reconfig.compareAndSet(true, false)) {
-        updateChorePoolSize(CHOREPOOLSIZE);
+      // After each cleaner chore, checks if received reconfigure notification while cleaning.
+      // First in cleaner turns off notification, to avoid another cleaner updating pool again.
+      if (POOL.reconfigNotification.compareAndSet(true, false)) {
+        // This cleaner is waiting for other cleaners finishing their jobs.
+        // To avoid missing next chore, only wait 0.8 * period, then shutdown.
+        POOL.updatePool((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
       }
     } else {
       LOG.debug("Cleaner chore disabled! Not cleaning.");
@@ -242,7 +304,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   public Boolean runCleaner() {
     preRunCleaner();
     CleanerTask task = new CleanerTask(this.oldFileDir, true);
-    CHOREPOOL.submit(task);
+    POOL.submit(task);
     return task.join();
   }
 
@@ -374,7 +436,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
 
   @VisibleForTesting
   int getChorePoolSize() {
-    return CHOREPOOLSIZE;
+    return POOL.size;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/83fa0ad9/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
index f3e193e1..16f3930 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
+import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -175,6 +176,7 @@ public class TestZooKeeperTableArchiveClient {
     Configuration conf = UTIL.getConfiguration();
     // setup the delegate
     Stoppable stop = new StoppableImplementation();
+    CleanerChore.initChorePool(conf);
     HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
     List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
     final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
@@ -229,6 +231,7 @@ public class TestZooKeeperTableArchiveClient {
     // setup the delegate
     Stoppable stop = new StoppableImplementation();
     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
+    CleanerChore.initChorePool(conf);
     HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
     List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
     final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

http://git-wip-us.apache.org/repos/asf/hbase/blob/83fa0ad9/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
index 22fa292..9438319 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.StoppableImplementation;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -55,6 +56,11 @@ public class TestCleanerChore {
   private static final Logger LOG = LoggerFactory.getLogger(TestCleanerChore.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
+  @Before
+  public void setup() throws Exception {
+    CleanerChore.initChorePool(UTIL.getConfiguration());
+  }
+
   @After
   public void cleanup() throws Exception {
     // delete and recreate the test directory, ensuring a clean test dir between tests

http://git-wip-us.apache.org/repos/asf/hbase/blob/83fa0ad9/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 32480ea..465e193 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -68,6 +68,7 @@ public class TestHFileCleaner {
   public static void setupCluster() throws Exception {
     // have to use a minidfs cluster because the localfs doesn't modify file times correctly
     UTIL.startMiniDFSCluster(1);
+    CleanerChore.initChorePool(UTIL.getConfiguration());
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/hbase/blob/83fa0ad9/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
index 667a33e..c011ea8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
@@ -108,6 +108,7 @@ public class TestHFileLinkCleaner {
     final long ttl = 1000;
     conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
     Server server = new DummyServer();
+    CleanerChore.initChorePool(conf);
     HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archiveDir);
 
     // Link backref cannot be removed

http://git-wip-us.apache.org/repos/asf/hbase/blob/83fa0ad9/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 7423d26..0263085 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -85,6 +85,7 @@ public class TestLogsCleaner {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniZKCluster();
     TEST_UTIL.startMiniDFSCluster(1);
+    CleanerChore.initChorePool(TEST_UTIL.getConfiguration());
   }
 
   @AfterClass