Posted to commits@hbase.apache.org by li...@apache.org on 2017/11/23 03:25:03 UTC
hbase git commit: HBASE-18309 Support multi threads in CleanerChore
Repository: hbase
Updated Branches:
refs/heads/master cdc2bb17f -> 2442cbb6a
HBASE-18309 Support multi threads in CleanerChore
Signed-off-by: Yu Li <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2442cbb6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2442cbb6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2442cbb6
Branch: refs/heads/master
Commit: 2442cbb6ab5b9f1729b74361dd2bbb066d5910bd
Parents: cdc2bb1
Author: Reid Chan <re...@outlook.com>
Authored: Thu Nov 23 11:21:12 2017 +0800
Committer: Yu Li <li...@apache.org>
Committed: Thu Nov 23 11:23:26 2017 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/master/HMaster.java | 1 +
.../hbase/master/cleaner/CleanerChore.java | 335 +++++++++++++------
.../hbase/master/cleaner/HFileCleaner.java | 8 +-
.../hadoop/hbase/master/cleaner/LogCleaner.java | 168 +++++++++-
.../hbase/master/cleaner/TestCleanerChore.java | 61 +++-
.../hbase/master/cleaner/TestLogsCleaner.java | 51 +++
6 files changed, 514 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
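Note on the approach (illustration only, not part of the patch): the core change replaces the single-threaded recursive scan in CleanerChore with a shared ForkJoinPool, where each directory becomes a RecursiveTask that forks one subtask per subdirectory and joins their results. A minimal standalone sketch of that fork/join traversal pattern, with hypothetical names and java.io.File standing in for HDFS:

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical standalone sketch of the fork/join traversal pattern.
public class DirScanSketch {

  static class ScanTask extends RecursiveTask<Boolean> {
    private final File dir;

    ScanTask(File dir) {
      this.dir = dir;
    }

    @Override
    protected Boolean compute() {
      File[] children = dir.listFiles();
      if (children == null) {
        return true; // unreadable or vanished, nothing to do
      }
      List<ScanTask> subTasks = new ArrayList<>();
      boolean result = true;
      for (File child : children) {
        if (child.isDirectory()) {
          ScanTask task = new ScanTask(child);
          subTasks.add(task);
          task.fork(); // scan subdirectories in parallel
        } else {
          result &= handleFile(child); // files are handled in the current task
        }
      }
      for (ScanTask task : subTasks) {
        result &= task.join(); // true only if every subtree succeeded
      }
      return result;
    }

    private boolean handleFile(File file) {
      System.out.println("Checked " + file);
      return true;
    }
  }

  public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool(4); // bounded parallelism, like chorePool
    boolean ok = pool.invoke(new ScanTask(new File(".")));
    System.out.println("Scan fully succeeded: " + ok);
  }
}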
http://git-wip-us.apache.org/repos/asf/hbase/blob/2442cbb6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 97982b9..cfbddfc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -908,6 +908,7 @@ public class HMaster extends HRegionServer implements MasterServices {
this.masterFinishedInitializationTime = System.currentTimeMillis();
configurationManager.registerObserver(this.balancer);
configurationManager.registerObserver(this.hfileCleaner);
+ configurationManager.registerObserver(this.logCleaner);
// Set master as 'initialized'.
setInitialized(true);
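This one-line change registers the log cleaner with the ConfigurationManager so that its onConfigurationChange() is invoked on online configuration reloads, mirroring what was already done for the hfile cleaner. A minimal, hypothetical sketch of the observer pattern involved (not HBase's actual classes):

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch of the observer pattern behind ConfigurationManager.
interface ConfObserver {
  void onConfigurationChange(Properties conf);
}

class ConfManager {
  private final List<ConfObserver> observers = new ArrayList<>();

  void registerObserver(ConfObserver observer) {
    observers.add(observer);
  }

  // Invoked when an operator triggers an online configuration reload;
  // every registered component gets a chance to re-read its settings.
  void notifyAllObservers(Properties conf) {
    for (ConfObserver observer : observers) {
      observer.onConfigurationChange(conf);
    }
  }
}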
http://git-wip-us.apache.org/repos/asf/hbase/blob/2442cbb6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index b8ca1ec..582df84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -17,18 +17,22 @@
*/
package org.apache.hadoop.hbase.master.cleaner;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Predicate;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -46,16 +50,33 @@ import org.apache.hadoop.ipc.RemoteException;
* Abstract Cleaner that uses a chain of delegates to clean a directory of files
* @param <T> Cleaner delegate class that is dynamically loaded from configuration
*/
-public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
+public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
+ implements ConfigurationObserver {
private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
+ private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();
+
+ /**
+ * If it is an integer and >= 1, it will be used as the pool size directly;
+ * if 0.0 < size <= 1.0, the pool size will be available processors * size.
+ * Note that 1.0 is different from 1: the former uses 100% of the cores,
+ * while the latter uses only a single thread for the chore to scan dirs.
+ */
+ public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
+ private static final String DEFAULT_CHORE_POOL_SIZE = "0.5";
+
+ // It may waste resources if each cleaner chore owns its own pool,
+ // so let's share one pool among all cleaner chores.
+ private static volatile ForkJoinPool chorePool;
+ private static volatile int chorePoolSize;
protected final FileSystem fs;
private final Path oldFileDir;
private final Configuration conf;
+ protected final Map<String, Object> params;
+ private final AtomicBoolean enabled = new AtomicBoolean(true);
+ private final AtomicBoolean reconfig = new AtomicBoolean(false);
protected List<T> cleanersChain;
- protected Map<String, Object> params;
- private AtomicBoolean enabled = new AtomicBoolean(true);
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey) {
@@ -80,8 +101,42 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
this.conf = conf;
this.params = params;
initCleanerChain(confKey);
+
+ if (chorePool == null) {
+ String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
+ chorePoolSize = calculatePoolSize(poolSize);
+ // poolSize may be 0 or 0.0 from a careless configuration,
+ // double check to make sure.
+ chorePoolSize = chorePoolSize == 0 ?
+ calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : chorePoolSize;
+ this.chorePool = new ForkJoinPool(chorePoolSize);
+ LOG.info("Cleaner pool size is " + chorePoolSize);
+ }
}
+ /**
+ * Calculate size for cleaner pool.
+ * @param poolSize size from configuration
+ * @return size of pool after calculation
+ */
+ private int calculatePoolSize(String poolSize) {
+ if (poolSize.matches("[1-9][0-9]*")) {
+ // If poolSize is an integer, return it directly,
+ // but at most the number of available processors.
+ int size = Math.min(Integer.valueOf(poolSize), AVAIL_PROCESSORS);
+ if (size == AVAIL_PROCESSORS) {
+ LOG.warn("Use full core processors to scan dir");
+ }
+ return size;
+ } else if (poolSize.matches("0\\.[0-9]+|1\\.0")) {
+ // If poolSize is a decimal fraction, return poolSize * availableProcessors;
+ // note the escaped dots so that only a literal '.' matches.
+ return (int) (AVAIL_PROCESSORS * Double.valueOf(poolSize));
+ } else {
+ LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE +
+ ", use default config: " + DEFAULT_CHORE_POOL_SIZE + " instead.");
+ return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE);
+ }
+ }
/**
* Validate the file to see if it even belongs in the directory. If it is valid, then the file
@@ -109,6 +164,33 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
}
}
+ @Override
+ public void onConfigurationChange(Configuration conf) {
+ int updatedSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
+ if (updatedSize == chorePoolSize) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Size from configuration is the same as previous which is " +
+ updatedSize + ", no need to update.");
+ }
+ return;
+ }
+ chorePoolSize = updatedSize;
+ if (chorePool.getPoolSize() == 0) {
+ // Chore is not running now, so update the pool directly.
+ updateChorePoolSize(updatedSize);
+ return;
+ }
+ // Chore is running; update the pool after this run finishes.
+ reconfig.set(true);
+ }
+
+ private void updateChorePoolSize(int updatedSize) {
+ chorePool.shutdownNow();
+ LOG.info("Update chore's pool size from " +
+ chorePool.getParallelism() + " to " + updatedSize);
+ chorePool = new ForkJoinPool(updatedSize);
+ }
+
/**
* A utility method to create new instances of LogCleanerDelegate based on the class name of the
* LogCleanerDelegate.
@@ -135,7 +217,17 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
@Override
protected void chore() {
if (getEnabled()) {
- runCleaner();
+ if (runCleaner()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cleaned old files/dirs under " + oldFileDir + " successfully.");
+ }
+ } else {
+ LOG.warn("Failed to fully clean old files/dirs under " + oldFileDir + ".");
+ }
+ // After each run, check whether a reconfiguration notification arrived while cleaning
+ if (reconfig.compareAndSet(true, false)) {
+ updateChorePoolSize(chorePoolSize);
+ }
} else {
LOG.debug("Cleaner chore disabled! Not cleaning.");
}
@@ -147,16 +239,9 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
public Boolean runCleaner() {
preRunCleaner();
- try {
- FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
- checkAndDeleteEntries(files);
- } catch (IOException e) {
- e = e instanceof RemoteException ?
- ((RemoteException)e).unwrapRemoteException() : e;
- LOG.warn("Error while cleaning the logs", e);
- return false;
- }
- return true;
+ CleanerTask task = new CleanerTask(this.oldFileDir, true);
+ chorePool.submit(task);
+ return task.join();
}
/**
@@ -199,95 +284,16 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
}
/**
- * Loop over the given directory entries, and check whether they can be deleted.
- * If an entry is itself a directory it will be recursively checked and deleted itself iff
- * all subentries are deleted (and no new subentries are added in the mean time)
- *
- * @param entries directory entries to check
- * @return true if all entries were successfully deleted
- */
- private boolean checkAndDeleteEntries(FileStatus[] entries) {
- if (entries == null) {
- return true;
- }
- boolean allEntriesDeleted = true;
- List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
- List<FileStatus> dirs = new ArrayList<>();
- for (FileStatus child : entries) {
- if (child.isDirectory()) {
- dirs.add(child);
- } else {
- // collect all files to attempt to delete in one batch
- files.add(child);
- }
- }
- if (dirs.size() > 0) {
- sortByConsumedSpace(dirs);
- LOG.debug("Prepared to delete files in directories: " + dirs);
- for (FileStatus child : dirs) {
- Path path = child.getPath();
- // for each subdirectory delete it and all entries if possible
- if (!checkAndDeleteDirectory(path)) {
- allEntriesDeleted = false;
- }
- }
- }
- if (!checkAndDeleteFiles(files)) {
- allEntriesDeleted = false;
- }
- return allEntriesDeleted;
- }
-
- /**
- * Attempt to delete a directory and all files under that directory. Each child file is passed
- * through the delegates to see if it can be deleted. If the directory has no children when the
- * cleaners have finished it is deleted.
- * <p>
- * If new children files are added between checks of the directory, the directory will <b>not</b>
- * be deleted.
- * @param dir directory to check
- * @return <tt>true</tt> if the directory was deleted, <tt>false</tt> otherwise.
- */
- @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Checking directory: " + dir);
- }
-
- try {
- FileStatus[] children = FSUtils.listStatus(fs, dir);
- boolean allChildrenDeleted = checkAndDeleteEntries(children);
-
- // if the directory still has children, we can't delete it, so we are done
- if (!allChildrenDeleted) return false;
- } catch (IOException e) {
- e = e instanceof RemoteException ?
- ((RemoteException)e).unwrapRemoteException() : e;
- LOG.warn("Error while listing directory: " + dir, e);
- // couldn't list directory, so don't try to delete, and don't return success
- return false;
- }
-
- // otherwise, all the children (that we know about) have been deleted, so we should try to
- // delete this directory. However, don't do so recursively so we don't delete files that have
- // been added since we last checked.
- try {
- return fs.delete(dir, false);
- } catch (IOException e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Couldn't delete directory: " + dir, e);
- }
- // couldn't delete w/o exception, so we can't return success.
- return false;
- }
- }
-
- /**
* Run the given files through each of the cleaners to see if it should be deleted, deleting it if
* necessary.
* @param files List of FileStatus for the files to check (and possibly delete)
* @return true iff successfully deleted all files
*/
private boolean checkAndDeleteFiles(List<FileStatus> files) {
+ if (files == null) {
+ return true;
+ }
+
// first check to see if the path is valid
List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
List<FileStatus> invalidFiles = Lists.newArrayList();
@@ -368,6 +374,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
}
}
+ @VisibleForTesting
+ int getChorePoolSize() {
+ return chorePoolSize;
+ }
+
/**
* @param enabled
*/
@@ -378,4 +389,132 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
public boolean getEnabled() {
return this.enabled.get();
}
+
+ private interface Action<T> {
+ T act() throws IOException;
+ }
+
+ private class CleanerTask extends RecursiveTask<Boolean> {
+ private final Path dir;
+ private final boolean root;
+
+ CleanerTask(final FileStatus dir, final boolean root) {
+ this(dir.getPath(), root);
+ }
+
+ CleanerTask(final Path dir, final boolean root) {
+ this.dir = dir;
+ this.root = root;
+ }
+
+ @Override
+ protected Boolean compute() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CleanerTask " + Thread.currentThread().getId() +
+ " starts cleaning dirs and files under " + dir + " and itself.");
+ }
+
+ List<FileStatus> subDirs;
+ List<FileStatus> files;
+ try {
+ subDirs = getFilteredStatus(status -> status.isDirectory());
+ files = getFilteredStatus(status -> status.isFile());
+ } catch (IOException ioe) {
+ LOG.warn(dir + " doesn't exist, just skip it.", ioe);
+ return true;
+ }
+
+ boolean nullSubDirs = subDirs == null;
+ if (nullSubDirs) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("There is no subdir under " + dir);
+ }
+ }
+ if (files == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("There is no file under " + dir);
+ }
+ }
+
+ int capacity = nullSubDirs ? 0 : subDirs.size();
+ List<CleanerTask> tasks = Lists.newArrayListWithCapacity(capacity);
+ if (!nullSubDirs) {
+ sortByConsumedSpace(subDirs);
+ for (FileStatus subdir : subDirs) {
+ CleanerTask task = new CleanerTask(subdir, false);
+ tasks.add(task);
+ task.fork();
+ }
+ }
+
+ boolean result = true;
+ result &= deleteAction(() -> checkAndDeleteFiles(files), "files");
+ result &= deleteAction(() -> getCleanRusult(tasks), "subdirs");
+ // if and only if files and subdirs under current dir are deleted successfully, and
+ // it is not the root dir, then task will try to delete it.
+ if (result && !root) {
+ result &= deleteAction(() -> fs.delete(dir, false), "dir");
+ }
+ return result;
+ }
+
+ /**
+ * Get FileStatus with filter.
+ * Note that FSUtils#listStatusWithStatusFilter returns null
+ * when the filtered status is empty, rather than an empty list.
+ * @param function a filter function
+ * @return filtered FileStatus list
+ * @throws IOException if there's no such directory
+ */
+ private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {
+ return FSUtils.listStatusWithStatusFilter(fs, dir, status -> function.test(status));
+ }
+
+ /**
+ * Perform a delete on a specified type.
+ * @param deletion a delete
+ * @param type possible values are 'files', 'subdirs', 'dir'
+ * @return true if it deleted successfully, false otherwise
+ */
+ private boolean deleteAction(Action<Boolean> deletion, String type) {
+ boolean deleted;
+ String errorMsg = "";
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Start deleting " + type + " under " + dir);
+ }
+ deleted = deletion.act();
+ } catch (IOException ioe) {
+ errorMsg = ioe.getMessage();
+ deleted = false;
+ }
+ if (LOG.isDebugEnabled()) {
+ if (deleted) {
+ LOG.debug("Finish deleting " + type + " under " + dir);
+ } else {
+ LOG.debug("Couldn't delete " + type + " completely under " + dir +
+ " with reasons: " + (!errorMsg.equals("") ? errorMsg : " undeletable, please check."));
+ }
+ }
+ return deleted;
+ }
+
+ /**
+ * Get cleaner results of subdirs.
+ * @param tasks subdirs cleaner tasks
+ * @return true if all subdirs were deleted successfully, false for partial/all failures
+ * @throws IOException if something goes wrong during computation
+ */
+ private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
+ boolean cleaned = true;
+ try {
+ for (CleanerTask task : tasks) {
+ cleaned &= task.get();
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IOException(e);
+ }
+ return cleaned;
+ }
+ }
}
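To make calculatePoolSize concrete (illustration only, not part of the patch): on a machine reporting 8 available processors, the default "0.5" yields 4 threads, "1.0" yields 8, "1" yields a single thread, and an integer above the core count is capped at it. A standalone rendering of the same interpretation logic:

// Standalone rendering of the pool-size interpretation (illustrative only).
public class PoolSizeDemo {
  static final int AVAIL = Runtime.getRuntime().availableProcessors();

  static int calculatePoolSize(String poolSize) {
    if (poolSize.matches("[1-9][0-9]*")) {
      // Integer: used directly, capped at the number of cores.
      return Math.min(Integer.valueOf(poolSize), AVAIL);
    } else if (poolSize.matches("0\\.[0-9]+|1\\.0")) {
      // Fraction: a share of the available cores.
      return (int) (AVAIL * Double.valueOf(poolSize));
    }
    return calculatePoolSize("0.5"); // fall back to the default
  }

  public static void main(String[] args) {
    for (String v : new String[] { "0.5", "1.0", "1", "2", "128" }) {
      System.out.println(v + " -> " + calculatePoolSize(v));
    }
  }
}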
http://git-wip-us.apache.org/repos/asf/hbase/blob/2442cbb6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index e91d4f1..5c78dc4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -32,11 +32,10 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.StealJobQueue;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
/**
@@ -44,8 +43,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
* folder that are deletable for each HFile cleaner in the chain.
*/
@InterfaceAudience.Private
-public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
- ConfigurationObserver {
+public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
@@ -390,6 +388,8 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
@Override
public void onConfigurationChange(Configuration conf) {
+ super.onConfigurationChange(conf);
+
if (!checkAndUpdateConfigurations(conf)) {
LOG.debug("Update configuration triggered but nothing changed for this cleaner");
return;
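Note the layering here: the subclass first lets super.onConfigurationChange() resize the shared scan pool, then handles its own keys. A minimal, hypothetical sketch of that delegation pattern (not HBase's actual classes):

import java.util.Properties;

// Hypothetical sketch of layered reconfiguration: each level owns its keys.
class BaseCleanerSketch {
  protected int scanPoolSize = 4;

  public void onConfigurationChange(Properties conf) {
    scanPoolSize = Integer.parseInt(conf.getProperty("scan.pool.size", "4"));
  }
}

class HFileCleanerSketch extends BaseCleanerSketch {
  private int throttlePoint = 64;

  @Override
  public void onConfigurationChange(Properties conf) {
    super.onConfigurationChange(conf); // let the base resize the scan pool first
    throttlePoint = Integer.parseInt(conf.getProperty("throttle.point", "64"));
  }
}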
http://git-wip-us.apache.org/repos/asf/hbase/blob/2442cbb6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
index 8569cb5..3cb620e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -19,15 +19,24 @@ package org.apache.hadoop.hbase.master.cleaner;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
/**
* This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old
@@ -38,6 +47,12 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
private static final Log LOG = LogFactory.getLog(LogCleaner.class.getName());
+ public static final String OLD_WALS_CLEANER_SIZE = "hbase.oldwals.cleaner.thread.size";
+ public static final int OLD_WALS_CLEANER_DEFAULT_SIZE = 2;
+
+ private final LinkedBlockingQueue<CleanerContext> pendingDelete;
+ private List<Thread> oldWALsCleaner;
+
/**
* @param period the period of time to sleep between each run
* @param stopper the stopper
@@ -48,6 +63,9 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path oldLogDir) {
super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS);
+ this.pendingDelete = new LinkedBlockingQueue<>();
+ int size = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE);
+ this.oldWALsCleaner = createOldWalsCleaner(size);
}
@Override
@@ -55,4 +73,152 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
return AbstractFSWALProvider.validateWALFilename(file.getName())
|| MasterProcedureUtil.validateProcedureWALFilename(file.getName());
}
+
+ @Override
+ public void onConfigurationChange(Configuration conf) {
+ super.onConfigurationChange(conf);
+
+ int newSize = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE);
+ if (newSize == oldWALsCleaner.size()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Size from configuration is the same as previous which is " +
+ newSize + ", no need to update.");
+ }
+ return;
+ }
+ interruptOldWALsCleaner();
+ oldWALsCleaner = createOldWalsCleaner(newSize);
+ }
+
+ @Override
+ protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
+ List<CleanerContext> results = new LinkedList<>();
+ for (FileStatus toDelete : filesToDelete) {
+ CleanerContext context = CleanerContext.createCleanerContext(toDelete);
+ if (context != null) {
+ pendingDelete.add(context);
+ results.add(context);
+ }
+ }
+
+ int deletedFiles = 0;
+ for (CleanerContext res : results) {
+ deletedFiles += res.getResult(500) ? 1 : 0;
+ }
+ return deletedFiles;
+ }
+
+ @Override
+ public void cleanup() {
+ super.cleanup();
+ interruptOldWALsCleaner();
+ }
+
+ @VisibleForTesting
+ int getSizeOfCleaners() {
+ return oldWALsCleaner.size();
+ }
+
+ private List<Thread> createOldWalsCleaner(int size) {
+ LOG.info("Creating OldWALs cleaners with size: " + size);
+
+ List<Thread> oldWALsCleaner = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ Thread cleaner = new Thread(() -> deleteFile());
+ cleaner.setName("OldWALsCleaner-" + i);
+ cleaner.setDaemon(true);
+ cleaner.start();
+ oldWALsCleaner.add(cleaner);
+ }
+ return oldWALsCleaner;
+ }
+
+ private void interruptOldWALsCleaner() {
+ for (Thread cleaner : oldWALsCleaner) {
+ cleaner.interrupt();
+ }
+ oldWALsCleaner.clear();
+ }
+
+ private void deleteFile() {
+ while (true) {
+ CleanerContext context = null;
+ boolean succeed = false;
+ boolean interrupted = false;
+ try {
+ context = pendingDelete.take();
+ if (context != null) {
+ FileStatus toClean = context.getTargetToClean();
+ succeed = this.fs.delete(toClean.getPath(), false);
+ }
+ } catch (InterruptedException ite) {
+ // It most likely comes from a configuration-change request
+ if (context != null) {
+ LOG.warn("Interrupted while cleaning oldWALs " +
+ context.getTargetToClean() + ", try to clean it next round.");
+ }
+ interrupted = true;
+ } catch (IOException e) {
+ // fs.delete() fails.
+ LOG.warn("Failed to clean oldwals with exception: " + e);
+ succeed = false;
+ } finally {
+ if (context != null) {
+ context.setResult(succeed);
+ }
+ if (interrupted) {
+ // Restore interrupt status
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exiting cleaner.");
+ }
+ }
+
+ private static final class CleanerContext {
+ // At most waits 60 seconds
+ static final long MAX_WAIT = 60 * 1000;
+
+ final FileStatus target;
+ volatile boolean result;
+ volatile boolean setFromCleaner = false;
+
+ static CleanerContext createCleanerContext(FileStatus status) {
+ return status != null ? new CleanerContext(status) : null;
+ }
+
+ private CleanerContext(FileStatus status) {
+ this.target = status;
+ this.result = false;
+ }
+
+ synchronized void setResult(boolean res) {
+ this.result = res;
+ this.setFromCleaner = true;
+ notify();
+ }
+
+ synchronized boolean getResult(long waitIfNotFinished) {
+ long totalTime = 0;
+ try {
+ while (!setFromCleaner) {
+ wait(waitIfNotFinished);
+ totalTime += waitIfNotFinished;
+ if (totalTime >= MAX_WAIT) {
+ LOG.warn("Spend too much time to delete oldwals " + target);
+ return result;
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting deletion of " + target);
+ return result;
+ }
+ return result;
+ }
+
+ FileStatus getTargetToClean() {
+ return target;
+ }
+ }
}
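The LogCleaner changes decouple discovery from deletion: deleteFiles() enqueues one CleanerContext per file onto pendingDelete, a fixed set of OldWALsCleaner threads drains the queue and calls fs.delete(), and completion is signalled back through wait/notify. A compact, hypothetical standalone sketch of that hand-off:

import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the queue-based hand-off used by LogCleaner.
public class DeleteQueueSketch {

  static final class Job {
    private final String path;
    private volatile boolean done;
    private volatile boolean result;

    Job(String path) {
      this.path = path;
    }

    synchronized void setResult(boolean res) {
      result = res;
      done = true;
      notifyAll(); // wake the chore thread waiting in getResult()
    }

    synchronized boolean getResult(long waitMillis) throws InterruptedException {
      while (!done) {
        wait(waitMillis);
      }
      return result;
    }
  }

  public static void main(String[] args) throws Exception {
    LinkedBlockingQueue<Job> pending = new LinkedBlockingQueue<>();

    Thread worker = new Thread(() -> {
      try {
        while (true) {
          Job job = pending.take(); // block until a file is queued
          job.setResult(true);      // a real worker would fs.delete() here
        }
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt(); // exit on reconfiguration/shutdown
      }
    });
    worker.setDaemon(true);
    worker.start();

    Job job = new Job("/oldWALs/example");
    pending.add(job);
    System.out.println("deleted " + job.path + ": " + job.getResult(500));
    worker.interrupt();
  }
}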
http://git-wip-us.apache.org/repos/asf/hbase/blob/2442cbb6/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
index 566479a..39bdbc7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
@@ -17,14 +17,17 @@
*/
package org.apache.hadoop.hbase.master.cleaner;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -51,8 +54,7 @@ public class TestCleanerChore {
public void cleanup() throws Exception {
// delete and recreate the test directory, ensuring a clean test dir between tests
UTIL.cleanupTestDir();
-}
-
+ }
@Test
public void testSavesFilesOnRequest() throws Exception {
@@ -276,11 +278,8 @@ public class TestCleanerChore {
}
}).when(spy).isFileDeletable(Mockito.any());
- // attempt to delete the directory, which
- if (chore.checkAndDeleteDirectory(parent)) {
- throw new Exception(
- "Reported success deleting directory, should have failed when adding file mid-iteration");
- }
+ // run the chore
+ chore.chore();
// make sure all the directories + added file exist, but the original file is deleted
assertTrue("Added file unexpectedly deleted", fs.exists(racyFile));
@@ -355,6 +354,54 @@ public class TestCleanerChore {
assertTrue("Directory got deleted", fs.exists(parent));
}
+ @Test
+ public void testOnConfigurationChange() throws Exception {
+ Stoppable stop = new StoppableImplementation();
+ Configuration conf = UTIL.getConfiguration();
+ Path testDir = UTIL.getDataTestDir();
+ FileSystem fs = UTIL.getTestFileSystem();
+ String confKey = "hbase.test.cleaner.delegates";
+ conf.set(confKey, AlwaysDelete.class.getName());
+ conf.set(CleanerChore.CHORE_POOL_SIZE, "2");
+ AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey);
+ chore.setEnabled(true);
+ // Create subdirs under testDir
+ int dirNums = 6;
+ Path[] subdirs = new Path[dirNums];
+ for (int i = 0; i < dirNums; i++) {
+ subdirs[i] = new Path(testDir, "subdir-" + i);
+ fs.mkdirs(subdirs[i]);
+ }
+ // Under each subdirs create 6 files
+ for (Path subdir : subdirs) {
+ createFiles(fs, subdir, 6);
+ }
+ // Start chore
+ Thread t = new Thread(() -> chore.chore());
+ t.setDaemon(true);
+ t.start();
+ // Change size of chore's pool
+ conf.set(CleanerChore.CHORE_POOL_SIZE, "4");
+ chore.onConfigurationChange(conf);
+ assertEquals(4, chore.getChorePoolSize());
+ // Stop chore
+ t.join();
+ }
+
+ private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
+ Random random = new Random();
+ for (int i = 0; i < numOfFiles; i++) {
+ int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M
+ try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
+ for (int m = 0; m < xMega; m++) {
+ byte[] M = new byte[1024 * 1024];
+ random.nextBytes(M);
+ fsdos.write(M);
+ }
+ }
+ }
+ }
+
private static class AllValidPaths extends CleanerChore<BaseHFileCleanerDelegate> {
public AllValidPaths(String name, Stoppable s, Configuration conf, FileSystem fs,
http://git-wip-us.apache.org/repos/asf/hbase/blob/2442cbb6/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 98176fe..34e81db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -29,9 +29,11 @@ import java.net.URLEncoder;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -76,11 +78,13 @@ public class TestLogsCleaner {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster();
+ TEST_UTIL.startMiniDFSCluster(1);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniZKCluster();
+ TEST_UTIL.shutdownMiniDFSCluster();
}
/**
@@ -248,6 +252,53 @@ public class TestLogsCleaner {
}
}
+ @Test
+ public void testOnConfigurationChange() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE);
+ // Prepare environments
+ Server server = new DummyServer();
+ Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
+ HConstants.HREGION_OLDLOGDIR_NAME);
+ FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
+ LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir);
+ assertEquals(LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE, cleaner.getSizeOfCleaners());
+ // Create dir and files for test
+ fs.delete(oldWALsDir, true);
+ fs.mkdirs(oldWALsDir);
+ int numOfFiles = 10;
+ createFiles(fs, oldWALsDir, numOfFiles);
+ FileStatus[] status = fs.listStatus(oldWALsDir);
+ assertEquals(numOfFiles, status.length);
+ // Start cleaner chore
+ Thread thread = new Thread(() -> cleaner.chore());
+ thread.setDaemon(true);
+ thread.start();
+ // change size of cleaners dynamically
+ int sizeToChange = 4;
+ conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, sizeToChange);
+ cleaner.onConfigurationChange(conf);
+ assertEquals(sizeToChange, cleaner.getSizeOfCleaners());
+ // Stop chore
+ thread.join();
+ status = fs.listStatus(oldWALsDir);
+ assertEquals(0, status.length);
+ }
+
+ private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
+ Random random = new Random();
+ for (int i = 0; i < numOfFiles; i++) {
+ int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M
+ try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
+ for (int m = 0; m < xMega; m++) {
+ byte[] M = new byte[1024 * 1024];
+ random.nextBytes(M);
+ fsdos.write(M);
+ }
+ }
+ }
+ }
+
static class DummyServer implements Server {
@Override