You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/09/05 11:36:27 UTC

[hbase] branch branch-1.4 updated: HBASE-22912 [Backport] HBASE-22867 to branch-1 to avoid ForkJoinPool to spawn thousands of threads (#549)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1.4 by this push:
     new 1d41159  HBASE-22912 [Backport] HBASE-22867 to branch-1 to avoid ForkJoinPool to spawn thousands of threads (#549)
1d41159 is described below

commit 1d411597b2231508b574345a87556d8f39f85c41
Author: openinx <op...@gmail.com>
AuthorDate: Thu Sep 5 19:32:42 2019 +0800

    HBASE-22912 [Backport] HBASE-22867 to branch-1 to avoid ForkJoinPool to spawn thousands of threads (#549)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    Signed-off-by: Reid Chan <re...@apache.org>
---
 .../hadoop/hbase/master/cleaner/CleanerChore.java  | 283 ++++++++++++---------
 .../hadoop/hbase/master/cleaner/DirScanPool.java   |  26 +-
 2 files changed, 178 insertions(+), 131 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index db0e897..78be50b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -18,13 +18,13 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RecursiveTask;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,8 +36,6 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.FileStatusFilter;
 import org.apache.hadoop.ipc.RemoteException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -207,10 +205,20 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
     }
   }
 
-  public Boolean runCleaner() {
-    CleanerTask task = new CleanerTask(this.oldFileDir, true);
-    pool.execute(task);
-    return task.join();
+  public boolean runCleaner() {
+    try {
+      final AsyncResult<Boolean> result = new AsyncResult<Boolean>();
+      pool.execute(new Runnable() {
+        @Override
+        public void run() {
+          traverseAndDelete(oldFileDir, true, result);
+        }
+      });
+      return result.get();
+    } catch (Exception e) {
+      LOG.info("Failed to traverse and delete paths under the dir: " + oldFileDir, e);
+      return false;
+    }
   }
 
   /**
@@ -322,140 +330,171 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
     T act() throws IOException;
   }
 
-  /**
-   * Attemps to clean up a directory, its subdirectories, and files. Return value is true if
-   * everything was deleted. false on partial / total failures.
-   */
-  private final class CleanerTask extends RecursiveTask<Boolean> {
-    private static final long serialVersionUID = -1584635903138015418L;
-    private final Path dir;
-    private final boolean root;
+  private interface Callback<T> {
+    void run(T val);
+  }
 
-    CleanerTask(final FileStatus dir, final boolean root) {
-      this(dir.getPath(), root);
+  private final class AsyncResult<T> {
+
+    private Callback<T> callback;
+    private T result;
+    private boolean resultSet = false;
+
+    AsyncResult(Callback<T> callback) {
+      this.callback = callback;
     }
 
-    CleanerTask(final Path dir, final boolean root) {
-      this.dir = dir;
-      this.root = root;
+    AsyncResult() {
     }
 
-    @Override
-    protected Boolean compute() {
-      LOG.trace("Cleaning under " + dir);
-      List<FileStatus> subDirs;
-      List<FileStatus> tmpFiles;
-      final List<FileStatus> files;
-      try {
-        // if dir doesn't exist, we'll get null back for both of these
-        // which will fall through to succeeding.
-        subDirs = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
-          @Override
-          public boolean accept(FileStatus f) {
-            return f.isDirectory();
-          }
-        });
-        if (subDirs == null) {
-          subDirs = Collections.emptyList();
+    void set(T result) {
+      synchronized (this) {
+        this.result = result;
+        if (callback != null) {
+          callback.run(result);
         }
-        tmpFiles = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
-          @Override
-          public boolean accept(FileStatus f) {
-            return f.isFile();
-          }
-        });
-        files = tmpFiles == null ? Collections.<FileStatus>emptyList() : tmpFiles;
-      } catch (IOException ioe) {
-        LOG.warn("failed to get FileStatus for contents of '" + dir + "'", ioe);
-        return false;
+        // Mark the result set process finished and notify the waiting get method.
+        this.resultSet = true;
+        this.notifyAll();
       }
+    }
 
-      boolean allFilesDeleted = true;
-      if (!files.isEmpty()) {
-        allFilesDeleted = deleteAction(new Action<Boolean>() {
-          @Override
-          public Boolean act() throws IOException {
-            return checkAndDeleteFiles(files);
-          }
-        }, "files");
+    synchronized T get() throws Exception {
+      while (!resultSet) {
+        wait();
       }
+      return result;
+    }
+  }
 
-      boolean allSubdirsDeleted = true;
-      if (!subDirs.isEmpty()) {
-        final List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
-        for (FileStatus subdir : subDirs) {
-          CleanerTask task = new CleanerTask(subdir, false);
-          tasks.add(task);
-          task.fork();
+  /**
+   * Attempts to clean up a directory(its subdirectories, and files) in a
+   * {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by
+   * calling result.get().
+   * @param dir means the directory we will start to traverse and delete.
+   * @param root means whether it's the root directory to traverse, if true then cannot delete it.
+   * @param result {@link AsyncResult<Boolean>} to fetch the result. True means the current
+   *          directory has been deleted successfully (for root dir we don't need that) and the
+   *          parent will try to delete its own directory if all of the children (files and
+   *          sub-directories included) have been deleted successfully.
+   */
+  private void traverseAndDelete(final Path dir, final boolean root,
+      final AsyncResult<Boolean> result) {
+    try {
+      final Action<Boolean> curDirDeletion = new Action<Boolean>() {
+        @Override
+        public Boolean act() throws IOException {
+          return fs.delete(dir, false);
         }
-        allSubdirsDeleted = deleteAction(new Action<Boolean>() {
-          @Override
-          public Boolean act() throws IOException {
-            return getCleanResult(tasks);
-          }
-        }, "subdirs");
+      };
+
+      // Step.1: List all files under the given directory.
+      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
+      final List<FileStatus> subDirs = new ArrayList<>();
+      final List<FileStatus> files = new ArrayList<>();
+      for (FileStatus status : allPaths) {
+        if (status.isDirectory()) {
+          subDirs.add(status);
+        } else if (status.isFile()) {
+          files.add(status);
+        }
+      }
+
+      // Step.2: Try to delete all the deletable files.
+      final boolean allFilesDeleted = files.isEmpty() || deleteAction(new Action<Boolean>() {
+        @Override
+        public Boolean act() throws IOException {
+          return checkAndDeleteFiles(files);
+        }
+      }, "files", dir);
+
+      // Step.3: Start to traverse and delete the sub-directories.
+      if (subDirs.isEmpty()) {
+        // If no sub-directories, then just try to delete the current dir and finish the result.
+        boolean deleted = allFilesDeleted;
+        if (allFilesDeleted && !root) {
+          deleted = deleteAction(curDirDeletion, "dir", dir);
+        }
+        result.set(deleted);
+        return;
       }
 
-      boolean result = allFilesDeleted && allSubdirsDeleted;
-      // if and only if files and subdirs under current dir are deleted successfully, and
-      // it is not the root dir, then task will try to delete it.
-      if (result && !root) {
-        result &= deleteAction(new Action<Boolean>() {
+      // Otherwise, there should be some sub-directories. Then we will register the following
+      // callback in AsyncResult of sub-directory, and once all of the sub-directories are traversed
+      // and deleted then the callback will try to delete the current dir and finish the result.
+      final AtomicInteger remain = new AtomicInteger(subDirs.size());
+      Callback<Boolean> callback = new Callback<Boolean>() {
+        private volatile boolean allSubDirDeleted = true;
+
+        @Override
+        public void run(Boolean subDirDeleted) {
+          allSubDirDeleted &= subDirDeleted;
+          if (remain.decrementAndGet() == 0) {
+            boolean deleted = allFilesDeleted && allSubDirDeleted;
+            if (deleted && !root) {
+              deleted = deleteAction(curDirDeletion, "dir", dir);
+            }
+            result.set(deleted);
+          }
+        }
+      };
+
+      // Submit the request of sub-directory deletion.
+      for (FileStatus subDir : subDirs) {
+        final FileStatus finalSubDir = subDir;
+        // Register the callback in AsyncResult here.
+        final AsyncResult<Boolean> asyncResult = new AsyncResult<Boolean>(callback);
+        pool.execute(new Runnable() {
           @Override
-          public Boolean act() throws IOException {
-            return fs.delete(dir, false);
+          public void run() {
+            traverseAndDelete(finalSubDir.getPath(), false, asyncResult);
           }
-        }, "dir");
+        });
+      }
+    } catch (Exception e) {
+      result.set(false);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to traverse and delete the path=" + dir + ", root=" + root, e);
       }
-      return result;
     }
+  }
 
-    /**
-     * Perform a delete on a specified type.
-     * @param deletion a delete
-     * @param type possible values are 'files', 'subdirs', 'dirs'
-     * @return true if it deleted successfully, false otherwise
-     */
-    private boolean deleteAction(Action<Boolean> deletion, String type) {
-      boolean deleted;
-      try {
+  /**
+   * Perform a delete on a specified type.
+   * @param deletion a delete
+   * @param type possible values are 'files', 'dir'
+   * @param dir delete actions happened under the given directory.
+   * @return true if it deleted successfully, false otherwise
+   */
+  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
+    boolean deleted;
+    try {
+      if (LOG.isTraceEnabled()) {
         LOG.trace("Start deleting " + type + " under " + dir);
-        deleted = deletion.act();
-      } catch (PathIsNotEmptyDirectoryException exception) {
-        // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
-        // LocalFileSystem throws a bare IOException. So some test code will get the verbose
-        // message below.
-        LOG.debug("Couldn't delete '" + dir + "' yet because it isn't empty. Probably transient. " +
-            "exception details at TRACE.");
-        LOG.trace("Couldn't delete '" + dir + "' yet because it isn't empty w/exception.",
-            exception);
-        deleted = false;
-      } catch (IOException ioe) {
-        LOG.info("Could not delete " + type + " under " + dir + ". might be transient; we'll " +
-            "retry. if it keeps happening, use following exception when asking on mailing list.",
-            ioe);
-        deleted = false;
       }
-      LOG.trace("Finish deleting " + type + " under " + dir + " deleted=" + deleted);
-      return deleted;
-    }
-
-    /**
-     * Get cleaner results of subdirs.
-     * @param tasks subdirs cleaner tasks
-     * @return true if all subdirs deleted successfully, false for patial/all failures
-     * @throws IOException something happen during computation
-     */
-    private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
-      boolean cleaned = true;
-      try {
-        for (CleanerTask task : tasks) {
-          cleaned &= task.get();
-        }
-      } catch (InterruptedException | ExecutionException e) {
-        throw new IOException(e);
+      deleted = deletion.act();
+    } catch (PathIsNotEmptyDirectoryException exception) {
+      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
+      // LocalFileSystem throws a bare IOException. So some test code will get the verbose
+      // message below.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Couldn't delete '" + dir + "' yet because it isn't empty w/exception.",
+          exception);
       }
-      return cleaned;
+      deleted = false;
+    } catch (IOException ioe) {
+      LOG.info(
+        "Could not delete " + type + " under " + dir + ". might be transient; we'll retry. if it "
+            + "keeps " + "happening, use following exception when asking on mailing list.",
+        ioe);
+      deleted = false;
+    } catch (Exception e) {
+      LOG.info("unexpected exception: ", e);
+      deleted = false;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Finish deleting " + type + " under " + dir + ", deleted=" + deleted);
     }
+    return deleted;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
index f201ae2..8684636 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 
@@ -33,7 +35,7 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 public class DirScanPool implements ConfigurationObserver {
   private static final Log LOG = LogFactory.getLog(DirScanPool.class);
   private volatile int size;
-  private ForkJoinPool pool;
+  private final ThreadPoolExecutor pool;
   private int cleanerLatch;
   private boolean reconfigNotification;
 
@@ -43,11 +45,18 @@ public class DirScanPool implements ConfigurationObserver {
     // poolSize may be 0 or 0.0 from a careless configuration,
     // double check to make sure.
     size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
-    pool = new ForkJoinPool(size);
+    pool = initializePool(size);
     LOG.info("Cleaner pool size is " + size);
     cleanerLatch = 0;
   }
 
+  private static ThreadPoolExecutor initializePool(int size) {
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1L, TimeUnit.MINUTES,
+        new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("dir-scan-pool"));
+    executor.allowCoreThreadTimeOut(true);
+    return executor;
+  }
+
   /**
    * Checks if pool can be updated. If so, mark for update later.
    * @param conf configuration
@@ -74,8 +83,8 @@ public class DirScanPool implements ConfigurationObserver {
     notifyAll();
   }
 
-  synchronized void execute(ForkJoinTask<?> task) {
-    pool.execute(task);
+  synchronized void execute(Runnable runnable) {
+    this.pool.execute(runnable);
   }
 
   public synchronized void shutdownNow() {
@@ -100,9 +109,8 @@ public class DirScanPool implements ConfigurationObserver {
         break;
       }
     }
-    shutdownNow();
-    LOG.info("Update chore's pool size from " + pool.getParallelism() + " to " + size);
-    pool = new ForkJoinPool(size);
+    LOG.info("Update chore's pool size from " + pool.getPoolSize() + " to " + size);
+    pool.setCorePoolSize(size);
   }
 
   public int getSize() {