You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/04/11 17:31:12 UTC
[07/54] [abbrv] hbase git commit: HBASE-17854 Use StealJobQueue in
HFileCleaner after HBASE-17215
HBASE-17854 Use StealJobQueue in HFileCleaner after HBASE-17215
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cbcbcf4d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cbcbcf4d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cbcbcf4d
Branch: refs/heads/HBASE-16961
Commit: cbcbcf4dcd3401327cc36173f3ca8e5362da1e0c
Parents: a66d491
Author: Yu Li <li...@apache.org>
Authored: Wed Apr 5 17:53:21 2017 +0800
Committer: Yu Li <li...@apache.org>
Committed: Wed Apr 5 17:53:21 2017 +0800
----------------------------------------------------------------------
.../hbase/master/cleaner/HFileCleaner.java | 98 +++++++++++++-------
.../apache/hadoop/hbase/util/StealJobQueue.java | 22 +++++
.../hbase/master/cleaner/TestHFileCleaner.java | 28 +++---
.../hadoop/hbase/util/TestStealJobQueue.java | 2 +-
4 files changed, 102 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/cbcbcf4d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 3a68252..8b3515a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.StealJobQueue;
import com.google.common.annotations.VisibleForTesting;
/**
@@ -57,23 +57,23 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
"hbase.regionserver.thread.hfilecleaner.throttle";
public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M
- // Configuration key for large queue size
- public final static String LARGE_HFILE_DELETE_QUEUE_SIZE =
+ // Configuration key for large queue initial size
+ public final static String LARGE_HFILE_QUEUE_INIT_SIZE =
"hbase.regionserver.hfilecleaner.large.queue.size";
- public final static int DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE = 1048576;
+ public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240;
- // Configuration key for small queue size
- public final static String SMALL_HFILE_DELETE_QUEUE_SIZE =
+ // Configuration key for small queue initial size
+ public final static String SMALL_HFILE_QUEUE_INIT_SIZE =
"hbase.regionserver.hfilecleaner.small.queue.size";
- public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576;
+ public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
- BlockingQueue<HFileDeleteTask> largeFileQueue;
+ StealJobQueue<HFileDeleteTask> largeFileQueue;
BlockingQueue<HFileDeleteTask> smallFileQueue;
private int throttlePoint;
- private int largeQueueSize;
- private int smallQueueSize;
+ private int largeQueueInitSize;
+ private int smallQueueInitSize;
private List<Thread> threads = new ArrayList<Thread>();
private boolean running;
@@ -94,12 +94,12 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
directory, MASTER_HFILE_CLEANER_PLUGINS, params);
throttlePoint =
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
- largeQueueSize =
- conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
- smallQueueSize =
- conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
- largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
- smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
+ largeQueueInitSize =
+ conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
+ smallQueueInitSize =
+ conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
+ largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
+ smallFileQueue = largeFileQueue.getStealFromQueue();
startHFileDeleteThreads();
}
@@ -152,6 +152,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
private boolean dispatch(HFileDeleteTask task) {
if (task.fileLength >= this.throttlePoint) {
if (!this.largeFileQueue.offer(task)) {
+ // should never arrive here as long as we use PriorityQueue
if (LOG.isTraceEnabled()) {
LOG.trace("Large file deletion queue is full");
}
@@ -159,6 +160,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
} else {
if (!this.smallFileQueue.offer(task)) {
+ // should never arrive here as long as we use PriorityQueue
if (LOG.isTraceEnabled()) {
LOG.trace("Small file deletion queue is full");
}
@@ -232,7 +234,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
task.setResult(succeed);
if (succeed) {
- countDeletedFiles(queue == largeFileQueue);
+ countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
}
}
}
@@ -244,8 +246,8 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
// Currently only for testing purpose
- private void countDeletedFiles(boolean isLarge) {
- if (isLarge) {
+ private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
+ if (isLargeFile) {
if (deletedLargeFiles == Long.MAX_VALUE) {
LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
deletedLargeFiles = 0L;
@@ -256,6 +258,9 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
deletedSmallFiles = 0L;
}
+ if (fromLargeQueue && LOG.isTraceEnabled()) {
+ LOG.trace("Stolen a small file deletion task in large file thread");
+ }
deletedSmallFiles++;
}
}
@@ -273,7 +278,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
}
- static class HFileDeleteTask {
+ static class HFileDeleteTask implements Comparable<HFileDeleteTask> {
private static final long MAX_WAIT = 60 * 1000L;
private static final long WAIT_UNIT = 1000L;
@@ -315,6 +320,31 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
return this.result;
}
+
+ @Override
+ public int compareTo(HFileDeleteTask o) {
+ long sub = this.fileLength - o.fileLength;
+ // smaller value with higher priority in PriorityQueue, and we intent to delete the larger
+ // file first.
+ return (sub > 0) ? -1 : (sub < 0 ? 1 : 0);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || !(o instanceof HFileDeleteTask)) {
+ return false;
+ }
+ HFileDeleteTask otherTask = (HFileDeleteTask) o;
+ return this.filePath.equals(otherTask.filePath) && (this.fileLength == otherTask.fileLength);
+ }
+
+ @Override
+ public int hashCode() {
+ return filePath.hashCode();
+ }
}
@VisibleForTesting
@@ -333,13 +363,13 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
@VisibleForTesting
- public long getLargeQueueSize() {
- return largeQueueSize;
+ public long getLargeQueueInitSize() {
+ return largeQueueInitSize;
}
@VisibleForTesting
- public long getSmallQueueSize() {
- return smallQueueSize;
+ public long getSmallQueueInitSize() {
+ return smallQueueInitSize;
}
@VisibleForTesting
@@ -351,15 +381,15 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
public void onConfigurationChange(Configuration conf) {
StringBuilder builder = new StringBuilder();
builder.append("Updating configuration for HFileCleaner, previous throttle point: ")
- .append(throttlePoint).append(", largeQueueSize: ").append(largeQueueSize)
- .append(", smallQueueSize: ").append(smallQueueSize);
+ .append(throttlePoint).append(", largeQueueInitSize: ").append(largeQueueInitSize)
+ .append(", smallQueueInitSize: ").append(smallQueueInitSize);
stopHFileDeleteThreads();
this.throttlePoint =
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
- this.largeQueueSize =
- conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
- this.smallQueueSize =
- conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
+ this.largeQueueInitSize =
+ conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
+ this.smallQueueInitSize =
+ conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
// record the left over tasks
List<HFileDeleteTask> leftOverTasks = new ArrayList<>();
for (HFileDeleteTask task : largeFileQueue) {
@@ -368,11 +398,11 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
for (HFileDeleteTask task : smallFileQueue) {
leftOverTasks.add(task);
}
- largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
- smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
+ largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
+ smallFileQueue = largeFileQueue.getStealFromQueue();
threads.clear();
- builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueSize: ")
- .append(largeQueueSize).append(", smallQueueSize: ").append(smallQueueSize);
+ builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueInitSize: ")
+ .append(largeQueueInitSize).append(", smallQueueInitSize: ").append(smallQueueInitSize);
LOG.debug(builder.toString());
startHFileDeleteThreads();
// re-dispatch the left over tasks
http://git-wip-us.apache.org/repos/asf/hbase/blob/cbcbcf4d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
index 74f0747..5e7e232 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
@@ -48,6 +48,24 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
public StealJobQueue() {
this.stealFromQueue = new PriorityBlockingQueue<T>() {
+
+ @Override
+ public boolean offer(T t) {
+ lock.lock();
+ try {
+ notEmpty.signal();
+ return super.offer(t);
+ } finally {
+ lock.unlock();
+ }
+ }
+ };
+ }
+
+ public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity) {
+ super(initCapacity);
+ this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity) {
+
@Override
public boolean offer(T t) {
lock.lock();
@@ -61,6 +79,10 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
};
}
+ /**
+ * Get a queue whose job might be stolen by the consumer of this original queue
+ * @return the queue whose job could be stolen
+ */
public BlockingQueue<T> getStealFromQueue() {
return stealFromQueue;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cbcbcf4d/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 8e8a4dd..5f05488 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -322,9 +322,9 @@ public class TestHFileCleaner {
public void testOnConfigurationChange() throws Exception {
// constants
final int ORIGINAL_THROTTLE_POINT = 512 * 1024;
- final int ORIGINAL_QUEUE_SIZE = 512;
+ final int ORIGINAL_QUEUE_INIT_SIZE = 512;
final int UPDATE_THROTTLE_POINT = 1024;// small enough to change large/small check
- final int UPDATE_QUEUE_SIZE = 1024;
+ final int UPDATE_QUEUE_INIT_SIZE = 1024;
final int LARGE_FILE_NUM = 5;
final int SMALL_FILE_NUM = 20;
@@ -332,8 +332,8 @@ public class TestHFileCleaner {
// no cleaner policies = delete all files
conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, ORIGINAL_THROTTLE_POINT);
- conf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
- conf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
+ conf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE);
+ conf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE);
Server server = new DummyServer();
Path archivedHfileDir =
new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
@@ -342,8 +342,8 @@ public class TestHFileCleaner {
FileSystem fs = UTIL.getDFSCluster().getFileSystem();
final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint());
- Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getLargeQueueSize());
- Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getSmallQueueSize());
+ Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
+ Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
// clean up archive directory and create files for testing
fs.delete(archivedHfileDir, true);
@@ -359,22 +359,24 @@ public class TestHFileCleaner {
};
t.setDaemon(true);
t.start();
- // let the cleaner run for some while
- Thread.sleep(20);
+ // wait until file clean started
+ while (cleaner.getNumOfDeletedSmallFiles() == 0) {
+ Thread.yield();
+ }
// trigger configuration change
Configuration newConf = new Configuration(conf);
newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT);
- newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
- newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
- cleaner.onConfigurationChange(newConf);
+ newConf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
+ newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
+ "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
+ cleaner.onConfigurationChange(newConf);
// check values after change
Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
- Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getLargeQueueSize());
- Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getSmallQueueSize());
+ Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
+ Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
Assert.assertEquals(2, cleaner.getCleanerThreads().size());
// wait until clean done and check
http://git-wip-us.apache.org/repos/asf/hbase/blob/cbcbcf4d/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
index b35f6f4..54fdaca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
@@ -38,7 +38,7 @@ import static org.junit.Assert.*;
public class TestStealJobQueue {
StealJobQueue<Integer> stealJobQueue;
- BlockingQueue stealFromQueue;
+ BlockingQueue<Integer> stealFromQueue;
@Before
public void setup() {