You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/05/29 05:35:53 UTC

[hudi] branch master updated: [HUDI-4086] Use CustomizedThreadFactory in async compaction and clustering (#5563)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e86884604 [HUDI-4086] Use CustomizedThreadFactory in async compaction and clustering (#5563)
7e86884604 is described below

commit 7e86884604057c0a7a373d4e84d8f3e112170ca7
Author: 苏承祥 <sc...@aliyun.com>
AuthorDate: Sun May 29 13:35:47 2022 +0800

    [HUDI-4086] Use CustomizedThreadFactory in async compaction and clustering (#5563)
    
    Co-authored-by: 苏承祥 <su...@tuya.com>
---
 .../org/apache/hudi/async/AsyncClusteringService.java   | 13 ++++---------
 .../java/org/apache/hudi/async/AsyncCompactService.java | 17 ++++++-----------
 2 files changed, 10 insertions(+), 20 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
index 7fece5c885..1e4d4d1f59 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.CustomizedThreadFactory;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -42,13 +43,12 @@ import java.util.stream.IntStream;
  */
 public abstract class AsyncClusteringService extends HoodieAsyncTableService {
 
+  public static final String CLUSTERING_POOL_NAME = "hoodiecluster";
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class);
-  public static final String CLUSTERING_POOL_NAME = "hoodiecluster";
-
   private final int maxConcurrentClustering;
-  private transient BaseClusterer clusteringClient;
   protected transient HoodieEngineContext context;
+  private transient BaseClusterer clusteringClient;
 
   public AsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient writeClient) {
     this(context, writeClient, false);
@@ -69,12 +69,7 @@ public abstract class AsyncClusteringService extends HoodieAsyncTableService {
   @Override
   protected Pair<CompletableFuture, ExecutorService> startService() {
     ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentClustering,
-        r -> {
-          Thread t = new Thread(r, "async_clustering_thread");
-          t.setDaemon(isRunInDaemonMode());
-          return t;
-        });
-
+        new CustomizedThreadFactory("async_clustering_thread", isRunInDaemonMode()));
     return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentClustering).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
       try {
         // Set Compactor Pool Name for allowing users to prioritize compaction
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
index f1f7f416e4..a62beae02b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.CustomizedThreadFactory;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -39,17 +40,15 @@ import java.util.stream.IntStream;
  */
 public abstract class AsyncCompactService extends HoodieAsyncTableService {
 
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class);
-
   /**
    * This is the job pool used by async compaction.
    */
   public static final String COMPACT_POOL_NAME = "hoodiecompact";
-
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class);
   private final int maxConcurrentCompaction;
-  private transient BaseCompactor compactor;
   protected transient HoodieEngineContext context;
+  private transient BaseCompactor compactor;
 
   public AsyncCompactService(HoodieEngineContext context, BaseHoodieWriteClient client) {
     this(context, client, false);
@@ -70,11 +69,7 @@ public abstract class AsyncCompactService extends HoodieAsyncTableService {
   @Override
   protected Pair<CompletableFuture, ExecutorService> startService() {
     ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction,
-        r -> {
-        Thread t = new Thread(r, "async_compact_thread");
-        t.setDaemon(isRunInDaemonMode());
-        return t;
-      });
+        new CustomizedThreadFactory("async_compact_thread", isRunInDaemonMode()));
     return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
       try {
         // Set Compactor Pool Name for allowing users to prioritize compaction
@@ -107,9 +102,9 @@ public abstract class AsyncCompactService extends HoodieAsyncTableService {
     }, executor)).toArray(CompletableFuture[]::new)), executor);
   }
 
-
   /**
    * Check whether compactor thread needs to be stopped.
+   *
    * @return
    */
   protected boolean shouldStopCompactor() {