Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2023/03/30 02:18:00 UTC

[jira] [Updated] (HUDI-4406) Support compaction commit write error resolution to avoid data loss

     [ https://issues.apache.org/jira/browse/HUDI-4406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-4406:
--------------------------------------
    Fix Version/s: 0.12.3

> Support compaction commit write error resolution to avoid data loss
> -------------------------------------------------------------------
>
>                 Key: HUDI-4406
>                 URL: https://issues.apache.org/jira/browse/HUDI-4406
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>    Affects Versions: 0.12.0
>            Reporter: Shizhi Chen
>            Assignee: Shizhi Chen
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.13.1, 0.12.3, 0.14.0
>
>
> Currently, the CompactionCommitSink commit/rollback logic does not take WriteStatus errors into consideration (it only checks for a null WriteStatus), which can cause data loss when compacting the delta commit log files into new versioned data files.
> e.g. org.apache.hudi.io.HoodieMergeHandle#writeRecord can lose records from the log files when an exception occurs, because the failure is only recorded in the WriteStatus:
> {code:java}
>   protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
>     Option recordMetadata = hoodieRecord.getData().getMetadata();
>     if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
>       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
>           + hoodieRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
>       writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata);
>       return false;
>     }
>     try {
>       if (indexedRecord.isPresent() && !isDelete) {
>         writeToFile(hoodieRecord.getKey(), (GenericRecord) indexedRecord.get(), preserveMetadata && useWriterSchemaForCompaction);
>         recordsWritten++;
>       } else {
>         recordsDeleted++;
>       }
>       writeStatus.markSuccess(hoodieRecord, recordMetadata);
>       // Deflate record payload after recording success. This will help users
>       // access the payload as part of marking the record successful.
>       hoodieRecord.deflate();
>       return true;
>     } catch (Exception e) {
>       LOG.error("Error writing record  " + hoodieRecord, e);
>       writeStatus.markFailure(hoodieRecord, e, recordMetadata);
>     }
>     return false;
>   }{code}
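> The failure above is only recorded inside the WriteStatus; if the commit path checks nothing beyond null statuses, the compaction still commits and those records are gone. Below is a minimal sketch of the missing check (illustrative only, not the actual patch; the class and method names are hypothetical, and it assumes hudi-client's org.apache.hudi.client.WriteStatus is on the classpath):
> {code:java}
> import java.util.List;
>
> import org.apache.hudi.client.WriteStatus;
>
> // Hypothetical guard, not the actual Hudi implementation: inspect every
> // WriteStatus for per-record failures, not just nullness.
> public class CompactionCommitGuard {
>   // Returns true when any status is missing or carries failed records,
>   // in which case the compaction instant should be rolled back so the
>   // delta log files remain the source of truth and no data is lost.
>   public static boolean shouldRollback(List<WriteStatus> statuses) {
>     return statuses == null
>         || statuses.stream().anyMatch(s -> s == null || s.hasErrors());
>   }
> }
> {code}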
> Note that StreamWriteOperatorCoordinator already has a comparable commit/rollback handling process.
> So this PR will:
> a) Treat a WriteStatus error as a rollback reason for CompactionCommitSink compaction rollback, to avoid data loss.
> b) Unify the handling procedure for the write commit policy and its implementations, as described in org.apache.hudi.commit.policy.WriteCommitPolicy, consolidating it with that of StreamWriteOperatorCoordinator.
> c) Control whether data quality or ingestion stability takes priority through FlinkOptions#IGNORE_FAILED; a sketch of this gating follows the list.
> We suggest that FlinkOptions#IGNORE_FAILED default to false to avoid data loss.
> d) Fix some minor bugs in the log traces emitted when committing on error or rolling back.
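> As a rough illustration of item c), the commit decision could be gated on FlinkOptions#IGNORE_FAILED along the following lines (the class and method names here are hypothetical, not the actual WriteCommitPolicy contract):
> {code:java}
> import java.util.List;
>
> import org.apache.hudi.client.WriteStatus;
>
> // Hypothetical policy sketch: commit clean batches; on errors, either
> // tolerate them (ingestion stability) or roll back (data quality),
> // depending on the value mirrored from FlinkOptions#IGNORE_FAILED.
> public abstract class ErrorAwareCommitPolicy {
>   private final boolean ignoreFailed;
>
>   protected ErrorAwareCommitPolicy(boolean ignoreFailed) {
>     this.ignoreFailed = ignoreFailed;
>   }
>
>   public void handle(List<WriteStatus> statuses) {
>     boolean hasErrors = statuses.stream().anyMatch(WriteStatus::hasErrors);
>     if (!hasErrors || ignoreFailed) {
>       commit(statuses);  // proceed; errors (if any) were explicitly tolerated
>     } else {
>       rollback();        // keep the log files intact; nothing is lost
>     }
>   }
>
>   protected abstract void commit(List<WriteStatus> statuses);
>
>   protected abstract void rollback();
> }
> {code}
> With ignoreFailed left at the suggested default of false, a batch containing any failed record triggers a rollback rather than a partial commit.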
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)