You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2018/12/14 00:15:04 UTC
[36/50] [abbrv] hbase git commit: HBASE-20558 Port HBASE-17854 (Use
StealJobQueue in HFileCleaner after HBASE-17215) to branch-1
HBASE-20558 Port HBASE-17854 (Use StealJobQueue in HFileCleaner after HBASE-17215) to branch-1
The third port commit of HBASE-20555
Signed-off-by: Zach York <zy...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/976f07e8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/976f07e8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/976f07e8
Branch: refs/heads/branch-1.3
Commit: 976f07e87cbeb35b80a7ca8eea46f973e932bf83
Parents: 30b1dc0
Author: TAK LON WU <wu...@amazon.com>
Authored: Mon Jul 9 16:34:06 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Dec 12 18:08:20 2018 -0800
----------------------------------------------------------------------
.../hbase/master/cleaner/HFileCleaner.java | 98 +++++++++++++-------
.../apache/hadoop/hbase/util/StealJobQueue.java | 29 +++++-
.../hbase/master/cleaner/TestHFileCleaner.java | 28 +++---
.../hadoop/hbase/util/TestStealJobQueue.java | 14 +--
4 files changed, 113 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/976f07e8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index defe851..70548b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -35,6 +34,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.StealJobQueue;
/**
* This Chore, every time it runs, will clear the HFiles in the hfile archive
@@ -56,23 +56,23 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
"hbase.regionserver.thread.hfilecleaner.throttle";
public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M
- // Configuration key for large queue size
- public final static String LARGE_HFILE_DELETE_QUEUE_SIZE =
+ // Configuration key for large queue initial size
+ public final static String LARGE_HFILE_QUEUE_INIT_SIZE =
"hbase.regionserver.hfilecleaner.large.queue.size";
- public final static int DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE = 1048576;
+ public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240;
- // Configuration key for small queue size
- public final static String SMALL_HFILE_DELETE_QUEUE_SIZE =
+ // Configuration key for small queue initial size
+ public final static String SMALL_HFILE_QUEUE_INIT_SIZE =
"hbase.regionserver.hfilecleaner.small.queue.size";
- public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576;
+ public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
- BlockingQueue<HFileDeleteTask> largeFileQueue;
+ StealJobQueue<HFileDeleteTask> largeFileQueue;
BlockingQueue<HFileDeleteTask> smallFileQueue;
private int throttlePoint;
- private int largeQueueSize;
- private int smallQueueSize;
+ private int largeQueueInitSize;
+ private int smallQueueInitSize;
private List<Thread> threads = new ArrayList<Thread>();
private boolean running;
@@ -93,12 +93,12 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
directory, MASTER_HFILE_CLEANER_PLUGINS, params);
throttlePoint =
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
- largeQueueSize =
- conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
- smallQueueSize =
- conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
- largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
- smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
+ largeQueueInitSize =
+ conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
+ smallQueueInitSize =
+ conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
+ largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
+ smallFileQueue = largeFileQueue.getStealFromQueue();
startHFileDeleteThreads();
}
@@ -151,6 +151,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
private boolean dispatch(HFileDeleteTask task) {
if (task.fileLength >= this.throttlePoint) {
if (!this.largeFileQueue.offer(task)) {
+ // should never arrive here as long as we use PriorityBlockingQueue, which is unbounded
if (LOG.isTraceEnabled()) {
LOG.trace("Large file deletion queue is full");
}
@@ -158,6 +159,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
} else {
if (!this.smallFileQueue.offer(task)) {
+ // should never arrive here as long as we use PriorityBlockingQueue, which is unbounded
if (LOG.isTraceEnabled()) {
LOG.trace("Small file deletion queue is full");
}
@@ -231,7 +233,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
task.setResult(succeed);
if (succeed) {
- countDeletedFiles(queue == largeFileQueue);
+ countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
}
}
}
@@ -243,8 +245,8 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
// Currently only for testing purpose
- private void countDeletedFiles(boolean isLarge) {
- if (isLarge) {
+ private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
+ if (isLargeFile) {
if (deletedLargeFiles == Long.MAX_VALUE) {
LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
deletedLargeFiles = 0L;
@@ -255,6 +257,9 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
deletedSmallFiles = 0L;
}
+ if (fromLargeQueue && LOG.isTraceEnabled()) {
+ LOG.trace("Stolen a small file deletion task in large file thread");
+ }
deletedSmallFiles++;
}
}
@@ -272,7 +277,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
}
- static class HFileDeleteTask {
+ static class HFileDeleteTask implements Comparable<HFileDeleteTask> {
private static final long MAX_WAIT = 60 * 1000L;
private static final long WAIT_UNIT = 1000L;
@@ -314,6 +319,31 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
return this.result;
}
+
+ @Override
+ public int compareTo(HFileDeleteTask o) {
+ long sub = this.fileLength - o.fileLength;
+ // smaller value with higher priority in PriorityQueue, and we intend to delete the larger
+ // file first.
+ return (sub > 0) ? -1 : (sub < 0 ? 1 : 0);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || !(o instanceof HFileDeleteTask)) {
+ return false;
+ }
+ HFileDeleteTask otherTask = (HFileDeleteTask) o;
+ return this.filePath.equals(otherTask.filePath) && (this.fileLength == otherTask.fileLength);
+ }
+
+ @Override
+ public int hashCode() {
+ return filePath.hashCode();
+ }
}
@VisibleForTesting
@@ -332,13 +362,13 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
@VisibleForTesting
- public long getLargeQueueSize() {
- return largeQueueSize;
+ public long getLargeQueueInitSize() {
+ return largeQueueInitSize;
}
@VisibleForTesting
- public long getSmallQueueSize() {
- return smallQueueSize;
+ public long getSmallQueueInitSize() {
+ return smallQueueInitSize;
}
@VisibleForTesting
@@ -350,15 +380,15 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
public void onConfigurationChange(Configuration conf) {
StringBuilder builder = new StringBuilder();
builder.append("Updating configuration for HFileCleaner, previous throttle point: ")
- .append(throttlePoint).append(", largeQueueSize: ").append(largeQueueSize)
- .append(", smallQueueSize: ").append(smallQueueSize);
+ .append(throttlePoint).append(", largeQueueInitSize: ").append(largeQueueInitSize)
+ .append(", smallQueueInitSize: ").append(smallQueueInitSize);
stopHFileDeleteThreads();
this.throttlePoint =
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
- this.largeQueueSize =
- conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
- this.smallQueueSize =
- conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
+ this.largeQueueInitSize =
+ conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
+ this.smallQueueInitSize =
+ conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
// record the left over tasks
List<HFileDeleteTask> leftOverTasks = new ArrayList<>();
for (HFileDeleteTask task : largeFileQueue) {
@@ -367,11 +397,11 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
for (HFileDeleteTask task : smallFileQueue) {
leftOverTasks.add(task);
}
- largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
- smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
+ largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
+ smallFileQueue = largeFileQueue.getStealFromQueue();
threads.clear();
- builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueSize: ")
- .append(largeQueueSize).append(", smallQueueSize: ").append(smallQueueSize);
+ builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueInitSize: ")
+ .append(largeQueueInitSize).append(", smallQueueInitSize: ").append(smallQueueInitSize);
LOG.debug(builder.toString());
startHFileDeleteThreads();
// re-dispatch the left over tasks
http://git-wip-us.apache.org/repos/asf/hbase/blob/976f07e8/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
index 74f0747..c6d8ee0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
@@ -18,8 +18,6 @@
*/
package org.apache.hadoop.hbase.util;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -27,6 +25,8 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
/**
* This queue allows a ThreadPoolExecutor to steal jobs from another ThreadPoolExecutor.
* This queue also acts as the factory for creating the PriorityBlockingQueue to be used in the
@@ -48,6 +48,24 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
public StealJobQueue() {
this.stealFromQueue = new PriorityBlockingQueue<T>() {
+
+ @Override
+ public boolean offer(T t) {
+ lock.lock();
+ try {
+ notEmpty.signal();
+ return super.offer(t);
+ } finally {
+ lock.unlock();
+ }
+ }
+ };
+ }
+
+ public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity) {
+ super(initCapacity);
+ this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity) {
+
@Override
public boolean offer(T t) {
lock.lock();
@@ -61,6 +79,10 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
};
}
+ /**
+ * Get the queue whose jobs might be stolen by the consumer of this original queue
+ * @return the queue whose jobs could be stolen
+ */
public BlockingQueue<T> getStealFromQueue() {
return stealFromQueue;
}
@@ -108,8 +130,9 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
retVal = stealFromQueue.poll();
}
if (retVal == null) {
- if (nanos <= 0)
+ if (nanos <= 0) {
return null;
+ }
nanos = notEmpty.awaitNanos(nanos);
} else {
return retVal;
http://git-wip-us.apache.org/repos/asf/hbase/blob/976f07e8/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 5712729..18afafa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -316,9 +316,9 @@ public class TestHFileCleaner {
public void testOnConfigurationChange() throws Exception {
// constants
final int ORIGINAL_THROTTLE_POINT = 512 * 1024;
- final int ORIGINAL_QUEUE_SIZE = 512;
+ final int ORIGINAL_QUEUE_INIT_SIZE = 512;
final int UPDATE_THROTTLE_POINT = 1024;// small enough to change large/small check
- final int UPDATE_QUEUE_SIZE = 1024;
+ final int UPDATE_QUEUE_INIT_SIZE = 1024;
final int LARGE_FILE_NUM = 5;
final int SMALL_FILE_NUM = 20;
@@ -326,8 +326,8 @@ public class TestHFileCleaner {
// no cleaner policies = delete all files
conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, ORIGINAL_THROTTLE_POINT);
- conf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
- conf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
+ conf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE);
+ conf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE);
Server server = new DummyServer();
Path archivedHfileDir =
new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
@@ -336,8 +336,8 @@ public class TestHFileCleaner {
FileSystem fs = UTIL.getDFSCluster().getFileSystem();
final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint());
- Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getLargeQueueSize());
- Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getSmallQueueSize());
+ Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
+ Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
// clean up archive directory and create files for testing
fs.delete(archivedHfileDir, true);
@@ -353,22 +353,24 @@ public class TestHFileCleaner {
};
t.setDaemon(true);
t.start();
- // let the cleaner run for some while
- Thread.sleep(20);
+ // wait until file cleaning has started
+ while (cleaner.getNumOfDeletedSmallFiles() == 0) {
+ Thread.yield();
+ }
// trigger configuration change
Configuration newConf = new Configuration(conf);
newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT);
- newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
- newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
- cleaner.onConfigurationChange(newConf);
+ newConf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
+ newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
+ "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
+ cleaner.onConfigurationChange(newConf);
// check values after change
Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
- Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getLargeQueueSize());
- Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getSmallQueueSize());
+ Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
+ Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
Assert.assertEquals(2, cleaner.getCleanerThreads().size());
// wait until clean done and check
http://git-wip-us.apache.org/repos/asf/hbase/blob/976f07e8/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
index 59a4115..531e4f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
@@ -18,10 +18,9 @@
*/
package org.apache.hadoop.hbase.util;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
@@ -29,14 +28,17 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.junit.Assert.*;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestStealJobQueue {
StealJobQueue<Integer> stealJobQueue;
- BlockingQueue stealFromQueue;
+ BlockingQueue<Integer> stealFromQueue;
@Before
public void setup() {