You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2018/07/23 03:37:35 UTC
hbase git commit: HBASE-20559 Backport HBASE-18083 (Make large/small file clean thread number configurable in HFileCleaner) to branch-1
Repository: hbase
Updated Branches:
refs/heads/branch-1 780724b15 -> 896b69a0f
HBASE-20559 Backport HBASE-18083 (Make large/small file clean thread number configurable in HFileCleaner) to branch-1
The last port commit of HBASE-20555
Signed-off-by: Yu Li <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/896b69a0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/896b69a0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/896b69a0
Branch: refs/heads/branch-1
Commit: 896b69a0fc6c3164d33a1269e0d7ac87d89b3d90
Parents: 780724b
Author: TAK LON WU <wu...@amazon.com>
Authored: Mon Jul 23 11:23:57 2018 +0800
Committer: Yu Li <li...@apache.org>
Committed: Mon Jul 23 11:36:47 2018 +0800
----------------------------------------------------------------------
.../hbase/master/cleaner/HFileCleaner.java | 154 +++++++++++++------
.../hbase/master/cleaner/TestHFileCleaner.java | 13 +-
2 files changed, 120 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/896b69a0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 70548b4..8f0b4be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -66,6 +67,16 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
"hbase.regionserver.hfilecleaner.small.queue.size";
public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
+ // Configuration key for large file delete thread number
+ public final static String LARGE_HFILE_DELETE_THREAD_NUMBER =
+ "hbase.regionserver.hfilecleaner.large.thread.count";
+ public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1;
+
+ // Configuration key for small file delete thread number
+ public final static String SMALL_HFILE_DELETE_THREAD_NUMBER =
+ "hbase.regionserver.hfilecleaner.small.thread.count";
+ public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;
+
private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
StealJobQueue<HFileDeleteTask> largeFileQueue;
@@ -73,11 +84,13 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
private int throttlePoint;
private int largeQueueInitSize;
private int smallQueueInitSize;
+ private int largeFileDeleteThreadNumber;
+ private int smallFileDeleteThreadNumber;
private List<Thread> threads = new ArrayList<Thread>();
private boolean running;
- private long deletedLargeFiles = 0L;
- private long deletedSmallFiles = 0L;
+ private AtomicLong deletedLargeFiles = new AtomicLong();
+ private AtomicLong deletedSmallFiles = new AtomicLong();
/**
* @param period the period of time to sleep between each run
@@ -99,6 +112,10 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
smallFileQueue = largeFileQueue.getStealFromQueue();
+ largeFileDeleteThreadNumber =
+ conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
+ smallFileDeleteThreadNumber =
+ conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
startHFileDeleteThreads();
}
@@ -182,30 +199,34 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
final String n = Thread.currentThread().getName();
running = true;
// start thread for large file deletion
- Thread large = new Thread() {
- @Override
- public void run() {
- consumerLoop(largeFileQueue);
- }
- };
- large.setDaemon(true);
- large.setName(n + "-HFileCleaner.large-" + System.currentTimeMillis());
- large.start();
- LOG.debug("Starting hfile cleaner for large files: " + large.getName());
- threads.add(large);
+ for (int i = 0; i < largeFileDeleteThreadNumber; i++) {
+ Thread large = new Thread() {
+ @Override
+ public void run() {
+ consumerLoop(largeFileQueue);
+ }
+ };
+ large.setDaemon(true);
+ large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis());
+ large.start();
+ LOG.debug("Starting hfile cleaner for large files: " + large.getName());
+ threads.add(large);
+ }
// start thread for small file deletion
- Thread small = new Thread() {
- @Override
- public void run() {
- consumerLoop(smallFileQueue);
- }
- };
- small.setDaemon(true);
- small.setName(n + "-HFileCleaner.small-" + System.currentTimeMillis());
- small.start();
- LOG.debug("Starting hfile cleaner for small files: " + small.getName());
- threads.add(small);
+ for (int i = 0; i < smallFileDeleteThreadNumber; i++) {
+ Thread small = new Thread() {
+ @Override
+ public void run() {
+ consumerLoop(smallFileQueue);
+ }
+ };
+ small.setDaemon(true);
+ small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis());
+ small.start();
+ LOG.debug("Starting hfile cleaner for small files: " + small.getName());
+ threads.add(small);
+ }
}
protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
@@ -247,20 +268,20 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
// Currently only for testing purpose
private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
if (isLargeFile) {
- if (deletedLargeFiles == Long.MAX_VALUE) {
+ if (deletedLargeFiles.get() == Long.MAX_VALUE) {
LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
- deletedLargeFiles = 0L;
+ deletedLargeFiles.set(0L);
}
- deletedLargeFiles++;
+ deletedLargeFiles.incrementAndGet();
} else {
- if (deletedSmallFiles == Long.MAX_VALUE) {
+ if (deletedSmallFiles.get() == Long.MAX_VALUE) {
LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
- deletedSmallFiles = 0L;
+ deletedSmallFiles.set(0L);
}
if (fromLargeQueue && LOG.isTraceEnabled()) {
LOG.trace("Stolen a small file deletion task in large file thread");
}
- deletedSmallFiles++;
+ deletedSmallFiles.incrementAndGet();
}
}
@@ -353,12 +374,12 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
@VisibleForTesting
public long getNumOfDeletedLargeFiles() {
- return deletedLargeFiles;
+ return deletedLargeFiles.get();
}
@VisibleForTesting
public long getNumOfDeletedSmallFiles() {
- return deletedSmallFiles;
+ return deletedSmallFiles.get();
}
@VisibleForTesting
@@ -378,19 +399,14 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
@Override
public void onConfigurationChange(Configuration conf) {
- StringBuilder builder = new StringBuilder();
- builder.append("Updating configuration for HFileCleaner, previous throttle point: ")
- .append(throttlePoint).append(", largeQueueInitSize: ").append(largeQueueInitSize)
- .append(", smallQueueInitSize: ").append(smallQueueInitSize);
+ if (!checkAndUpdateConfigurations(conf)) {
+ LOG.debug("Update configuration triggered but nothing changed for this cleaner");
+ return;
+ }
stopHFileDeleteThreads();
- this.throttlePoint =
- conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
- this.largeQueueInitSize =
- conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
- this.smallQueueInitSize =
- conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
// record the left over tasks
- List<HFileDeleteTask> leftOverTasks = new ArrayList<>();
+ List<HFileDeleteTask> leftOverTasks =
+ new ArrayList<>(largeFileQueue.size() + smallFileQueue.size());
for (HFileDeleteTask task : largeFileQueue) {
leftOverTasks.add(task);
}
@@ -400,13 +416,59 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
smallFileQueue = largeFileQueue.getStealFromQueue();
threads.clear();
- builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueInitSize: ")
- .append(largeQueueInitSize).append(", smallQueueInitSize: ").append(smallQueueInitSize);
- LOG.debug(builder.toString());
startHFileDeleteThreads();
// re-dispatch the left over tasks
for (HFileDeleteTask task : leftOverTasks) {
dispatch(task);
}
}
+
+ /**
+ * Check new configuration and update settings if value changed
+ * @param conf The new configuration
+ * @return true if any configuration for HFileCleaner changes, false if no change
+ */
+ private boolean checkAndUpdateConfigurations(Configuration conf) {
+ boolean updated = false;
+ int throttlePoint =
+ conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
+ if (throttlePoint != this.throttlePoint) {
+ LOG.debug("Updating throttle point, from " + this.throttlePoint + " to " + throttlePoint);
+ this.throttlePoint = throttlePoint;
+ updated = true;
+ }
+ int largeQueueInitSize =
+ conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
+ if (largeQueueInitSize != this.largeQueueInitSize) {
+ LOG.debug("Updating largeQueueInitSize, from " + this.largeQueueInitSize + " to "
+ + largeQueueInitSize);
+ this.largeQueueInitSize = largeQueueInitSize;
+ updated = true;
+ }
+ int smallQueueInitSize =
+ conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
+ if (smallQueueInitSize != this.smallQueueInitSize) {
+ LOG.debug("Updating smallQueueInitSize, from " + this.smallQueueInitSize + " to "
+ + smallQueueInitSize);
+ this.smallQueueInitSize = smallQueueInitSize;
+ updated = true;
+ }
+ int largeFileDeleteThreadNumber =
+ conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
+ if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) {
+ LOG.debug("Updating largeFileDeleteThreadNumber, from " + this.largeFileDeleteThreadNumber
+ + " to " + largeFileDeleteThreadNumber);
+ this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber;
+ updated = true;
+ }
+ int smallFileDeleteThreadNumber =
+ conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
+ if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) {
+ LOG.debug("Updating smallFileDeleteThreadNumber, from " + this.smallFileDeleteThreadNumber
+ + " to " + smallFileDeleteThreadNumber);
+ this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
+ updated = true;
+ }
+ return updated;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/896b69a0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 18afafa..249780b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
@@ -321,6 +322,8 @@ public class TestHFileCleaner {
final int UPDATE_QUEUE_INIT_SIZE = 1024;
final int LARGE_FILE_NUM = 5;
final int SMALL_FILE_NUM = 20;
+ final int LARGE_THREAD_NUM = 2;
+ final int SMALL_THREAD_NUM = 4;
Configuration conf = UTIL.getConfiguration();
// no cleaner policies = delete all files
@@ -363,6 +366,8 @@ public class TestHFileCleaner {
newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT);
newConf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
+ newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_THREAD_NUMBER, LARGE_THREAD_NUM);
+ newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_THREAD_NUMBER, SMALL_THREAD_NUM);
LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
+ "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
cleaner.onConfigurationChange(newConf);
@@ -371,7 +376,13 @@ public class TestHFileCleaner {
Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
- Assert.assertEquals(2, cleaner.getCleanerThreads().size());
+ Assert.assertEquals(LARGE_THREAD_NUM + SMALL_THREAD_NUM, cleaner.getCleanerThreads().size());
+
+ // make sure no cost when onConfigurationChange called with no change
+ List<Thread> oldThreads = cleaner.getCleanerThreads();
+ cleaner.onConfigurationChange(newConf);
+ List<Thread> newThreads = cleaner.getCleanerThreads();
+ Assert.assertArrayEquals(oldThreads.toArray(), newThreads.toArray());
// wait until clean done and check
t.join();