You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2017/04/01 03:00:10 UTC
hbase git commit: HBASE-17215 Separate small/large file delete
threads in HFileCleaner to accelerate archived hfile cleanup speed
Repository: hbase
Updated Branches:
refs/heads/master 5f98ad205 -> 9facfa550
HBASE-17215 Separate small/large file delete threads in HFileCleaner to accelerate archived hfile cleanup speed
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9facfa55
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9facfa55
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9facfa55
Branch: refs/heads/master
Commit: 9facfa550f1e7386be3a04d84f7e8013f5002965
Parents: 5f98ad2
Author: Yu Li <li...@apache.org>
Authored: Sat Apr 1 10:59:11 2017 +0800
Committer: Yu Li <li...@apache.org>
Committed: Sat Apr 1 10:59:11 2017 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/master/HMaster.java | 1 +
.../hbase/master/cleaner/CleanerChore.java | 14 +-
.../hbase/master/cleaner/HFileCleaner.java | 313 ++++++++++++++++++-
.../hbase/regionserver/RSRpcServices.java | 3 +-
.../hbase/master/cleaner/TestHFileCleaner.java | 152 +++++++++
5 files changed, 478 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9facfa55/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index a1cbe53..bb9f282 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -885,6 +885,7 @@ public class HMaster extends HRegionServer implements MasterServices {
status.markComplete("Initialization successful");
LOG.info("Master has completed initialization");
configurationManager.registerObserver(this.balancer);
+ configurationManager.registerObserver(this.hfileCleaner);
// Set master as 'initialized'.
setInitialized(true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/9facfa55/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index dddad36..825feba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -46,7 +46,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
- private final FileSystem fs;
+ protected final FileSystem fs;
private final Path oldFileDir;
private final Configuration conf;
protected List<T> cleanersChain;
@@ -269,6 +269,15 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
}
Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
+ return deleteFiles(filesToDelete) == files.size();
+ }
+
+ /**
+ * Delete the given files
+ * @param filesToDelete files to delete
+ * @return number of deleted files
+ */
+ protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
int deletedFileCount = 0;
for (FileStatus file : filesToDelete) {
Path filePath = file.getPath();
@@ -289,8 +298,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
LOG.warn("Error while deleting: " + filePath, e);
}
}
-
- return deletedFileCount == files.size();
+ return deletedFileCount;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/9facfa55/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 89c316b..3a68252 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -17,22 +17,33 @@
*/
package org.apache.hadoop.hbase.master.cleaner;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* This Chore, every time it runs, will clear the HFiles in the hfile archive
* folder that are deletable for each HFile cleaner in the chain.
*/
@InterfaceAudience.Private
-public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
+public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
+ ConfigurationObserver {
public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
@@ -41,6 +52,34 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
this(period, stopper, conf, fs, directory, null);
}
+ // Configuration key for large/small throttle point
+ public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
+ "hbase.regionserver.thread.hfilecleaner.throttle";
+ public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M
+
+ // Configuration key for large queue size
+ public final static String LARGE_HFILE_DELETE_QUEUE_SIZE =
+ "hbase.regionserver.hfilecleaner.large.queue.size";
+ public final static int DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE = 1048576;
+
+ // Configuration key for small queue size
+ public final static String SMALL_HFILE_DELETE_QUEUE_SIZE =
+ "hbase.regionserver.hfilecleaner.small.queue.size";
+ public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576;
+
+ private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
+
+ BlockingQueue<HFileDeleteTask> largeFileQueue;
+ BlockingQueue<HFileDeleteTask> smallFileQueue;
+ private int throttlePoint;
+ private int largeQueueSize;
+ private int smallQueueSize;
+ private List<Thread> threads = new ArrayList<Thread>();
+ private boolean running;
+
+ private long deletedLargeFiles = 0L;
+ private long deletedSmallFiles = 0L;
+
/**
* @param period the period of time to sleep between each run
* @param stopper the stopper
@@ -53,6 +92,15 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
Path directory, Map<String, Object> params) {
super("HFileCleaner", period, stopper, conf, fs,
directory, MASTER_HFILE_CLEANER_PLUGINS, params);
+ throttlePoint =
+ conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
+ largeQueueSize =
+ conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
+ smallQueueSize =
+ conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
+ largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
+ smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
+ startHFileDeleteThreads();
}
@Override
@@ -69,4 +117,267 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
public List<BaseHFileCleanerDelegate> getDelegatesForTesting() {
return this.cleanersChain;
}
+
+ @Override
+ public int deleteFiles(Iterable<FileStatus> filesToDelete) {
+ int deletedFiles = 0;
+ List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>();
+ // construct delete tasks and add into relative queue
+ for (FileStatus file : filesToDelete) {
+ HFileDeleteTask task = deleteFile(file);
+ if (task != null) {
+ tasks.add(task);
+ }
+ }
+ // wait for each submitted task to finish
+ for (HFileDeleteTask task : tasks) {
+ if (task.getResult()) {
+ deletedFiles++;
+ }
+ }
+ return deletedFiles;
+ }
+
+ /**
+ * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue
+ * @param file the file to delete
+ * @return HFileDeleteTask to track progress
+ */
+ private HFileDeleteTask deleteFile(FileStatus file) {
+ HFileDeleteTask task = new HFileDeleteTask(file);
+ boolean enqueued = dispatch(task);
+ return enqueued ? task : null;
+ }
+
+ private boolean dispatch(HFileDeleteTask task) {
+ if (task.fileLength >= this.throttlePoint) {
+ if (!this.largeFileQueue.offer(task)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Large file deletion queue is full");
+ }
+ return false;
+ }
+ } else {
+ if (!this.smallFileQueue.offer(task)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Small file deletion queue is full");
+ }
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void cleanup() {
+ super.cleanup();
+ stopHFileDeleteThreads();
+ }
+
+ /**
+ * Start threads for hfile deletion
+ */
+ private void startHFileDeleteThreads() {
+ final String n = Thread.currentThread().getName();
+ running = true;
+ // start thread for large file deletion
+ Thread large = new Thread() {
+ @Override
+ public void run() {
+ consumerLoop(largeFileQueue);
+ }
+ };
+ large.setDaemon(true);
+ large.setName(n + "-HFileCleaner.large-" + System.currentTimeMillis());
+ large.start();
+ LOG.debug("Starting hfile cleaner for large files: " + large.getName());
+ threads.add(large);
+
+ // start thread for small file deletion
+ Thread small = new Thread() {
+ @Override
+ public void run() {
+ consumerLoop(smallFileQueue);
+ }
+ };
+ small.setDaemon(true);
+ small.setName(n + "-HFileCleaner.small-" + System.currentTimeMillis());
+ small.start();
+ LOG.debug("Starting hfile cleaner for small files: " + small.getName());
+ threads.add(small);
+ }
+
+ protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
+ try {
+ while (running) {
+ HFileDeleteTask task = null;
+ try {
+ task = queue.take();
+ } catch (InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Interrupted while trying to take a task from queue", e);
+ }
+ break;
+ }
+ if (task != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing: " + task.filePath + " from archive");
+ }
+ boolean succeed;
+ try {
+ succeed = this.fs.delete(task.filePath, false);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete file " + task.filePath, e);
+ succeed = false;
+ }
+ task.setResult(succeed);
+ if (succeed) {
+ countDeletedFiles(queue == largeFileQueue);
+ }
+ }
+ }
+ } finally {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exit thread: " + Thread.currentThread());
+ }
+ }
+ }
+
+ // Currently only for testing purpose
+ private void countDeletedFiles(boolean isLarge) {
+ if (isLarge) {
+ if (deletedLargeFiles == Long.MAX_VALUE) {
+ LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
+ deletedLargeFiles = 0L;
+ }
+ deletedLargeFiles++;
+ } else {
+ if (deletedSmallFiles == Long.MAX_VALUE) {
+ LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
+ deletedSmallFiles = 0L;
+ }
+ deletedSmallFiles++;
+ }
+ }
+
+ /**
+ * Stop threads for hfile deletion
+ */
+ private void stopHFileDeleteThreads() {
+ running = false;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping file delete threads");
+ }
+ for(Thread thread: threads){
+ thread.interrupt();
+ }
+ }
+
+ static class HFileDeleteTask {
+ private static final long MAX_WAIT = 60 * 1000L;
+ private static final long WAIT_UNIT = 1000L;
+
+ boolean done = false;
+ boolean result;
+ final Path filePath;
+ final long fileLength;
+
+ public HFileDeleteTask(FileStatus file) {
+ this.filePath = file.getPath();
+ this.fileLength = file.getLen();
+ }
+
+ public synchronized void setResult(boolean result) {
+ this.done = true;
+ this.result = result;
+ notify();
+ }
+
+ public synchronized boolean getResult() {
+ long waitTime = 0;
+ try {
+ while (!done) {
+ wait(WAIT_UNIT);
+ waitTime += WAIT_UNIT;
+ if (done) {
+ return this.result;
+ }
+ if (waitTime > MAX_WAIT) {
+ LOG.warn("Wait more than " + MAX_WAIT + " ms for deleting " + this.filePath
+ + ", exit...");
+ return false;
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for result of deleting " + filePath
+ + ", will return false", e);
+ return false;
+ }
+ return this.result;
+ }
+ }
+
+ @VisibleForTesting
+ public List<Thread> getCleanerThreads() {
+ return threads;
+ }
+
+ @VisibleForTesting
+ public long getNumOfDeletedLargeFiles() {
+ return deletedLargeFiles;
+ }
+
+ @VisibleForTesting
+ public long getNumOfDeletedSmallFiles() {
+ return deletedSmallFiles;
+ }
+
+ @VisibleForTesting
+ public long getLargeQueueSize() {
+ return largeQueueSize;
+ }
+
+ @VisibleForTesting
+ public long getSmallQueueSize() {
+ return smallQueueSize;
+ }
+
+ @VisibleForTesting
+ public long getThrottlePoint() {
+ return throttlePoint;
+ }
+
+ @Override
+ public void onConfigurationChange(Configuration conf) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("Updating configuration for HFileCleaner, previous throttle point: ")
+ .append(throttlePoint).append(", largeQueueSize: ").append(largeQueueSize)
+ .append(", smallQueueSize: ").append(smallQueueSize);
+ stopHFileDeleteThreads();
+ this.throttlePoint =
+ conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
+ this.largeQueueSize =
+ conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
+ this.smallQueueSize =
+ conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
+ // record the left over tasks
+ List<HFileDeleteTask> leftOverTasks = new ArrayList<>();
+ for (HFileDeleteTask task : largeFileQueue) {
+ leftOverTasks.add(task);
+ }
+ for (HFileDeleteTask task : smallFileQueue) {
+ leftOverTasks.add(task);
+ }
+ largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
+ smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
+ threads.clear();
+ builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueSize: ")
+ .append(largeQueueSize).append(", smallQueueSize: ").append(smallQueueSize);
+ LOG.debug(builder.toString());
+ startHFileDeleteThreads();
+ // re-dispatch the left over tasks
+ for (HFileDeleteTask task : leftOverTasks) {
+ dispatch(task);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9facfa55/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 298f538..8d4ea4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1295,7 +1295,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
throw new RegionServerStoppedException("File system not available");
}
if (!regionServer.isOnline()) {
- throw new ServerNotRunningYetException("Server is not running yet");
+ throw new ServerNotRunningYetException("Server " + regionServer.serverName
+ + " is not running yet");
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9facfa55/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 6049701..8e8a4dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -22,10 +22,12 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -260,4 +263,153 @@ public class TestHFileCleaner {
return null;
}
}
+
+ @Test
+ public void testThreadCleanup() throws Exception {
+ Configuration conf = UTIL.getConfiguration();
+ conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
+ Server server = new DummyServer();
+ Path archivedHfileDir =
+ new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+ // setup the cleaner
+ FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+ HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
+ // clean up archive directory
+ fs.delete(archivedHfileDir, true);
+ fs.mkdirs(archivedHfileDir);
+ // create some file to delete
+ fs.createNewFile(new Path(archivedHfileDir, "dfd-dfd"));
+ // launch the chore
+ cleaner.chore();
+ // call cleanup
+ cleaner.cleanup();
+ // wait awhile for thread to die
+ Thread.sleep(100);
+ for (Thread thread : cleaner.getCleanerThreads()) {
+ Assert.assertFalse(thread.isAlive());
+ }
+ }
+
+ @Test
+ public void testLargeSmallIsolation() throws Exception {
+ Configuration conf = UTIL.getConfiguration();
+ // no cleaner policies = delete all files
+ conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
+ conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, 512 * 1024);
+ Server server = new DummyServer();
+ Path archivedHfileDir =
+ new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+ // setup the cleaner
+ FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+ HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
+ // clean up archive directory
+ fs.delete(archivedHfileDir, true);
+ fs.mkdirs(archivedHfileDir);
+ // necessary set up
+ final int LARGE_FILE_NUM = 5;
+ final int SMALL_FILE_NUM = 20;
+ createFilesForTesting(LARGE_FILE_NUM, SMALL_FILE_NUM, fs, archivedHfileDir);
+ // call cleanup
+ cleaner.chore();
+
+ Assert.assertEquals(LARGE_FILE_NUM, cleaner.getNumOfDeletedLargeFiles());
+ Assert.assertEquals(SMALL_FILE_NUM, cleaner.getNumOfDeletedSmallFiles());
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testOnConfigurationChange() throws Exception {
+ // constants
+ final int ORIGINAL_THROTTLE_POINT = 512 * 1024;
+ final int ORIGINAL_QUEUE_SIZE = 512;
+ final int UPDATE_THROTTLE_POINT = 1024;// small enough to change large/small check
+ final int UPDATE_QUEUE_SIZE = 1024;
+ final int LARGE_FILE_NUM = 5;
+ final int SMALL_FILE_NUM = 20;
+
+ Configuration conf = UTIL.getConfiguration();
+ // no cleaner policies = delete all files
+ conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
+ conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, ORIGINAL_THROTTLE_POINT);
+ conf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
+ conf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
+ Server server = new DummyServer();
+ Path archivedHfileDir =
+ new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+ // setup the cleaner
+ FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+ final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
+ Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint());
+ Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getLargeQueueSize());
+ Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getSmallQueueSize());
+
+ // clean up archive directory and create files for testing
+ fs.delete(archivedHfileDir, true);
+ fs.mkdirs(archivedHfileDir);
+ createFilesForTesting(LARGE_FILE_NUM, SMALL_FILE_NUM, fs, archivedHfileDir);
+
+ // call cleaner, run as daemon to test the interrupt-at-middle case
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ cleaner.chore();
+ }
+ };
+ t.setDaemon(true);
+ t.start();
+ // let the cleaner run for some while
+ Thread.sleep(20);
+
+ // trigger configuration change
+ Configuration newConf = new Configuration(conf);
+ newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT);
+ newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
+ newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
+ cleaner.onConfigurationChange(newConf);
+ LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
+ + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
+
+ // check values after change
+ Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
+ Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getLargeQueueSize());
+ Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getSmallQueueSize());
+ Assert.assertEquals(2, cleaner.getCleanerThreads().size());
+
+ // wait until clean done and check
+ t.join();
+ LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
+ + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
+ Assert.assertTrue("Should delete more than " + LARGE_FILE_NUM
+ + " files from large queue but actually " + cleaner.getNumOfDeletedLargeFiles(),
+ cleaner.getNumOfDeletedLargeFiles() > LARGE_FILE_NUM);
+ Assert.assertTrue("Should delete less than " + SMALL_FILE_NUM
+ + " files from small queue but actually " + cleaner.getNumOfDeletedSmallFiles(),
+ cleaner.getNumOfDeletedSmallFiles() < SMALL_FILE_NUM);
+ }
+
+ private void createFilesForTesting(int largeFileNum, int smallFileNum, FileSystem fs,
+ Path archivedHfileDir) throws IOException {
+ final Random rand = new Random();
+ final byte[] large = new byte[1024 * 1024];
+ for (int i = 0; i < large.length; i++) {
+ large[i] = (byte) rand.nextInt(128);
+ }
+ final byte[] small = new byte[1024];
+ for (int i = 0; i < small.length; i++) {
+ small[i] = (byte) rand.nextInt(128);
+ }
+ // create large and small files
+ for (int i = 1; i <= largeFileNum; i++) {
+ FSDataOutputStream out = fs.create(new Path(archivedHfileDir, "large-file-" + i));
+ out.write(large);
+ out.close();
+ }
+ for (int i = 1; i <= smallFileNum; i++) {
+ FSDataOutputStream out = fs.create(new Path(archivedHfileDir, "small-file-" + i));
+ out.write(small);
+ out.close();
+ }
+ }
}