You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/09/05 11:36:27 UTC
[hbase] branch branch-1.4 updated: HBASE-22912 [Backport]
HBASE-22867 to branch-1 to avoid ForkJoinPool to spawn thousands of threads
(#549)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1.4 by this push:
new 1d41159 HBASE-22912 [Backport] HBASE-22867 to branch-1 to avoid ForkJoinPool to spawn thousands of threads (#549)
1d41159 is described below
commit 1d411597b2231508b574345a87556d8f39f85c41
Author: openinx <op...@gmail.com>
AuthorDate: Thu Sep 5 19:32:42 2019 +0800
HBASE-22912 [Backport] HBASE-22867 to branch-1 to avoid ForkJoinPool to spawn thousands of threads (#549)
Signed-off-by: Andrew Purtell <ap...@apache.org>
Signed-off-by: Reid Chan <re...@apache.org>
---
.../hadoop/hbase/master/cleaner/CleanerChore.java | 283 ++++++++++++---------
.../hadoop/hbase/master/cleaner/DirScanPool.java | 26 +-
2 files changed, 178 insertions(+), 131 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index db0e897..78be50b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -18,13 +18,13 @@
package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,8 +36,6 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.FileStatusFilter;
import org.apache.hadoop.ipc.RemoteException;
import com.google.common.annotations.VisibleForTesting;
@@ -207,10 +205,20 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
}
}
- public Boolean runCleaner() {
- CleanerTask task = new CleanerTask(this.oldFileDir, true);
- pool.execute(task);
- return task.join();
+ public boolean runCleaner() {
+ try {
+ final AsyncResult<Boolean> result = new AsyncResult<Boolean>();
+ pool.execute(new Runnable() {
+ @Override
+ public void run() {
+ traverseAndDelete(oldFileDir, true, result);
+ }
+ });
+ return result.get();
+ } catch (Exception e) {
+ LOG.info("Failed to traverse and delete paths under the dir: " + oldFileDir, e);
+ return false;
+ }
}
/**
@@ -322,140 +330,171 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
T act() throws IOException;
}
- /**
- * Attemps to clean up a directory, its subdirectories, and files. Return value is true if
- * everything was deleted. false on partial / total failures.
- */
- private final class CleanerTask extends RecursiveTask<Boolean> {
- private static final long serialVersionUID = -1584635903138015418L;
- private final Path dir;
- private final boolean root;
+ private interface Callback<T> {
+ void run(T val);
+ }
- CleanerTask(final FileStatus dir, final boolean root) {
- this(dir.getPath(), root);
+ private final class AsyncResult<T> {
+
+ private Callback<T> callback;
+ private T result;
+ private boolean resultSet = false;
+
+ AsyncResult(Callback<T> callback) {
+ this.callback = callback;
}
- CleanerTask(final Path dir, final boolean root) {
- this.dir = dir;
- this.root = root;
+ AsyncResult() {
}
- @Override
- protected Boolean compute() {
- LOG.trace("Cleaning under " + dir);
- List<FileStatus> subDirs;
- List<FileStatus> tmpFiles;
- final List<FileStatus> files;
- try {
- // if dir doesn't exist, we'll get null back for both of these
- // which will fall through to succeeding.
- subDirs = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
- @Override
- public boolean accept(FileStatus f) {
- return f.isDirectory();
- }
- });
- if (subDirs == null) {
- subDirs = Collections.emptyList();
+ void set(T result) {
+ synchronized (this) {
+ this.result = result;
+ if (callback != null) {
+ callback.run(result);
}
- tmpFiles = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
- @Override
- public boolean accept(FileStatus f) {
- return f.isFile();
- }
- });
- files = tmpFiles == null ? Collections.<FileStatus>emptyList() : tmpFiles;
- } catch (IOException ioe) {
- LOG.warn("failed to get FileStatus for contents of '" + dir + "'", ioe);
- return false;
+ // Mark the result as set and notify any thread waiting in get().
+ this.resultSet = true;
+ this.notifyAll();
}
+ }
- boolean allFilesDeleted = true;
- if (!files.isEmpty()) {
- allFilesDeleted = deleteAction(new Action<Boolean>() {
- @Override
- public Boolean act() throws IOException {
- return checkAndDeleteFiles(files);
- }
- }, "files");
+ synchronized T get() throws Exception {
+ while (!resultSet) {
+ wait();
}
+ return result;
+ }
+ }
- boolean allSubdirsDeleted = true;
- if (!subDirs.isEmpty()) {
- final List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
- for (FileStatus subdir : subDirs) {
- CleanerTask task = new CleanerTask(subdir, false);
- tasks.add(task);
- task.fork();
+ /**
+ * Attempts to clean up a directory (its subdirectories and files) in a
+ * {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by
+ * calling result.get().
+ * @param dir the directory from which we start to traverse and delete.
+ * @param root whether it's the root directory to traverse; if true then it is not deleted.
+ * @param result {@link AsyncResult} used to fetch the result. True means the current
+ * directory has been deleted successfully (for root dir we don't need that) and the
+ * parent will try to delete its own directory if all of the children (files and
+ * sub-directories included) have been deleted successfully.
+ */
+ private void traverseAndDelete(final Path dir, final boolean root,
+ final AsyncResult<Boolean> result) {
+ try {
+ final Action<Boolean> curDirDeletion = new Action<Boolean>() {
+ @Override
+ public Boolean act() throws IOException {
+ return fs.delete(dir, false);
}
- allSubdirsDeleted = deleteAction(new Action<Boolean>() {
- @Override
- public Boolean act() throws IOException {
- return getCleanResult(tasks);
- }
- }, "subdirs");
+ };
+
+ // Step.1: List all files under the given directory.
+ List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
+ final List<FileStatus> subDirs = new ArrayList<>();
+ final List<FileStatus> files = new ArrayList<>();
+ for (FileStatus status : allPaths) {
+ if (status.isDirectory()) {
+ subDirs.add(status);
+ } else if (status.isFile()) {
+ files.add(status);
+ }
+ }
+
+ // Step.2: Try to delete all the deletable files.
+ final boolean allFilesDeleted = files.isEmpty() || deleteAction(new Action<Boolean>() {
+ @Override
+ public Boolean act() throws IOException {
+ return checkAndDeleteFiles(files);
+ }
+ }, "files", dir);
+
+ // Step.3: Start to traverse and delete the sub-directories.
+ if (subDirs.isEmpty()) {
+ // If no sub-directories, then just try to delete the current dir and finish the result.
+ boolean deleted = allFilesDeleted;
+ if (allFilesDeleted && !root) {
+ deleted = deleteAction(curDirDeletion, "dir", dir);
+ }
+ result.set(deleted);
+ return;
}
- boolean result = allFilesDeleted && allSubdirsDeleted;
- // if and only if files and subdirs under current dir are deleted successfully, and
- // it is not the root dir, then task will try to delete it.
- if (result && !root) {
- result &= deleteAction(new Action<Boolean>() {
+ // Otherwise, there are some sub-directories, so we register the following callback in the
+ // AsyncResult of each sub-directory; once all of the sub-directories have been traversed
+ // and deleted, the callback will try to delete the current dir and finish the result.
+ final AtomicInteger remain = new AtomicInteger(subDirs.size());
+ Callback<Boolean> callback = new Callback<Boolean>() {
+ private volatile boolean allSubDirDeleted = true;
+
+ @Override
+ public void run(Boolean subDirDeleted) {
+ allSubDirDeleted &= subDirDeleted;
+ if (remain.decrementAndGet() == 0) {
+ boolean deleted = allFilesDeleted && allSubDirDeleted;
+ if (deleted && !root) {
+ deleted = deleteAction(curDirDeletion, "dir", dir);
+ }
+ result.set(deleted);
+ }
+ }
+ };
+
+ // Submit the request of sub-directory deletion.
+ for (FileStatus subDir : subDirs) {
+ final FileStatus finalSubDir = subDir;
+ // Register the callback in AsyncResult here.
+ final AsyncResult<Boolean> asyncResult = new AsyncResult<Boolean>(callback);
+ pool.execute(new Runnable() {
@Override
- public Boolean act() throws IOException {
- return fs.delete(dir, false);
+ public void run() {
+ traverseAndDelete(finalSubDir.getPath(), false, asyncResult);
}
- }, "dir");
+ });
+ }
+ } catch (Exception e) {
+ result.set(false);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to traverse and delete the path=" + dir + ", root=" + root, e);
}
- return result;
}
+ }
- /**
- * Perform a delete on a specified type.
- * @param deletion a delete
- * @param type possible values are 'files', 'subdirs', 'dirs'
- * @return true if it deleted successfully, false otherwise
- */
- private boolean deleteAction(Action<Boolean> deletion, String type) {
- boolean deleted;
- try {
+ /**
+ * Perform a delete on a specified type.
+ * @param deletion a delete
+ * @param type what is being deleted; the values passed by traverseAndDelete are 'files', 'dir'
+ * @param dir the directory under which the delete action happens.
+ * @return true if it deleted successfully, false otherwise
+ */
+ private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
+ boolean deleted;
+ try {
+ if (LOG.isTraceEnabled()) {
LOG.trace("Start deleting " + type + " under " + dir);
- deleted = deletion.act();
- } catch (PathIsNotEmptyDirectoryException exception) {
- // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
- // LocalFileSystem throws a bare IOException. So some test code will get the verbose
- // message below.
- LOG.debug("Couldn't delete '" + dir + "' yet because it isn't empty. Probably transient. " +
- "exception details at TRACE.");
- LOG.trace("Couldn't delete '" + dir + "' yet because it isn't empty w/exception.",
- exception);
- deleted = false;
- } catch (IOException ioe) {
- LOG.info("Could not delete " + type + " under " + dir + ". might be transient; we'll " +
- "retry. if it keeps happening, use following exception when asking on mailing list.",
- ioe);
- deleted = false;
}
- LOG.trace("Finish deleting " + type + " under " + dir + " deleted=" + deleted);
- return deleted;
- }
-
- /**
- * Get cleaner results of subdirs.
- * @param tasks subdirs cleaner tasks
- * @return true if all subdirs deleted successfully, false for patial/all failures
- * @throws IOException something happen during computation
- */
- private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
- boolean cleaned = true;
- try {
- for (CleanerTask task : tasks) {
- cleaned &= task.get();
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new IOException(e);
+ deleted = deletion.act();
+ } catch (PathIsNotEmptyDirectoryException exception) {
+ // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
+ // LocalFileSystem throws a bare IOException. So some test code will get the verbose
+ // message below.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Couldn't delete '" + dir + "' yet because it isn't empty w/exception.",
+ exception);
}
- return cleaned;
+ deleted = false;
+ } catch (IOException ioe) {
+ LOG.info(
+ "Could not delete " + type + " under " + dir + ". might be transient; we'll retry. if it "
+ + "keeps " + "happening, use following exception when asking on mailing list.",
+ ioe);
+ deleted = false;
+ } catch (Exception e) {
+ LOG.info("unexpected exception: ", e);
+ deleted = false;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Finish deleting " + type + " under " + dir + ", deleted=" + deleted);
}
+ return deleted;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
index f201ae2..8684636 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
@@ -17,12 +17,14 @@
*/
package org.apache.hadoop.hbase.master.cleaner;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
@@ -33,7 +35,7 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver;
public class DirScanPool implements ConfigurationObserver {
private static final Log LOG = LogFactory.getLog(DirScanPool.class);
private volatile int size;
- private ForkJoinPool pool;
+ private final ThreadPoolExecutor pool;
private int cleanerLatch;
private boolean reconfigNotification;
@@ -43,11 +45,18 @@ public class DirScanPool implements ConfigurationObserver {
// poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure.
size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
- pool = new ForkJoinPool(size);
+ pool = initializePool(size);
LOG.info("Cleaner pool size is " + size);
cleanerLatch = 0;
}
+ private static ThreadPoolExecutor initializePool(int size) {
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1L, TimeUnit.MINUTES,
+ new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("dir-scan-pool"));
+ executor.allowCoreThreadTimeOut(true);
+ return executor;
+ }
+
/**
* Checks if pool can be updated. If so, mark for update later.
* @param conf configuration
@@ -74,8 +83,8 @@ public class DirScanPool implements ConfigurationObserver {
notifyAll();
}
- synchronized void execute(ForkJoinTask<?> task) {
- pool.execute(task);
+ synchronized void execute(Runnable runnable) {
+ this.pool.execute(runnable);
}
public synchronized void shutdownNow() {
@@ -100,9 +109,8 @@ public class DirScanPool implements ConfigurationObserver {
break;
}
}
- shutdownNow();
- LOG.info("Update chore's pool size from " + pool.getParallelism() + " to " + size);
- pool = new ForkJoinPool(size);
+ LOG.info("Update chore's pool size from " + pool.getPoolSize() + " to " + size);
+ pool.setCorePoolSize(size);
}
public int getSize() {