You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/02/14 10:15:28 UTC
[hudi] branch master updated: [HUDI-4406] Support Flink compaction/clustering write error resolution to avoid data loss (#6121)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ed6b7f6aedc [HUDI-4406] Support Flink compaction/clustering write error resolution to avoid data loss (#6121)
ed6b7f6aedc is described below
commit ed6b7f6aedc2cba0f753a4ee130cef860ecb0801
Author: Chenshizhi <10...@users.noreply.github.com>
AuthorDate: Tue Feb 14 18:15:18 2023 +0800
[HUDI-4406] Support Flink compaction/clustering write error resolution to avoid data loss (#6121)
---
.../main/java/org/apache/hudi/configuration/FlinkOptions.java | 6 +++---
.../org/apache/hudi/sink/clustering/ClusteringCommitSink.java | 11 +++++++++++
.../org/apache/hudi/sink/compact/CompactionCommitSink.java | 11 +++++++++++
3 files changed, 25 insertions(+), 3 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index e447692fc98..9cdeb963d53 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -382,9 +382,9 @@ public class FlinkOptions extends HoodieConfig {
.key("write.ignore.failed")
.booleanType()
.defaultValue(false)
- .withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch.\n"
- + "By default false. Turning this on, could hide the write status errors while the spark checkpoint moves ahead. \n"
- + " So, would recommend users to use this with caution.");
+ .withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch. \n"
+ + "By default false. Turning this on, could hide the write status errors while the flink checkpoint moves ahead. \n"
+ + "So, would recommend users to use this with caution.");
public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
.key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index eb567d89f18..3f392de1527 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -147,6 +147,17 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
.flatMap(Collection::stream)
.collect(Collectors.toList());
+ long numErrorRecords = statuses.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+
+ if (numErrorRecords > 0 && !this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
+ // handle failure case
+ LOG.error("Got {} error records during clustering of instant {},\n"
+ + "option '{}' is configured as false,"
+ + "rolls back the clustering", numErrorRecords, instant, FlinkOptions.IGNORE_FAILED.key());
+ ClusteringUtil.rollbackClustering(table, writeClient, instant);
+ return;
+ }
+
HoodieWriteMetadata<List<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
writeMetadata.setWriteStatuses(statuses);
writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 1e05dce6076..0e9bc54f8fb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -157,6 +157,17 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
.flatMap(Collection::stream)
.collect(Collectors.toList());
+ long numErrorRecords = statuses.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+
+ if (numErrorRecords > 0 && !this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
+ // handle failure case
+ LOG.error("Got {} error records during compaction of instant {},\n"
+ + "option '{}' is configured as false,"
+ + "rolls back the compaction", numErrorRecords, instant, FlinkOptions.IGNORE_FAILED.key());
+ CompactionUtil.rollbackCompaction(table, instant);
+ return;
+ }
+
HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(
table, instant, HoodieListData.eager(statuses), writeClient.getConfig().getSchema());