You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/01/07 18:06:25 UTC
[hudi] 03/21: Revert "[HUDI-3043] Revert async cleaner leak commit to unblock CI failure (#4343)" (#4465)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d91bb0e7eb94a416aae1f9d236147c776354d8d0
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Wed Dec 29 21:45:09 2021 -0500
Revert "[HUDI-3043] Revert async cleaner leak commit to unblock CI failure (#4343)" (#4465)
This reverts commit 7e7ad1558c0dcc06e059f631e43e44dc04100aa4.
---
.../org/apache/hudi/async/HoodieAsyncService.java | 36 +++++-----------------
.../hudi/client/AbstractHoodieWriteClient.java | 6 +++-
.../apache/hudi/client/AsyncCleanerService.java | 14 ++++-----
.../apache/hudi/client/HoodieFlinkWriteClient.java | 6 +++-
.../java/org/apache/hudi/sink/CleanFunction.java | 7 +++++
5 files changed, 31 insertions(+), 38 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
index 85e0081..f57484d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
@@ -29,7 +29,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@@ -130,7 +129,7 @@ public abstract class HoodieAsyncService implements Serializable {
future = res.getKey();
executor = res.getValue();
started = true;
- monitorThreads(onShutdownCallback);
+ shutdownCallback(onShutdownCallback);
}
/**
@@ -141,34 +140,15 @@ public abstract class HoodieAsyncService implements Serializable {
protected abstract Pair<CompletableFuture, ExecutorService> startService();
/**
- * A monitor thread is started which would trigger a callback if the service is shutdown.
+ * Add shutdown callback for the completable future.
*
- * @param onShutdownCallback
+ * @param callback The callback
*/
- private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
- LOG.info("Submitting monitor thread !!");
- Executors.newSingleThreadExecutor(r -> {
- Thread t = new Thread(r, "Monitor Thread");
- t.setDaemon(isRunInDaemonMode());
- return t;
- }).submit(() -> {
- boolean error = false;
- try {
- LOG.info("Monitoring thread(s) !!");
- future.get();
- } catch (ExecutionException ex) {
- LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex);
- error = true;
- } catch (InterruptedException ie) {
- LOG.error("Got interrupted Monitoring threads", ie);
- error = true;
- } finally {
- // Mark as shutdown
- shutdown = true;
- if (null != onShutdownCallback) {
- onShutdownCallback.apply(error);
- }
- shutdown(false);
+ @SuppressWarnings("unchecked")
+ private void shutdownCallback(Function<Boolean, Boolean> callback) {
+ future.whenComplete((resp, error) -> {
+ if (null != callback) {
+ callback.apply(null != error);
}
});
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 396023a..76b10fd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -425,7 +425,11 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ if (null == this.asyncCleanerService) {
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ } else {
+ this.asyncCleanerService.start(null);
+ }
}
/**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
index 2fd4251..a5a38f2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
@@ -37,28 +37,26 @@ class AsyncCleanerService extends HoodieAsyncService {
private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class);
private final AbstractHoodieWriteClient writeClient;
- private final String cleanInstantTime;
private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
- protected AsyncCleanerService(AbstractHoodieWriteClient writeClient, String cleanInstantTime) {
+ protected AsyncCleanerService(AbstractHoodieWriteClient writeClient) {
this.writeClient = writeClient;
- this.cleanInstantTime = cleanInstantTime;
}
@Override
protected Pair<CompletableFuture, ExecutorService> startService() {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
return Pair.of(CompletableFuture.supplyAsync(() -> {
- writeClient.clean(cleanInstantTime);
+ writeClient.clean(instantTime);
return true;
- }), executor);
+ }, executor), executor);
}
public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) {
AsyncCleanerService asyncCleanerService = null;
if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
- String instantTime = HoodieActiveTimeline.createNewInstantTime();
- LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
- asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
+ asyncCleanerService = new AsyncCleanerService(writeClient);
asyncCleanerService.start(null);
} else {
LOG.info("Async auto cleaning is not enabled. Not running cleaner now");
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 2ed2536..4108ba4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -281,7 +281,11 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
* checkpoint finish.
*/
public void startAsyncCleaning() {
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ if (this.asyncCleanerService == null) {
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ } else {
+ this.asyncCleanerService.start(null);
+ }
}
/**
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 13154b2..195e430 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -98,4 +98,11 @@ public class CleanFunction<T> extends AbstractRichFunction
public void initializeState(FunctionInitializationContext context) throws Exception {
// no operation
}
+
+ @Override
+ public void close() throws Exception {
+ if (this.writeClient != null) {
+ this.writeClient.close();
+ }
+ }
}