Posted to commits@hudi.apache.org by da...@apache.org on 2023/02/14 10:15:28 UTC

[hudi] branch master updated: [HUDI-4406] Support Flink compaction/clustering write error resolution to avoid data loss (#6121)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ed6b7f6aedc [HUDI-4406] Support Flink compaction/clustering write error resolution to avoid data loss (#6121)
ed6b7f6aedc is described below

commit ed6b7f6aedc2cba0f753a4ee130cef860ecb0801
Author: Chenshizhi <10...@users.noreply.github.com>
AuthorDate: Tue Feb 14 18:15:18 2023 +0800

    [HUDI-4406] Support Flink compaction/clustering write error resolution to avoid data loss (#6121)
---
 .../main/java/org/apache/hudi/configuration/FlinkOptions.java |  6 +++---
 .../org/apache/hudi/sink/clustering/ClusteringCommitSink.java | 11 +++++++++++
 .../org/apache/hudi/sink/compact/CompactionCommitSink.java    | 11 +++++++++++
 3 files changed, 25 insertions(+), 3 deletions(-)
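
For readers evaluating the change: after this commit, error records surfaced during Flink compaction or clustering trigger a rollback unless 'write.ignore.failed' is enabled. A minimal sketch (not part of this commit; the table path is hypothetical) of keeping the safe default when building a Flink job configuration:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class IgnoreFailedExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hypothetical table path, for illustration only.
        conf.set(FlinkOptions.PATH, "file:///tmp/hudi_table");
        // Keep the default (false): write status errors during compaction or
        // clustering then roll the operation back instead of committing.
        conf.set(FlinkOptions.IGNORE_FAILED, false);
      }
    }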

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index e447692fc98..9cdeb963d53 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -382,9 +382,9 @@ public class FlinkOptions extends HoodieConfig {
       .key("write.ignore.failed")
       .booleanType()
       .defaultValue(false)
-      .withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch.\n"
-          + "By default false.  Turning this on, could hide the write status errors while the spark checkpoint moves ahead. \n"
-          + "  So, would recommend users to use this with caution.");
+      .withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch. \n"
+          + "By default false. Turning this on, could hide the write status errors while the flink checkpoint moves ahead. \n"
+          + "So, would recommend users to use this with caution.");
 
   public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
       .key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index eb567d89f18..3f392de1527 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -147,6 +147,17 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
 
+    long numErrorRecords = statuses.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+
+    if (numErrorRecords > 0 && !this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
+      // handle the failure case: abort the commit and roll the clustering back
+      LOG.error("Got {} error records during clustering of instant {},\n"
+          + "option '{}' is configured as false, "
+          + "rolling back the clustering", numErrorRecords, instant, FlinkOptions.IGNORE_FAILED.key());
+      ClusteringUtil.rollbackClustering(table, writeClient, instant);
+      return;
+    }
+
     HoodieWriteMetadata<List<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
     writeMetadata.setWriteStatuses(statuses);
     writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
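
The error-count aggregation in the hunk above, extracted into a standalone sketch (WriteStatus is stubbed out as plain longs); reduce(Long::sum).orElse(0L) yields 0 for an empty status list, equivalent to mapToLong(...).sum():

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class ErrorCountSketch {
      static long totalErrors(List<Long> perStatusErrorCounts) {
        return perStatusErrorCounts.stream().reduce(Long::sum).orElse(0L);
      }

      public static void main(String[] args) {
        System.out.println(totalErrors(Arrays.asList(0L, 2L, 1L))); // 3
        System.out.println(totalErrors(Collections.emptyList()));   // 0
      }
    }
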
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 1e05dce6076..0e9bc54f8fb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -157,6 +157,17 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
 
+    long numErrorRecords = statuses.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+
+    if (numErrorRecords > 0 && !this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
+      // handle the failure case: abort the commit and roll the compaction back
+      LOG.error("Got {} error records during compaction of instant {},\n"
+          + "option '{}' is configured as false, "
+          + "rolling back the compaction", numErrorRecords, instant, FlinkOptions.IGNORE_FAILED.key());
+      CompactionUtil.rollbackCompaction(table, instant);
+      return;
+    }
+
     HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(
         table, instant, HoodieListData.eager(statuses), writeClient.getConfig().getSchema());
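
Both sinks apply the same guard before committing. A condensed, self-contained sketch of the pattern (Hudi types stubbed out; a Runnable stands in for ClusteringUtil.rollbackClustering / CompactionUtil.rollbackCompaction):

    import java.util.Arrays;
    import java.util.List;

    class CommitGuardSketch {
      /** Returns true when the commit may proceed; otherwise runs the rollback. */
      static boolean commitOrRollback(List<Long> errorCounts, boolean ignoreFailed, Runnable rollback) {
        long numErrorRecords = errorCounts.stream().mapToLong(Long::longValue).sum();
        if (numErrorRecords > 0 && !ignoreFailed) {
          rollback.run(); // roll back instead of committing partial results
          return false;
        }
        return true;
      }

      public static void main(String[] args) {
        boolean committed = commitOrRollback(Arrays.asList(0L, 3L), false,
            () -> System.out.println("rolled back"));
        System.out.println("committed = " + committed); // committed = false
      }
    }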