Posted to commits@hbase.apache.org by op...@apache.org on 2019/08/22 12:27:56 UTC

[hbase] branch branch-2 updated: HBASE-22867 The ForkJoinPool in CleanerChore will spawn thousands of threads in our cluster with thousands of tables (#513)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 4275c5f  HBASE-22867 The ForkJoinPool in CleanerChore will spawn thousands of threads in our cluster with thousands of tables (#513)
4275c5f is described below

commit 4275c5f3740f0a69fab1ff474b872b3166a2edf9
Author: openinx <op...@gmail.com>
AuthorDate: Thu Aug 22 19:42:01 2019 +0800

    HBASE-22867 The ForkJoinPool in CleanerChore will spawn thousands of threads in our cluster with thousands of tables (#513)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: Reid Chan <re...@apache.org>
---
 .../hadoop/hbase/master/cleaner/CleanerChore.java  | 216 +++++++++------------
 .../hadoop/hbase/master/cleaner/DirScanPool.java   |  27 ++-
 2 files changed, 114 insertions(+), 129 deletions(-)
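
The gist of the change, before the diff: the old CleanerTask extended
RecursiveTask and waited on its sub-tasks, and a ForkJoinPool may spawn
compensation threads whenever a worker blocks waiting on a join, so a cluster
with thousands of tables (hence thousands of archive sub-directories) could
end up with thousands of threads. The new code traverses the tree
asynchronously instead: each directory visit is a plain Runnable on a bounded
ThreadPoolExecutor that completes a CompletableFuture from a callback rather
than by blocking. A minimal, self-contained sketch of that pattern
(illustrative only; it walks java.io.File rather than the Hadoop FileSystem
API the real code uses):

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class AsyncTraversalSketch {
      // Bounded pool; daemon threads so this sketch's JVM can exit.
      private final ExecutorService pool = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "traverse-pool");
        t.setDaemon(true);
        return t;
      });

      // One task per directory; the future is completed from a callback rather
      // than by joining children, so four threads suffice for any tree size.
      CompletableFuture<Boolean> traverse(File dir) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        pool.execute(() -> {
          try {
            List<CompletableFuture<Boolean>> children = new ArrayList<>();
            File[] entries = dir.listFiles();
            if (entries != null) {
              for (File entry : entries) {
                if (entry.isDirectory()) {
                  children.add(traverse(entry)); // recurse without joining here
                }
              }
            }
            // The real CleanerChore also deletes files here, and the directory
            // itself in the callback; this sketch only reports traversal.
            CompletableFuture.allOf(children.toArray(new CompletableFuture[0]))
                .whenComplete((v, e) -> {
                  if (e != null) {
                    result.completeExceptionally(e);
                  } else {
                    // All children are done, so join() cannot block here.
                    result.complete(children.stream().allMatch(CompletableFuture::join));
                  }
                });
          } catch (Exception e) {
            result.completeExceptionally(e);
          }
        });
        return result;
      }

      public static void main(String[] args) throws Exception {
        System.out.println(new AsyncTraversalSketch().traverse(new File("/tmp")).get());
      }
    }

With this shape the pool size is a hard cap: pending directories wait in the
executor's queue instead of demanding fresh threads.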

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index 6ccc7ef..b4f1f9b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -18,16 +18,17 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,7 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -43,7 +44,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -211,11 +211,16 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
     cleanersChain.forEach(FileCleanerDelegate::preClean);
   }
 
-  public Boolean runCleaner() {
+  public boolean runCleaner() {
     preRunCleaner();
-    CleanerTask task = new CleanerTask(this.oldFileDir, true);
-    pool.execute(task);
-    return task.join();
+    try {
+      CompletableFuture<Boolean> future = new CompletableFuture<>();
+      pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
+      return future.get();
+    } catch (Exception e) {
+      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
+      return false;
+    }
   }
 
   /**
@@ -380,126 +385,97 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   }
 
   private interface Action<T> {
-    T act() throws IOException;
+    T act() throws Exception;
   }
 
   /**
-   * Attemps to clean up a directory, its subdirectories, and files. Return value is true if
-   * everything was deleted. false on partial / total failures.
+   * Attempts to clean up a directory, including its subdirectories and files, concurrently on a
+   * {@link java.util.concurrent.ThreadPoolExecutor}. The final result can be obtained by calling
+   * result.get().
    */
-  private final class CleanerTask extends RecursiveTask<Boolean> {
-
-    private static final long serialVersionUID = -5444212174088754172L;
-
-    private final Path dir;
-    private final boolean root;
-
-    CleanerTask(final FileStatus dir, final boolean root) {
-      this(dir.getPath(), root);
-    }
-
-    CleanerTask(final Path dir, final boolean root) {
-      this.dir = dir;
-      this.root = root;
-    }
-
-    @Override
-    protected Boolean compute() {
-      LOG.trace("Cleaning under {}", dir);
-      List<FileStatus> subDirs;
-      List<FileStatus> files;
-      try {
-        // if dir doesn't exist, we'll get null back for both of these
-        // which will fall through to succeeding.
-        subDirs = getFilteredStatus(FileStatus::isDirectory);
-        files = getFilteredStatus(FileStatus::isFile);
-      } catch (IOException ioe) {
-        LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe);
-        return false;
-      }
-
-      boolean allFilesDeleted = true;
-      if (!files.isEmpty()) {
-        allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files");
-      }
-
-      boolean allSubdirsDeleted = true;
+  private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
+    try {
+      // Step.1: List all files and sub-directories under the given directory.
+      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
+      List<FileStatus> subDirs =
+          allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
+      List<FileStatus> files =
+          allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
+
+      // Step.2: Try to delete all the deletable files.
+      boolean allFilesDeleted =
+          files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);
+
+      // Step.3: Start to traverse and delete the sub-directories.
+      List<CompletableFuture<Boolean>> futures = new ArrayList<>();
       if (!subDirs.isEmpty()) {
-        List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
         sortByConsumedSpace(subDirs);
-        for (FileStatus subdir : subDirs) {
-          CleanerTask task = new CleanerTask(subdir, false);
-          tasks.add(task);
-          task.fork();
-        }
-        allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs");
+        // Submit the requests to delete the sub-directories.
+        subDirs.forEach(subDir -> {
+          CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
+          pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
+          futures.add(subFuture);
+        });
       }
 
-      boolean result = allFilesDeleted && allSubdirsDeleted && isEmptyDirDeletable(dir);
-      // if and only if files and subdirs under current dir are deleted successfully, and the empty
-      // directory can be deleted, and it is not the root dir then task will try to delete it.
-      if (result && !root) {
-        result &= deleteAction(() -> fs.delete(dir, false), "dir");
-      }
-      return result;
-    }
-
-    /**
-     * Get FileStatus with filter.
-     * @param function a filter function
-     * @return filtered FileStatus or empty list if dir doesn't exist
-     * @throws IOException if there's an error other than dir not existing
-     */
-    private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {
-      return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir,
-        status -> function.test(status))).orElseGet(Collections::emptyList);
-    }
-
-    /**
-     * Perform a delete on a specified type.
-     * @param deletion a delete
-     * @param type possible values are 'files', 'subdirs', 'dirs'
-     * @return true if it deleted successfully, false otherwise
-     */
-    private boolean deleteAction(Action<Boolean> deletion, String type) {
-      boolean deleted;
-      try {
-        LOG.trace("Start deleting {} under {}", type, dir);
-        deleted = deletion.act();
-      } catch (PathIsNotEmptyDirectoryException exception) {
-        // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
-        // LocalFileSystem throws a bare IOException. So some test code will get the verbose
-        // message below.
-        LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. " +
-            "exception details at TRACE.", dir);
-        LOG.trace("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
-        deleted = false;
-      } catch (IOException ioe) {
-        LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " +
-                  "happening, use following exception when asking on mailing list.",
-                  type, dir, ioe);
-        deleted = false;
-      }
-      LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
-      return deleted;
+      // Step.4: Once all sub-files & sub-directories are deleted, we can try to delete the
+      // current directory asynchronously.
+      FutureUtils.addListener(
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
+        (voidObj, e) -> {
+          if (e != null) {
+            result.completeExceptionally(e);
+            return;
+          }
+          try {
+            boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
+            boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir);
+            if (deleted && !root) {
+              // Try to delete the directory itself only if all files and sub-dirs under it
+              // were deleted successfully, the empty directory is deletable, and it is not
+              // the root dir.
+              deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
+            }
+            result.complete(deleted);
+          } catch (Exception ie) {
+            // Must handle the inner exception here, otherwise the result may get stuck if a
+            // sub-directory fails.
+            result.completeExceptionally(ie);
+          }
+        });
+    } catch (Exception e) {
+      LOG.debug("Failed to traverse and delete the path: {}", dir, e);
+      result.completeExceptionally(e);
     }
+  }
 
-    /**
-     * Get cleaner results of subdirs.
-     * @param tasks subdirs cleaner tasks
-     * @return true if all subdirs deleted successfully, false for patial/all failures
-     * @throws IOException something happen during computation
-     */
-    private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
-      boolean cleaned = true;
-      try {
-        for (CleanerTask task : tasks) {
-          cleaned &= task.get();
-        }
-      } catch (InterruptedException | ExecutionException e) {
-        throw new IOException(e);
-      }
-      return cleaned;
+  /**
+   * Perform a delete on a specified type.
+   * @param deletion a delete
+   * @param type possible values are 'files', 'subdirs', 'dirs'
+   * @return true if it deleted successfully, false otherwise
+   */
+  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
+    boolean deleted;
+    try {
+      LOG.trace("Start deleting {} under {}", type, dir);
+      deleted = deletion.act();
+    } catch (PathIsNotEmptyDirectoryException exception) {
+      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
+      // LocalFileSystem throws a bare IOException. So some test code will get the verbose
+      // message below.
+      LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
+      deleted = false;
+    } catch (IOException ioe) {
+      LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "
+          + "happening, use following exception when asking on mailing list.",
+        type, dir, ioe);
+      deleted = false;
+    } catch (Exception e) {
+      LOG.info("unexpected exception: ", e);
+      deleted = false;
     }
+    LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
+    return deleted;
   }
 }
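
One subtlety in Step.4 above: the listener must route every exception into
result (hence the inner try/catch), otherwise the parent future would never
complete and future.get() in runCleaner would block forever. A minimal sketch
of how a child failure propagates through allOf into the parent (illustrative
only, plain JDK):

    import java.util.concurrent.CompletableFuture;

    public class StuckFutureSketch {
      public static void main(String[] args) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        CompletableFuture<Boolean> child = new CompletableFuture<>();
        child.completeExceptionally(new RuntimeException("child failed"));

        CompletableFuture.allOf(child).whenComplete((v, e) -> {
          if (e != null) {
            // Without this branch, result would never complete and any
            // result.get() caller would hang.
            result.completeExceptionally(e);
          } else {
            result.complete(child.join());
          }
        });
        // Prints true: the failure reached the parent future.
        System.out.println(result.isCompletedExceptionally());
      }
    }
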
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
index a3a7d8e..ca93474 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -32,7 +35,7 @@ import org.slf4j.LoggerFactory;
 public class DirScanPool implements ConfigurationObserver {
   private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class);
   private volatile int size;
-  private ForkJoinPool pool;
+  private final ThreadPoolExecutor pool;
   private int cleanerLatch;
   private boolean reconfigNotification;
 
@@ -42,11 +45,18 @@ public class DirScanPool implements ConfigurationObserver {
     // poolSize may be 0 or 0.0 from a careless configuration,
     // double check to make sure.
     size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
-    pool = new ForkJoinPool(size);
+    pool = initializePool(size);
     LOG.info("Cleaner pool size is {}", size);
     cleanerLatch = 0;
   }
 
+  private static ThreadPoolExecutor initializePool(int size) {
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES,
+        new LinkedBlockingQueue<>(), new DaemonThreadFactory("dir-scan-pool"));
+    executor.allowCoreThreadTimeOut(true);
+    return executor;
+  }
+
   /**
    * Checks if pool can be updated. If so, mark for update later.
    * @param conf configuration
@@ -73,8 +83,8 @@ public class DirScanPool implements ConfigurationObserver {
     notifyAll();
   }
 
-  synchronized void execute(ForkJoinTask<?> task) {
-    pool.execute(task);
+  synchronized void execute(Runnable runnable) {
+    pool.execute(runnable);
   }
 
   public synchronized void shutdownNow() {
@@ -99,9 +109,8 @@ public class DirScanPool implements ConfigurationObserver {
         break;
       }
     }
-    shutdownNow();
-    LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
-    pool = new ForkJoinPool(size);
+    LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size);
+    pool.setCorePoolSize(size);
   }
 
   public int getSize() {
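
On the DirScanPool side: with corePoolSize equal to maximumPoolSize, an
unbounded LinkedBlockingQueue, and allowCoreThreadTimeOut(true), the executor
never holds more than the configured number of threads, queues any excess
work, and lets idle threads exit after one minute; reconfiguration then
resizes the live pool via setCorePoolSize instead of tearing it down and
rebuilding a ForkJoinPool. A sketch of an equivalent construction
(illustrative only; a plain JDK ThreadFactory stands in here for HBase's
DaemonThreadFactory):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class BoundedPoolSketch {
      // At most size threads ever exist; extra work queues up; idle core
      // threads time out after one minute.
      static ThreadPoolExecutor newDirScanPool(int size) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES,
            new LinkedBlockingQueue<>(), runnable -> {
              Thread t = new Thread(runnable, "dir-scan-pool");
              t.setDaemon(true);
              return t;
            });
        executor.allowCoreThreadTimeOut(true);
        return executor;
      }

      public static void main(String[] args) {
        ThreadPoolExecutor pool = newDirScanPool(4);
        pool.execute(() -> System.out.println("pool cap: " + pool.getMaximumPoolSize()));
        pool.shutdown();
      }
    }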