Posted to commits@hbase.apache.org by op...@apache.org on 2019/08/22 11:42:07 UTC
[hbase] branch master updated: HBASE-22867 The ForkJoinPool in CleanerChore will spawn thousands of threads in our cluster with thousands table (#513)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 4268774 HBASE-22867 The ForkJoinPool in CleanerChore will spawn thousands of threads in our cluster with thousands table (#513)
4268774 is described below
commit 4268774de16d7a25da9446a05055bdf3d648766e
Author: openinx <op...@gmail.com>
AuthorDate: Thu Aug 22 19:42:01 2019 +0800
HBASE-22867 The ForkJoinPool in CleanerChore will spawn thousands of threads in our cluster with thousands table (#513)
Signed-off-by: Duo Zhang <zh...@apache.org>
Signed-off-by: Reid Chan <re...@apache.org>
---
.../hadoop/hbase/master/cleaner/CleanerChore.java | 216 +++++++++------------
.../hadoop/hbase/master/cleaner/DirScanPool.java | 27 ++-
2 files changed, 114 insertions(+), 129 deletions(-)
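
Background on the fix, before the diff itself: the old CleanerTask was a RecursiveTask running on a ForkJoinPool, and getCleanResult() blocked on each sub-task's get(). When a ForkJoinPool worker blocks on a join it cannot help complete, the pool may start compensation threads to preserve its target parallelism, so a tree with thousands of table directories can drive the thread count far past the configured pool size. The demo below is hypothetical (not HBase code; class and names are made up for illustration) and reproduces that shape: every level forks children and then blocks on get(), like the old getCleanResult() did.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class BlockingJoinDemo {
  // Parallelism 2, like a small cleaner pool.
  static final ForkJoinPool POOL = new ForkJoinPool(2);

  static class Node extends RecursiveTask<Integer> {
    private final int depth;

    Node(int depth) {
      this.depth = depth;
    }

    @Override
    protected Integer compute() {
      if (depth == 0) {
        return 1;
      }
      // Fork four children per level, mimicking sub-directory tasks.
      List<Node> children = new ArrayList<>();
      for (int i = 0; i < 4; i++) {
        Node child = new Node(depth - 1);
        children.add(child);
        child.fork();
      }
      int sum = 0;
      try {
        for (Node child : children) {
          sum += child.get(); // blocking get(), like the old getCleanResult()
        }
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
      return sum;
    }
  }

  public static void main(String[] args) {
    POOL.invoke(new Node(6)); // 4^6 leaf tasks
    // Often well above the configured parallelism of 2; the exact count
    // depends on the JDK's join/compensation heuristics.
    System.out.println("worker threads started: " + POOL.getPoolSize());
  }
}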
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index 6ccc7ef..2098b2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -18,16 +18,17 @@
package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -35,7 +36,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -43,7 +44,6 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -211,11 +211,16 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
cleanersChain.forEach(FileCleanerDelegate::preClean);
}
- public Boolean runCleaner() {
+ public boolean runCleaner() {
preRunCleaner();
- CleanerTask task = new CleanerTask(this.oldFileDir, true);
- pool.execute(task);
- return task.join();
+ try {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
+ return future.get();
+ } catch (Exception e) {
+ LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
+ return false;
+ }
}
/**
@@ -380,126 +385,97 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
}
private interface Action<T> {
- T act() throws IOException;
+ T act() throws Exception;
}
/**
- * Attemps to clean up a directory, its subdirectories, and files. Return value is true if
- * everything was deleted. false on partial / total failures.
+ * Attempts to clean up a directory, its subdirectories, and files concurrently on a
+ * {@link java.util.concurrent.ThreadPoolExecutor}. The final result can be obtained by
+ * calling result.get().
*/
- private final class CleanerTask extends RecursiveTask<Boolean> {
-
- private static final long serialVersionUID = -5444212174088754172L;
-
- private final Path dir;
- private final boolean root;
-
- CleanerTask(final FileStatus dir, final boolean root) {
- this(dir.getPath(), root);
- }
-
- CleanerTask(final Path dir, final boolean root) {
- this.dir = dir;
- this.root = root;
- }
-
- @Override
- protected Boolean compute() {
- LOG.trace("Cleaning under {}", dir);
- List<FileStatus> subDirs;
- List<FileStatus> files;
- try {
- // if dir doesn't exist, we'll get null back for both of these
- // which will fall through to succeeding.
- subDirs = getFilteredStatus(FileStatus::isDirectory);
- files = getFilteredStatus(FileStatus::isFile);
- } catch (IOException ioe) {
- LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe);
- return false;
- }
-
- boolean allFilesDeleted = true;
- if (!files.isEmpty()) {
- allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files");
- }
-
- boolean allSubdirsDeleted = true;
+ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
+ try {
+ // Step.1: List all files under the given directory.
+ List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
+ List<FileStatus> subDirs =
+ allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
+ List<FileStatus> files =
+ allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
+
+ // Step.2: Try to delete all the deletable files.
+ boolean allFilesDeleted =
+ files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);
+
+ // Step.3: Start to traverse and delete the sub-directories.
+ List<CompletableFuture<Boolean>> futures = new ArrayList<>();
if (!subDirs.isEmpty()) {
- List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
sortByConsumedSpace(subDirs);
- for (FileStatus subdir : subDirs) {
- CleanerTask task = new CleanerTask(subdir, false);
- tasks.add(task);
- task.fork();
- }
- allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs");
+ // Submit the request of sub-directory deletion.
+ subDirs.forEach(subDir -> {
+ CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
+ pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
+ futures.add(subFuture);
+ });
}
- boolean result = allFilesDeleted && allSubdirsDeleted && isEmptyDirDeletable(dir);
- // if and only if files and subdirs under current dir are deleted successfully, and the empty
- // directory can be deleted, and it is not the root dir then task will try to delete it.
- if (result && !root) {
- result &= deleteAction(() -> fs.delete(dir, false), "dir");
- }
- return result;
- }
-
- /**
- * Get FileStatus with filter.
- * @param function a filter function
- * @return filtered FileStatus or empty list if dir doesn't exist
- * @throws IOException if there's an error other than dir not existing
- */
- private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {
- return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir,
- status -> function.test(status))).orElseGet(Collections::emptyList);
- }
-
- /**
- * Perform a delete on a specified type.
- * @param deletion a delete
- * @param type possible values are 'files', 'subdirs', 'dirs'
- * @return true if it deleted successfully, false otherwise
- */
- private boolean deleteAction(Action<Boolean> deletion, String type) {
- boolean deleted;
- try {
- LOG.trace("Start deleting {} under {}", type, dir);
- deleted = deletion.act();
- } catch (PathIsNotEmptyDirectoryException exception) {
- // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
- // LocalFileSystem throws a bare IOException. So some test code will get the verbose
- // message below.
- LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. " +
- "exception details at TRACE.", dir);
- LOG.trace("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
- deleted = false;
- } catch (IOException ioe) {
- LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " +
- "happening, use following exception when asking on mailing list.",
- type, dir, ioe);
- deleted = false;
- }
- LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
- return deleted;
+ // Step.4: Once all sub-files & sub-directories are deleted, we can try to delete the
+ // current directory asynchronously.
+ FutureUtils.addListener(
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
+ (voidObj, e) -> {
+ if (e != null) {
+ result.completeExceptionally(e);
+ return;
+ }
+ try {
+ boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
+ boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir);
+ if (deleted && !root) {
+ // If and only if the files and sub-dirs under the current dir are deleted successfully,
+ // the empty directory is deletable, and it is not the root dir, then the task will
+ // try to delete it.
+ deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
+ }
+ result.complete(deleted);
+ } catch (Exception ie) {
+ // Must handle the inner exception here, otherwise the result may get stuck if one
+ // sub-directory fails.
+ result.completeExceptionally(ie);
+ }
+ });
+ } catch (Exception e) {
+ LOG.debug("Failed to traverse and delete the path: {}", dir, e);
+ result.completeExceptionally(e);
}
+ }
- /**
- * Get cleaner results of subdirs.
- * @param tasks subdirs cleaner tasks
- * @return true if all subdirs deleted successfully, false for patial/all failures
- * @throws IOException something happen during computation
- */
- private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
- boolean cleaned = true;
- try {
- for (CleanerTask task : tasks) {
- cleaned &= task.get();
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new IOException(e);
- }
- return cleaned;
+ /**
+ * Perform a delete on a specified type.
+ * @param deletion a delete
+ * @param type possible values are 'files', 'subdirs', 'dirs'
+ * @return true if it deleted successfully, false otherwise
+ */
+ private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
+ boolean deleted;
+ try {
+ LOG.trace("Start deleting {} under {}", type, dir);
+ deleted = deletion.act();
+ } catch (PathIsNotEmptyDirectoryException exception) {
+ // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
+ // LocalFileSystem throws a bare IOException. So some test code will get the verbose
+ // message below.
+ LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
+ deleted = false;
+ } catch (IOException ioe) {
+ LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "
+ + "happening, use following exception when asking on mailing list.",
+ type, dir, ioe);
+ deleted = false;
+ } catch (Exception e) {
+ LOG.info("unexpected exception: ", e);
+ deleted = false;
}
+ LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
+ return deleted;
}
}
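
The heart of the CleanerChore change above is that no pool thread ever blocks waiting for a sub-directory: each level lists its children on a pool thread, re-submits sub-directories as new tasks, and completes its own CompletableFuture from an allOf callback, so join() is only ever called on futures that are already done. The standalone sketch below shows the same shape against java.nio.file with a plain fixed executor, using whenComplete where the patch uses HBase's internal FutureUtils.addListener; all class and method names here are made up for illustration.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AsyncDirTraversal {
  private final ExecutorService pool = Executors.newFixedThreadPool(4);

  // Counts regular files under dir without ever blocking a pool thread.
  CompletableFuture<Long> countFiles(Path dir) {
    CompletableFuture<Long> result = new CompletableFuture<>();
    pool.execute(() -> {
      try {
        List<Path> children;
        try (Stream<Path> s = Files.list(dir)) {
          children = s.collect(Collectors.toList());
        }
        long files = children.stream().filter(Files::isRegularFile).count();
        // Re-submit each sub-directory as its own task instead of recursing
        // inline or blocking on its result.
        List<CompletableFuture<Long>> subFutures = new ArrayList<>();
        for (Path child : children) {
          if (Files.isDirectory(child)) {
            subFutures.add(countFiles(child));
          }
        }
        // Complete this level's future from a callback once every child is
        // done; the join() calls below can no longer block.
        CompletableFuture.allOf(subFutures.toArray(new CompletableFuture[0]))
            .whenComplete((v, e) -> {
              if (e != null) {
                result.completeExceptionally(e);
              } else {
                result.complete(files
                    + subFutures.stream().mapToLong(CompletableFuture::join).sum());
              }
            });
      } catch (IOException | RuntimeException e) {
        result.completeExceptionally(e);
      }
    });
    return result;
  }

  public static void main(String[] args) throws Exception {
    AsyncDirTraversal t = new AsyncDirTraversal();
    System.out.println("files: " + t.countFiles(Paths.get(".")).get());
    t.pool.shutdown();
  }
}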
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
index a3a7d8e..ca93474 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
@@ -17,9 +17,12 @@
*/
package org.apache.hadoop.hbase.master.cleaner;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -32,7 +35,7 @@ import org.slf4j.LoggerFactory;
public class DirScanPool implements ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class);
private volatile int size;
- private ForkJoinPool pool;
+ private final ThreadPoolExecutor pool;
private int cleanerLatch;
private boolean reconfigNotification;
@@ -42,11 +45,18 @@ public class DirScanPool implements ConfigurationObserver {
// poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure.
size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
- pool = new ForkJoinPool(size);
+ pool = initializePool(size);
LOG.info("Cleaner pool size is {}", size);
cleanerLatch = 0;
}
+ private static ThreadPoolExecutor initializePool(int size) {
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES,
+ new LinkedBlockingQueue<>(), new DaemonThreadFactory("dir-scan-pool"));
+ executor.allowCoreThreadTimeOut(true);
+ return executor;
+ }
+
/**
* Checks if pool can be updated. If so, mark for update later.
* @param conf configuration
@@ -73,8 +83,8 @@ public class DirScanPool implements ConfigurationObserver {
notifyAll();
}
- synchronized void execute(ForkJoinTask<?> task) {
- pool.execute(task);
+ synchronized void execute(Runnable runnable) {
+ pool.execute(runnable);
}
public synchronized void shutdownNow() {
@@ -99,9 +109,8 @@ public class DirScanPool implements ConfigurationObserver {
break;
}
}
- shutdownNow();
- LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
- pool = new ForkJoinPool(size);
+ LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size);
+ pool.setCorePoolSize(size);
}
public int getSize() {
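
One more note on the DirScanPool change: a ForkJoinPool's parallelism is fixed at construction, which is why the old onConfigurationChange path had to shut the pool down and build a new one, whereas a ThreadPoolExecutor can be resized in place, and allowCoreThreadTimeOut(true) lets idle workers exit after the keep-alive instead of parking forever. A minimal sketch of those two properties follows (made-up class name, not HBase code).

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ResizablePoolDemo {
  public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES,
        new LinkedBlockingQueue<>());
    // Let idle core threads die after one minute instead of parking forever.
    pool.allowCoreThreadTimeOut(true);

    for (int i = 0; i < 8; i++) {
      pool.execute(() -> System.out.println(Thread.currentThread().getName()));
    }

    // Resize in place: queued and running tasks are unaffected. With an
    // unbounded LinkedBlockingQueue only the core size gates thread creation,
    // but growing max first avoids core > max, which setCorePoolSize rejects
    // on recent JDKs.
    pool.setMaximumPoolSize(8);
    pool.setCorePoolSize(8);

    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}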