Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2023/03/30 02:23:00 UTC

[jira] [Updated] (HUDI-5822) FileID not found when recovering from a failover for Flink write jobs with bucket index

     [ https://issues.apache.org/jira/browse/HUDI-5822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-5822:
--------------------------------------
    Fix Version/s: 0.12.3

> FileID not found when recovering from a failover for Flink write jobs with bucket index
> ---------------------------------------------------------------------------------------
>
>                 Key: HUDI-5822
>                 URL: https://issues.apache.org/jira/browse/HUDI-5822
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: voon
>            Assignee: voon
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.13.1, 0.12.3, 0.14.0
>
>
> Applying the fix from [https://github.com/apache/hudi/pull/5185] resolves the write issues for MOR tables, but introduces write issues for COW tables.
> h2. Reproducing the error for COW tables
> The stacktrace below is only triggered under a very specific sequence of events.
> {code:java}
> Caused by: java.util.NoSuchElementException: FileID 00000000-ee86-4b41-a704-9e075dd253d8 of partition path age=1 does not exist.
>     at org.apache.hudi.io.HoodieMergeHandle.getLatestBaseFile(HoodieMergeHandle.java:157) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:122) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at org.apache.hudi.io.FlinkMergeHandle.<init>(FlinkMergeHandle.java:70) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at org.apache.hudi.io.FlinkWriteHandleFactory$CommitWriteHandleFactory.createMergeHandle(FlinkWriteHandleFactory.java:173) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at org.apache.hudi.io.FlinkWriteHandleFactory$BaseCommitWriteHandleFactory.create(FlinkWriteHandleFactory.java:115) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:571) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at org.apache.hudi.client.HoodieFlinkWriteClient.access$000(HoodieFlinkWriteClient.java:88) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at org.apache.hudi.client.HoodieFlinkWriteClient$AutoCloseableWriteHandle.<init>(HoodieFlinkWriteClient.java:620) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:152) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$2(StreamWriteFunction.java:205) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$8(StreamWriteFunction.java:492) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[?:1.8.0_232]
>     at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:484) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:152) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.snapshotState(BucketStreamWriteFunction.java:103) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:168) ~[hudi-flink1.13-bundle-0.12.1.jar:0.12.1]
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) ~[flink-dist_2.11-1.13.22.jar:1.13.22]
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) ~[flink-dist_2.11-1.13.22.jar:1.13.22]
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89) ~[flink-dist_2.11-1.13.22.jar:1.13.22]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218) ~[flink-dist_2.11-1.13.22.jar:1.13.22]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169) ~[flink-dist_2.11-1.13.22.jar:1.13.22]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.11-1.13.22.jar:1.13.22]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:776) ~[flink-dist_2.11-1.13.22.jar:1.13.22]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:697) ~[flink-dist_2.11-1.13.22.jar:1.13.22]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:660) ~[flink-dist_2.11-1.13.22.jar:1.13.22]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:344) ~[flink-dist_2.11-1.13.22.jar:1.13.22]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1100) ~[flink-dist_2.11-1.13.22.jar:1.13.22]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.13.22.jar:1.13.22]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1084) ~[flink-dist_2.11-1.13.22.jar:1.13.22]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1040) ~[flink-dist_2.11-1.13.22.jar:1.13.22]{code}
>  
> The preconditions for it to happen are (a minimal sketch of this race follows the list):
>  # A snapshot state has to fail midway while writing/flushing data to a partition
>  # A new fileGroup/fileId must be written to the partition (this fileGroup/fileId must not exist before the ongoing snapshot state that is failing)
>  # The new fileGroup will be rolled back (via its marker file) and hence purged from the partition
>  # The JM must execute the rollback (*StreamWriteOperatorCoordinator#initInstant*) after *BucketStreamWriteFunction#processElement*
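>  
> The ordering in the last two preconditions can be illustrated with a self-contained sketch (plain Java, not Hudi code; the two maps stand in for the file system view and the TM-side bucket-index cache, and all names are illustrative):
> {code:java}
> import java.util.*;
> 
> // Minimal sketch of the race between the TM's bucket-index bootstrap and the
> // JM's rollback. This is NOT Hudi code.
> public class BucketIndexRaceSketch {
>   public static void main(String[] args) {
>     Map<String, Set<String>> fsView = new HashMap<>();      // partition -> live base-file IDs
>     Map<Integer, String> bucketToFileId = new HashMap<>();  // bucket -> cached fileID
> 
>     String partition = "age=1";
>     String fileId = "00000000-ee86-4b41-a704-9e075dd253d8";
> 
>     // 1. The failed snapshot left an uncommitted file group in the partition.
>     fsView.computeIfAbsent(partition, p -> new HashSet<>()).add(fileId);
> 
>     // 2. BucketStreamWriteFunction#processElement bootstraps the index first and
>     //    caches the fileID it sees on storage, including the uncommitted one.
>     bucketToFileId.put(0, fileId);
> 
>     // 3. StreamWriteOperatorCoordinator#initInstant then rolls back the inflight
>     //    instant and purges the new file group from the partition.
>     fsView.get(partition).remove(fileId);
> 
>     // 4. The next upsert reuses the stale cached fileID; the merge-handle lookup
>     //    fails, mirroring HoodieMergeHandle#getLatestBaseFile.
>     String stale = bucketToFileId.get(0);
>     if (!fsView.get(partition).contains(stale)) {
>       throw new NoSuchElementException(
>           "FileID " + stale + " of partition path " + partition + " does not exist.");
>     }
>   }
> }
> {code}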
>  
> In other words, you can reproduce this error by following these steps (an example job definition follows the steps):
>  # Create a write job with bucket index for a COW table
>  # While it is performing the first checkpoint, writing to a new partition, kill the TM
>  # Stop the job manually
>  # Restart the job
>  # If the JM performs a rollback after a TM executes processElement -> bootstrapIndexIfNeed, the error described above will present itself
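>  
> For step 1, a job definition like the following can be used (a hedged sketch: the schema, path, source table, and bucket count are illustrative placeholders; the connector options are the usual Hudi Flink options for a bucket-index COW table):
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> 
> // Illustrative Flink write job with bucket index on a COW table.
> public class BucketIndexCowJob {
>   public static void main(String[] args) {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // Checkpoints drive the Hudi flush; the first checkpoint is the window
>     // in which to kill the TM (step 2).
>     env.enableCheckpointing(30_000L);
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> 
>     // Placeholder upstream source.
>     tEnv.executeSql("CREATE TABLE src (uuid STRING, name STRING, age INT)"
>         + " WITH ('connector' = 'datagen', 'rows-per-second' = '100')");
> 
>     tEnv.executeSql("CREATE TABLE hudi_sink ("
>         + "  uuid STRING PRIMARY KEY NOT ENFORCED,"
>         + "  name STRING,"
>         + "  age INT"
>         + ") PARTITIONED BY (age) WITH ("
>         + "  'connector' = 'hudi',"
>         + "  'path' = 'hdfs://hudi_path',"            // placeholder path, as in the logs
>         + "  'table.type' = 'COPY_ON_WRITE',"
>         + "  'index.type' = 'BUCKET',"
>         + "  'hoodie.bucket.index.num.buckets' = '4'" // illustrative bucket count
>         + ")");
> 
>     tEnv.executeSql("INSERT INTO hudi_sink SELECT uuid, name, age FROM src");
>   }
> }
> {code}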
>  
> The JM performing a rollback for a snapshot state that failed midway through:
> {code:java}
> 2023-02-20 11:36:34,826 INFO  org.apache.hudi.client.BaseHoodieWriteClient                 [] - Begin rollback of instant 20230220112929727
> 2023-02-20 11:36:34,833 INFO  org.apache.hudi.common.table.HoodieTableMetaClient           [] - Loading HoodieTableMetaClient from hdfs://hudi_path
> 2023-02-20 11:36:34,947 INFO  org.apache.hudi.common.table.HoodieTableConfig               [] - Loading table properties from hdfs://hudi_path/.hoodie/hoodie.properties
> 2023-02-20 11:36:34,952 INFO  org.apache.hudi.common.table.HoodieTableMetaClient           [] - Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from hdfs://hudi_path
> 2023-02-20 11:36:34,952 INFO  org.apache.hudi.common.table.HoodieTableMetaClient           [] - Loading Active commit timeline for hdfs://hudi_path
> 2023-02-20 11:36:35,320 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Loaded instants upto : Option{val=[==>20230220112929727__commit__INFLIGHT]}
> 2023-02-20 11:36:35,321 INFO  org.apache.hudi.common.table.view.FileSystemViewManager      [] - Creating View Manager with storage type :REMOTE_FIRST
> 2023-02-20 11:36:35,321 INFO  org.apache.hudi.common.table.view.FileSystemViewManager      [] - Creating remote first table view
> 2023-02-20 11:36:35,323 INFO  org.apache.hudi.client.BaseHoodieWriteClient                 [] - Scheduling Rollback at instant time : 20230220113634829 (exists in active timeline: true), with rollback plan: false
> 2023-02-20 11:36:35,612 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Loaded instants upto : Option{val=[==>20230220113634829__rollback__REQUESTED]}
> 2023-02-20 11:36:35,612 INFO  org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor [] - Requesting Rollback with instant time [==>20230220113634829__rollback__REQUESTED]
> 2023-02-20 11:36:35,620 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Loaded instants upto : Option{val=[==>20230220113634829__rollback__REQUESTED]}
> 2023-02-20 11:36:35,694 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Checking for file exists ?hdfs://hudi_path/.hoodie/20230220113634829.rollback.requested
> 2023-02-20 11:36:35,706 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Create new file for toInstant ?hdfs://hudi_path/.hoodie/20230220113634829.rollback.inflight
> 2023-02-20 11:36:35,709 INFO  org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor [] - Clean out all base files generated for commit: [==>20230220112929727__commit__INFLIGHT]
> 2023-02-20 11:36:35,720 INFO  org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor [] - Time(in ms) taken to finish rollback 11
> 2023-02-20 11:36:35,720 INFO  org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor [] - Rolled back inflight instant 20230220112929727
> 2023-02-20 11:36:35,721 INFO  org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor [] - Index rolled back for commits [==>20230220112929727__commit__INFLIGHT]
> 2023-02-20 11:36:35,725 INFO  org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor [] - Deleting instant=[==>20230220112929727__commit__INFLIGHT]
> 2023-02-20 11:36:35,725 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Deleting instant [==>20230220112929727__commit__INFLIGHT]
> 2023-02-20 11:36:35,728 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Removed instant [==>20230220112929727__commit__INFLIGHT]
> 2023-02-20 11:36:35,728 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Deleting instant [==>20230220112929727__commit__REQUESTED]
> 2023-02-20 11:36:35,731 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Removed instant [==>20230220112929727__commit__REQUESTED]
> 2023-02-20 11:36:35,732 INFO  org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor [] - Deleted pending commit [==>20230220112929727__commit__REQUESTED]
> 2023-02-20 11:36:35,733 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Checking for file exists ?hdfs://hudi_path/.hoodie/20230220113634829.rollback.inflight
> 2023-02-20 11:36:35,786 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Create new file for toInstant ?hdfs://hudi_path/.hoodie/20230220113634829.rollback
> 2023-02-20 11:36:35,786 INFO  org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor [] - Rollback of Commits [20230220112929727] is complete
> 2023-02-20 11:36:35,805 INFO  org.apache.hudi.common.fs.FSUtils                            [] - Removed directory at hdfs://hudi_path/.hoodie/.temp/20230220112929727
> 2023-02-20 11:36:35,806 INFO  org.apache.hudi.metrics.HoodieMetrics                        [] - Sending rollback metrics (duration=973, numFilesDeleted=2)
> 2023-02-20 11:36:35,812 INFO  org.apache.hudi.client.BaseHoodieWriteClient                 [] - Generate a new instant time: 20230220113635812 action: commit
> 2023-02-20 11:36:35,815 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Loaded instants upto : Option{val=[20230220113634829__rollback__COMPLETED]}{code}
>  
> The TM executing processElement before the rollback is complete:
> {code:java}
>  11:36:33,837 INFO  org.apache.hudi.common.table.HoodieTableMetaClient           [] - Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from hdfs://hdfs_path
> 2023-02-20 11:36:33,837 INFO  org.apache.hudi.common.table.HoodieTableMetaClient           [] - Loading Active commit timeline for hdfs://hdfs_path
> 2023-02-20 11:36:33,840 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Loaded instants upto : Option{val=[==>20230220112929727__commit__INFLIGHT]}
> 2023-02-20 11:36:33,841 INFO  org.apache.hudi.client.BaseHoodieClient                      [] - Embedded Timeline Server is disabled. Not starting timeline service
> 2023-02-20 11:36:33,843 INFO  org.apache.hudi.common.table.view.FileSystemViewManager      [] - Creating View Manager with storage type :REMOTE_FIRST
> 2023-02-20 11:36:33,843 INFO  org.apache.hudi.common.table.view.FileSystemViewManager      [] - Creating remote first table view
> 2023-02-20 11:36:33,849 INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView [] - Took 2 ms to read  0 instants, 0 replaced file groups
> 2023-02-20 11:36:33,850 INFO  org.apache.hudi.sink.common.AbstractStreamWriteFunction      [] - Send bootstrap write metadata event to coordinator, task[0].
> 2023-02-20 11:36:33,850 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Loaded instants upto : Option{val=[==>20230220112929727__commit__INFLIGHT]}
> 2023-02-20 11:36:33,853 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - bucket_write: dim_buyer_info_test (1/2)#0 (e2e7a38d69393b8d814ad48544573435_829468138015e9cb689e833f1800885e_0_0) switched from INITIALIZING to RUNNING.
> 2023-02-20 11:36:33,857 INFO  org.apache.hudi.sink.CleanFunction                           [] - Executor executes action [wait for cleaning finish] success!
> 2023-02-20 11:36:33,860 INFO  org.apache.hudi.sink.bucket.BucketStreamWriteFunction        [] - Loading Hoodie Table dim_buyer_info_test, with path hdfs://hdfs_path/age=0
> 2023-02-20 11:36:33,860 INFO  org.apache.hudi.common.table.HoodieTableMetaClient           [] - Loading HoodieTableMetaClient from hdfs://hdfs_path
> 2023-02-20 11:36:33,867 INFO  org.apache.hudi.common.table.HoodieTableConfig               [] - Loading table properties from hdfs://hdfs_path/.hoodie/hoodie.properties
> 2023-02-20 11:36:33,868 INFO  org.apache.hudi.common.util.ClusteringUtils                  [] - Found 0 files in pending clustering operations
> 2023-02-20 11:36:33,868 INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView [] - Building file system view for partition (age=1)
> 2023-02-20 11:36:33,872 INFO  org.apache.hudi.common.table.HoodieTableMetaClient           [] - Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from hdfs://hdfs_path
> 2023-02-20 11:36:33,872 INFO  org.apache.hudi.common.table.HoodieTableMetaClient           [] - Loading Active commit timeline for hdfs://hdfs_path
> 2023-02-20 11:36:33,878 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Loaded instants upto : Option{val=[==>20230220112929727__commit__INFLIGHT]}
> 2023-02-20 11:36:33,879 INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView [] - addFilesToView: NumFiles=2, NumFileGroups=1, FileGroupsCreationTime=6, StoreTimeTaken=1
> 2023-02-20 11:36:33,880 INFO  org.apache.hudi.common.table.view.FileSystemViewManager      [] - Creating View Manager with storage type :REMOTE_FIRST
> 2023-02-20 11:36:33,880 INFO  org.apache.hudi.common.table.view.FileSystemViewManager      [] - Creating remote first table view
> 2023-02-20 11:36:33,880 INFO  org.apache.hudi.sink.bucket.BucketStreamWriteFunction        [] - bootstrapIndexIfNeed with timeline: [[==>20230220112929727__commit__INFLIGHT]]
> 2023-02-20 11:36:33,880 INFO  org.apache.hudi.common.table.HoodieTableMetaClient           [] - Loading HoodieTableMetaClient from hdfs://hdfs_path
> 2023-02-20 11:36:33,880 INFO  org.apache.hudi.sink.bucket.BucketStreamWriteFunction        [] - Should load this partition bucket 0 with fileID 00000000-ee86-4b41-a704-9e075dd253d8
> 2023-02-20 11:36:33,880 INFO  org.apache.hudi.sink.bucket.BucketStreamWriteFunction        [] - Adding fileID 00000000-ee86-4b41-a704-9e075dd253d8 to the bucket 0 of partition age=1. {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)