You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2018/07/23 03:37:35 UTC

hbase git commit: HBASE-20559 Backport HBASE-18083 (Make large/small file clean thread number configurable in HFileCleaner) to branch-1

Repository: hbase
Updated Branches:
  refs/heads/branch-1 780724b15 -> 896b69a0f


HBASE-20559 Backport HBASE-18083 (Make large/small file clean thread number configurable in HFileCleaner) to branch-1

The last port commit of HBASE-20555

Signed-off-by: Yu Li <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/896b69a0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/896b69a0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/896b69a0

Branch: refs/heads/branch-1
Commit: 896b69a0fc6c3164d33a1269e0d7ac87d89b3d90
Parents: 780724b
Author: TAK LON WU <wu...@amazon.com>
Authored: Mon Jul 23 11:23:57 2018 +0800
Committer: Yu Li <li...@apache.org>
Committed: Mon Jul 23 11:36:47 2018 +0800

----------------------------------------------------------------------
 .../hbase/master/cleaner/HFileCleaner.java      | 154 +++++++++++++------
 .../hbase/master/cleaner/TestHFileCleaner.java  |  13 +-
 2 files changed, 120 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/896b69a0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 70548b4..8f0b4be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -66,6 +67,16 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
       "hbase.regionserver.hfilecleaner.small.queue.size";
   public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
 
+  // Configuration key for large file delete thread number
+  public final static String LARGE_HFILE_DELETE_THREAD_NUMBER =
+      "hbase.regionserver.hfilecleaner.large.thread.count";
+  public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1;
+
+  // Configuration key for small file delete thread number
+  public final static String SMALL_HFILE_DELETE_THREAD_NUMBER =
+      "hbase.regionserver.hfilecleaner.small.thread.count";
+  public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;
+
   private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
 
   StealJobQueue<HFileDeleteTask> largeFileQueue;
@@ -73,11 +84,13 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
   private int throttlePoint;
   private int largeQueueInitSize;
   private int smallQueueInitSize;
+  private int largeFileDeleteThreadNumber;
+  private int smallFileDeleteThreadNumber;
   private List<Thread> threads = new ArrayList<Thread>();
   private boolean running;
 
-  private long deletedLargeFiles = 0L;
-  private long deletedSmallFiles = 0L;
+  private AtomicLong deletedLargeFiles = new AtomicLong();
+  private AtomicLong deletedSmallFiles = new AtomicLong();
 
   /**
    * @param period the period of time to sleep between each run
@@ -99,6 +112,10 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
         conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
     largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
     smallFileQueue = largeFileQueue.getStealFromQueue();
+    largeFileDeleteThreadNumber =
+        conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
+    smallFileDeleteThreadNumber =
+        conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
     startHFileDeleteThreads();
   }
 
@@ -182,30 +199,34 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
     final String n = Thread.currentThread().getName();
     running = true;
     // start thread for large file deletion
-    Thread large = new Thread() {
-      @Override
-      public void run() {
-        consumerLoop(largeFileQueue);
-      }
-    };
-    large.setDaemon(true);
-    large.setName(n + "-HFileCleaner.large-" + System.currentTimeMillis());
-    large.start();
-    LOG.debug("Starting hfile cleaner for large files: " + large.getName());
-    threads.add(large);
+    for (int i = 0; i < largeFileDeleteThreadNumber; i++) {
+      Thread large = new Thread() {
+        @Override
+        public void run() {
+          consumerLoop(largeFileQueue);
+        }
+      };
+      large.setDaemon(true);
+      large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis());
+      large.start();
+      LOG.debug("Starting hfile cleaner for large files: " + large.getName());
+      threads.add(large);
+    }
 
     // start thread for small file deletion
-    Thread small = new Thread() {
-      @Override
-      public void run() {
-        consumerLoop(smallFileQueue);
-      }
-    };
-    small.setDaemon(true);
-    small.setName(n + "-HFileCleaner.small-" + System.currentTimeMillis());
-    small.start();
-    LOG.debug("Starting hfile cleaner for small files: " + small.getName());
-    threads.add(small);
+    for (int i = 0; i < smallFileDeleteThreadNumber; i++) {
+      Thread small = new Thread() {
+        @Override
+        public void run() {
+          consumerLoop(smallFileQueue);
+        }
+      };
+      small.setDaemon(true);
+      small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis());
+      small.start();
+      LOG.debug("Starting hfile cleaner for small files: " + small.getName());
+      threads.add(small);
+    }
   }
 
   protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
@@ -247,20 +268,20 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
   // Currently only for testing purpose
   private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
     if (isLargeFile) {
-      if (deletedLargeFiles == Long.MAX_VALUE) {
+      if (deletedLargeFiles.get() == Long.MAX_VALUE) {
         LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
-        deletedLargeFiles = 0L;
+        deletedLargeFiles.set(0L);
       }
-      deletedLargeFiles++;
+      deletedLargeFiles.incrementAndGet();
     } else {
-      if (deletedSmallFiles == Long.MAX_VALUE) {
+      if (deletedSmallFiles.get() == Long.MAX_VALUE) {
         LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
-        deletedSmallFiles = 0L;
+        deletedSmallFiles.set(0L);
       }
       if (fromLargeQueue && LOG.isTraceEnabled()) {
         LOG.trace("Stolen a small file deletion task in large file thread");
       }
-      deletedSmallFiles++;
+      deletedSmallFiles.incrementAndGet();
     }
   }
 
@@ -353,12 +374,12 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
 
   @VisibleForTesting
   public long getNumOfDeletedLargeFiles() {
-    return deletedLargeFiles;
+    return deletedLargeFiles.get();
   }
 
   @VisibleForTesting
   public long getNumOfDeletedSmallFiles() {
-    return deletedSmallFiles;
+    return deletedSmallFiles.get();
   }
 
   @VisibleForTesting
@@ -378,19 +399,14 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
 
   @Override
   public void onConfigurationChange(Configuration conf) {
-    StringBuilder builder = new StringBuilder();
-    builder.append("Updating configuration for HFileCleaner, previous throttle point: ")
-        .append(throttlePoint).append(", largeQueueInitSize: ").append(largeQueueInitSize)
-        .append(", smallQueueInitSize: ").append(smallQueueInitSize);
+    if (!checkAndUpdateConfigurations(conf)) {
+      LOG.debug("Update configuration triggered but nothing changed for this cleaner");
+      return;
+    }
     stopHFileDeleteThreads();
-    this.throttlePoint =
-        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
-    this.largeQueueInitSize =
-        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
-    this.smallQueueInitSize =
-        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
     // record the left over tasks
-    List<HFileDeleteTask> leftOverTasks = new ArrayList<>();
+    List<HFileDeleteTask> leftOverTasks =
+        new ArrayList<>(largeFileQueue.size() + smallFileQueue.size());
     for (HFileDeleteTask task : largeFileQueue) {
       leftOverTasks.add(task);
     }
@@ -400,13 +416,59 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
     largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
     smallFileQueue = largeFileQueue.getStealFromQueue();
     threads.clear();
-    builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueInitSize: ")
-        .append(largeQueueInitSize).append(", smallQueueInitSize: ").append(smallQueueInitSize);
-    LOG.debug(builder.toString());
     startHFileDeleteThreads();
     // re-dispatch the left over tasks
     for (HFileDeleteTask task : leftOverTasks) {
       dispatch(task);
     }
   }
+
+  /**
+   * Check new configuration and update settings if value changed
+   * @param conf The new configuration
+   * @return true if any configuration for HFileCleaner changes, false if no change
+   */
+  private boolean checkAndUpdateConfigurations(Configuration conf) {
+    boolean updated = false;
+    int throttlePoint =
+        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
+    if (throttlePoint != this.throttlePoint) {
+      LOG.debug("Updating throttle point, from " + this.throttlePoint + " to " + throttlePoint);
+      this.throttlePoint = throttlePoint;
+      updated = true;
+    }
+    int largeQueueInitSize =
+        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
+    if (largeQueueInitSize != this.largeQueueInitSize) {
+      LOG.debug("Updating largeQueueInitSize, from " + this.largeQueueInitSize + " to "
+          + largeQueueInitSize);
+      this.largeQueueInitSize = largeQueueInitSize;
+      updated = true;
+    }
+    int smallQueueInitSize =
+        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
+    if (smallQueueInitSize != this.smallQueueInitSize) {
+      LOG.debug("Updating smallQueueInitSize, from " + this.smallQueueInitSize + " to "
+          + smallQueueInitSize);
+      this.smallQueueInitSize = smallQueueInitSize;
+      updated = true;
+    }
+    int largeFileDeleteThreadNumber =
+        conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
+    if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) {
+      LOG.debug("Updating largeFileDeleteThreadNumber, from " + this.largeFileDeleteThreadNumber
+          + " to " + largeFileDeleteThreadNumber);
+      this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber;
+      updated = true;
+    }
+    int smallFileDeleteThreadNumber =
+        conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
+    if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) {
+      LOG.debug("Updating smallFileDeleteThreadNumber, from " + this.smallFileDeleteThreadNumber
+          + " to " + smallFileDeleteThreadNumber);
+      this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
+      updated = true;
+    }
+    return updated;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/896b69a0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 18afafa..249780b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -321,6 +322,8 @@ public class TestHFileCleaner {
     final int UPDATE_QUEUE_INIT_SIZE = 1024;
     final int LARGE_FILE_NUM = 5;
     final int SMALL_FILE_NUM = 20;
+    final int LARGE_THREAD_NUM = 2;
+    final int SMALL_THREAD_NUM = 4;
 
     Configuration conf = UTIL.getConfiguration();
     // no cleaner policies = delete all files
@@ -363,6 +366,8 @@ public class TestHFileCleaner {
     newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT);
     newConf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
     newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
+    newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_THREAD_NUMBER, LARGE_THREAD_NUM);
+    newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_THREAD_NUMBER, SMALL_THREAD_NUM);
     LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
         + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
     cleaner.onConfigurationChange(newConf);
@@ -371,7 +376,13 @@ public class TestHFileCleaner {
     Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
     Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
     Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
-    Assert.assertEquals(2, cleaner.getCleanerThreads().size());
+    Assert.assertEquals(LARGE_THREAD_NUM + SMALL_THREAD_NUM, cleaner.getCleanerThreads().size());
+
+    // make sure no cost when onConfigurationChange called with no change
+    List<Thread> oldThreads = cleaner.getCleanerThreads();
+    cleaner.onConfigurationChange(newConf);
+    List<Thread> newThreads = cleaner.getCleanerThreads();
+    Assert.assertArrayEquals(oldThreads.toArray(), newThreads.toArray());
 
     // wait until clean done and check
     t.join();