You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/01/07 18:06:25 UTC

[hudi] 03/21: Revert "[HUDI-3043] Revert async cleaner leak commit to unblock CI failure (#4343)" (#4465)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d91bb0e7eb94a416aae1f9d236147c776354d8d0
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Wed Dec 29 21:45:09 2021 -0500

    Revert "[HUDI-3043] Revert async cleaner leak commit to unblock CI failure (#4343)" (#4465)
    
    This reverts commit 7e7ad1558c0dcc06e059f631e43e44dc04100aa4.
---
 .../org/apache/hudi/async/HoodieAsyncService.java  | 36 +++++-----------------
 .../hudi/client/AbstractHoodieWriteClient.java     |  6 +++-
 .../apache/hudi/client/AsyncCleanerService.java    | 14 ++++-----
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  6 +++-
 .../java/org/apache/hudi/sink/CleanFunction.java   |  7 +++++
 5 files changed, 31 insertions(+), 38 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
index 85e0081..f57484d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
@@ -29,7 +29,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -130,7 +129,7 @@ public abstract class HoodieAsyncService implements Serializable {
     future = res.getKey();
     executor = res.getValue();
     started = true;
-    monitorThreads(onShutdownCallback);
+    shutdownCallback(onShutdownCallback);
   }
 
   /**
@@ -141,34 +140,15 @@ public abstract class HoodieAsyncService implements Serializable {
   protected abstract Pair<CompletableFuture, ExecutorService> startService();
 
   /**
-   * A monitor thread is started which would trigger a callback if the service is shutdown.
+   * Add shutdown callback for the completable future.
    * 
-   * @param onShutdownCallback
+   * @param callback The callback
    */
-  private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
-    LOG.info("Submitting monitor thread !!");
-    Executors.newSingleThreadExecutor(r -> {
-      Thread t = new Thread(r, "Monitor Thread");
-      t.setDaemon(isRunInDaemonMode());
-      return t;
-    }).submit(() -> {
-      boolean error = false;
-      try {
-        LOG.info("Monitoring thread(s) !!");
-        future.get();
-      } catch (ExecutionException ex) {
-        LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex);
-        error = true;
-      } catch (InterruptedException ie) {
-        LOG.error("Got interrupted Monitoring threads", ie);
-        error = true;
-      } finally {
-        // Mark as shutdown
-        shutdown = true;
-        if (null != onShutdownCallback) {
-          onShutdownCallback.apply(error);
-        }
-        shutdown(false);
+  @SuppressWarnings("unchecked")
+  private void shutdownCallback(Function<Boolean, Boolean> callback) {
+    future.whenComplete((resp, error) -> {
+      if (null != callback) {
+        callback.apply(null != error);
       }
     });
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 396023a..76b10fd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -425,7 +425,11 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
       HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
     this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+    if (null == this.asyncCleanerService) {
+      this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+    } else {
+      this.asyncCleanerService.start(null);
+    }
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
index 2fd4251..a5a38f2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
@@ -37,28 +37,26 @@ class AsyncCleanerService extends HoodieAsyncService {
   private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class);
 
   private final AbstractHoodieWriteClient writeClient;
-  private final String cleanInstantTime;
   private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
 
-  protected AsyncCleanerService(AbstractHoodieWriteClient writeClient, String cleanInstantTime) {
+  protected AsyncCleanerService(AbstractHoodieWriteClient writeClient) {
     this.writeClient = writeClient;
-    this.cleanInstantTime = cleanInstantTime;
   }
 
   @Override
   protected Pair<CompletableFuture, ExecutorService> startService() {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
     return Pair.of(CompletableFuture.supplyAsync(() -> {
-      writeClient.clean(cleanInstantTime);
+      writeClient.clean(instantTime);
       return true;
-    }), executor);
+    }, executor), executor);
   }
 
   public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) {
     AsyncCleanerService asyncCleanerService = null;
     if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
-      String instantTime = HoodieActiveTimeline.createNewInstantTime();
-      LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
-      asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
+      asyncCleanerService = new AsyncCleanerService(writeClient);
       asyncCleanerService.start(null);
     } else {
       LOG.info("Async auto cleaning is not enabled. Not running cleaner now");
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 2ed2536..4108ba4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -281,7 +281,11 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
    * checkpoint finish.
    */
   public void startAsyncCleaning() {
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+    if (this.asyncCleanerService == null) {
+      this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+    } else {
+      this.asyncCleanerService.start(null);
+    }
   }
 
   /**
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 13154b2..195e430 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -98,4 +98,11 @@ public class CleanFunction<T> extends AbstractRichFunction
   public void initializeState(FunctionInitializationContext context) throws Exception {
     // no operation
   }
+
+  @Override
+  public void close() throws Exception {
+    if (this.writeClient != null) {
+      this.writeClient.close();
+    }
+  }
 }