You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/09/13 12:44:00 UTC

[hudi] branch master updated: [HUDI-2421] Catch the throwable when scheduling the cleaning task for flink writer (#3650)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 89651c9  [HUDI-2421] Catch the throwable when scheduling the cleaning task for flink writer (#3650)
89651c9 is described below

commit 89651c94085f3f775328e5fbc2113aa9d1a6a962
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Mon Sep 13 20:43:44 2021 +0800

    [HUDI-2421] Catch the throwable when scheduling the cleaning task for flink writer (#3650)
---
 hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 1ca593f..e75fad5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -81,8 +81,13 @@ public class CleanFunction<T> extends AbstractRichFunction
   @Override
   public void snapshotState(FunctionSnapshotContext context) throws Exception {
     if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
-      this.writeClient.startAsyncCleaning();
-      this.isCleaning = true;
+      try {
+        this.writeClient.startAsyncCleaning();
+        this.isCleaning = true;
+      } catch (Throwable throwable) {
+        // catch the exception to not affect the normal checkpointing
+        LOG.warn("Error while start async cleaning", throwable);
+      }
     }
   }