You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "Raymond Xu (Jira)" <ji...@apache.org> on 2022/09/01 01:24:00 UTC

[jira] [Commented] (HUDI-4741) Deadlock when restarting failed TM in AbstractStreamWriteFunction

    [ https://issues.apache.org/jira/browse/HUDI-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598686#comment-17598686 ] 

Raymond Xu commented on HUDI-4741:
----------------------------------

[~danny0405] [~yuzhaojing] any of you want to take this?

> Deadlock when restarting failed TM in AbstractStreamWriteFunction
> -----------------------------------------------------------------
>
>                 Key: HUDI-4741
>                 URL: https://issues.apache.org/jira/browse/HUDI-4741
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>            Reporter: voon
>            Priority: Major
>
> h1. Summary of Events
>  # TM heartbeat not sent to JM (Can be triggered by killing a container), JM kills the TM/container
>  # JM restarts the container, but the restarting code is not handled properly, causing there to be a deadlock
>  # Deadlock causes instantToWrite() to loop for 10 minutes (default Flink checkpoint timeout), causing a instant initialization timeout error
>  # JM is restarted
>  # JM restore state from previously successful checkpoint
>  # Ckp metadata path is tainted with multiple {_}INFLIGHT{_}s
>  # Synchronisation issue occurs if TM executes *`lastPendingInstant()`* before JM executes *`startInstant()`*
>  # Single commit multi instant issue occurs
>  # When tainted TM reads obtained the wrong _INFLIGHT_ instant, it will start a new write cycle with the correct _INFLIGHT_ instant while a checkpoint is being performed.
>  # *`reconcileAgainstMarkers()`* will delete files that tainted TMs are writing to with the correct _INFLIGHT_ instant in the next cycle, causing FileNotFoundException, _COLUMN_ state errors and parquet corruption errors.
>  
> h1. Code for reproducing
> h2. Flink SQL Code
> {code:java}
> CREATE TABLE input_table (
>     `val`               STRING
>     ,`event_time`       TIMESTAMP(3)
>     ,`partition`        BIGINT
>     ,`offset`           BIGINT
> ) WITH (
>     'connector' = 'datagen',
>     'fields.val.length' = '99999',
>     'rows-per-second' = '15000'
> );CREATE TABLE test_hudi
> (
>     `val`                 STRING
>     ,`event_time`       TIMESTAMP(3)
>     ,`partition`        BIGINT
>     ,`offset`           BIGINT
>     ,`dt`               STRING
>     ,`hh`               STRING
> ) PARTITIONED BY (dt, hh)
> WITH (
>     'connector' = 'hudi',
>     'path' = 'hdfs://jm_tm_sync_error/',
>     'table.type' = 'COPY_ON_WRITE',
>     'write.operation' = 'insert',
>     'hoodie.parquet.small.file.limit' = '104857600',
>     'hoodie.parquet.max.file.size' = '268435456',
>     'hoodie.datasource.write.recordkey.field' = 'partition,offset',
>     'hoodie.datasource.write.hive_style_partitioning' = 'true',
>     'hoodie.datasource.write.partitionpath.field' = 'dt,hh',
>     'write.bulk_insert.sort_input' = 'false',
>     'index.bootstrap.enabled' = 'false',
>     'index.state.ttl' = '60',
>     'index.type' = 'FLINK_STATE',
>     'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
>     'write.tasks' = '8',
>     'hive_sync.enable' = 'false'
> );insert into test_hudi
> select  `val`
>         ,`event_time`
>         ,`partition`
>         ,`offset`
>         ,DATE_FORMAT(event_time, 'yyyy-MM-dd')
>         ,DATE_FORMAT(event_time, 'HH')
>  from input_table; {code}
>  
> h2. Advanced Properties
> {code:java}
> execution.checkpointing.interval=60000ms {code}
>  
> h2. Job Profile Properties
> {code:java}
> flink.version=1.13.14
> default.parallelism=8
> restart.from.savepoint=true
> sql.job.mode=normal
> running.mode=streaming
> slots.per.tm=2
> cpu.per.tm=2vcore
> memory.per.tm=6G
> jvm.heap.ratio=70% {code}
>  
>  
> h1. Issues
> There are 2 main issues here:
>  # *TM failing + starting a TM in a new container causing deadlock*
>  # *Single commit multi-instant causing various base file parquet errors*
>  ** java.io.FileNotFoundException: File does not exist: 20220712165157207.parquet (inode 1234567890) Holder DFSClient_NONMAPREDUCE_1111111111_22 does not have any open files.
>  ** java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: COLUMN
>  ** java.lang.RuntimeException: 20220805210423582.parquet is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [1, 0, -38, 2] 
>  
> h1. TM failing + starting a TM in a new container causing deadlock 
>  # When a TM fails + starting and restoring a TM in a new container creates a deadlock situation
>  ** TM is waiting for JM to create a new _INFLIGHT_ instant, and the
>  ** JM is waiting for TM to send a success WriteMetadataEvent
>  # The deadlock above will cause either of the errors below:
>  ** org.apache.hudi.exception.HoodieException: Timeout(601000ms) while waiting for instant initialize
>  ** org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
>  # This will trigger org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
>  # JM will try to restore itself from the last successful checkpoint
>  # This will cause the next issue (illustrated below)
> h2. Root cause
> When restoring the TM, *`AbstractStreamWriteFunction#initializeState()`* will attempt to restore the state of the TM. At this stage, *`this.currentInstant`* will be initialized by invoking {*}`lastPendingInstant()`{*}, in which the ckp metadata path will be loaded and a _INFLIGHT_ instant is returned.
>  
> When invoking {*}`instantToWrite()`{*}, *`instant.equals(this.currentInstant)`* will always be true as the local *`instant`* is equal to {*}`this.currentInstant`{*}. Hence, the current implementation will be stuck in an infinite loop as {*}`lastPendingInstant()`{*}, which governs both *`instant`* and *`this.currentInstant`* will always return the same value as the state of the ckp metadata path is never changed. 
>  
> This is so as JM is waiting for the TM to finish writing for the batch for the _INFLIGHT_ instant. At the same time TM is waiting for JM to create a new _INFLIGHT_ instant, hence the deadlock. 
>  
> The fix is to add additional logic to handle such a case to ensure that a TM can obtain a correct _INFLIGHT_ instant when being recovering.
>  
> h1. Single commit multi-instant
>  # JM restore state from previously successful checkpoint
>  # Ckp metadata path is tainted with multiple {_}INFLIGHT{_}s
>  # Synchronisation issue occurs if TM executes *`lastPendingInstant()`* before JM executes *`startInstant()`*
>  # Single commit multi instant issue occurs
>  # When tainted TM reads obtained the wrong _INFLIGHT_ instant, it will start a new write cycle with the correct _INFLIGHT_ instant while a checkpoint is being performed.
>  # *`reconcileAgainstMarkers()`* will delete files that tainted TMs are writing to with the correct _INFLIGHT_ instant in the next cycle, causing FileNotFoundException, _COLUMN_ state errors and parquet corruption errors.
>  
> This is caused by the synchronisation issue due to Task Manager (TM) running *`ckpMetaData#lastPendingInstant()`* before the Job Manager (JM) executes {*}`StreamWriteOperatorCoordinator#startInstant()`{*}, causing the TM to fetch an old instantTime. 
>  
> Since _ABORTED_ statuses are not written to the metadata path, TM will fetch a previously aborted instant, thinking that it is still {_}INFLIGHT{_}.
>  
> Hence, the new commit will have files of 2 instants should there be a restart AND the timeline contains a _INCOMPLETE_ instant ({_}ABORTED{_}/{_}INFLIGHT{_}).
>  
> Please refer to the example below if a RESTART + job restore is triggered, causing a tainted ckp metadata path to be produced. A tainted ckp metadata path is a path in which there are more than 1 INFLIGHT files.
>  
> h2. CORRECT Instant fetched after JM restores from checkpoint
> After the job restarts, commit *`20220828165937711`* is created, and has successfully completed.
>  
> If TM invokes *`ckpMetaData#lastPendingInstant()`* AFTER JM runs {*}`StreamWriteOperatorCoordinator#startInstant()`{*}, it will read the NEW instant on the .aux hdfs path.
>  
> *`StreamWriteOperatorCoordinator#startInstant()`* will create the instant {*}`20220828170053510`{*}, causing there to be 2 _INFLIGHT_ commits. Since the most recent _INCOMPLETE_ instant is fetched, the correct instant, which is *`20220828170053510`* will be returned.
>  
> The .aux hdfs path will look like this when *`ckpMetaData#lastPendingInstant()`* is invoked.
>  
> {code:java}
> [
>   Ckp{instant='20220828164426755', state='INFLIGHT'}, 
>   Ckp{instant='20220828165937711', state='COMPLETED'}, 
>   Ckp{instant='20220828170053510', state='INFLIGHT'}
> ] {code}
>  
>  
> h2. INCORRECT Instant fetched after JM restores from checkpoint
> If the TM invokes *`ckpMetaData#lastPendingInstant()`* BEFORE JM runs {*}`StreamWriteOperatorCoordinator#startInstant()`{*}, it will read the OLD instant on the .aux hdfs path.
> The .aux hdfs path will look like this when *`ckpMetaData#lastPendingInstant()`* is invoked.
>  
> {code:java}
> [
>   Ckp{instant='20220828164123688', state='COMPLETED'}, 
>   Ckp{instant='20220828164426755', state='INFLIGHT'}, 
>   Ckp{instant='20220828165937711', state='COMPLETED'}
> ] {code}
>  
> Since the new instant file has not been created yet, the most _INCOMPLETE_ is fetched, which is {*}`20220828164426755`{*}.
> In such a case, when there is a possibility that different TMs can obtain different instants time from {*}`ckpMetaData#lastPendingInstant()`{*}, a single commit might contain multiple instants.
>  
> h2. FileNotFoundException and various parquet corruption errors
> Building upon the example in {_}Single commit multiple instant error{_}, when *`AppendWriteFunction#flushData()`* is invoked, the current *`writerHelper`* will be cleaned up by closing all existing file handles. The *`writerHelper`* will then be set to {_}NULL{_}. 
> At this point, Hudi is performing a checkpoint and is about to write to Hudi's timeline by creating {*}`{*}.commit`* file with the commit's metadata.
> Suppose that a TM (Let's call this the {*}tainted TM{*}) is using the an old instant in the tainted ckp metadata path as such: ({*}`20220828164426755`{*} will be used)
> {code:java}
> [
>   Ckp{instant='20220828164123688', state='COMPLETED'}, 
>   Ckp{instant='20220828164426755', state='INFLIGHT'}, 
>   Ckp{instant='20220828165937711', state='COMPLETED'},
>   Ckp{instant='20220828170053510', state='INFLIGHT'}
> ] {code}
>  
> Once *`AppendWriteFunction#flushData()`* has completed, on the next invocation of {*}`AppendWriteFunction#processElement()`{*}, *`AppendWriteFunction#initWriterHelper()`* will be invoked, causing a new writerHelper with the instant *`20220828170053510`* to be created, and writing will begin to files of this instant.
>  
> While all this is happening, Flink is performing a checkpoint and is writing to Hudi's timeline by creating a {*}`{*}.commit`* file with the commit's metadata.
> Before it can create and write to the {*}`{*}.commit`{*}, *`HoodieTable#finalizeWrite()`{*} will be invoked. 
>  
> Subsequently, *`HoodieTable#reconcileAgainstMarkers()`* is invoked. What this method does is to cleanup partially written data-files due to failed (but succeeded on retry) tasks. (SUPPOSEDLY)
> Since the *tainted TM* is using the instant that is being committed to the Hudi timeline, the files that it are currently being written to will be cleaned up as the marker files can be found, but data files are not found in the {*}`HoodieWriteStat`{*}. 
>  
> In essence, the *tainted TM* is starting a write cycle when it should not be doing so with an instant that is being committed Hudi timeline, hence, causing files to be deleted/corrupted/being in the wrong state.
>  
> Hence, when the JM is performing a checkpoint + commit, the *tainted TM* might try to close the file handle if it has reached the {*}`parquet.max.filesize`{*}. When trying to close the file handle, this file might have already been deleted by {*}`HoodieTable#reconcileAgainstMarkers()`{*}, causing the {*}`java.io.FileNotFoundException`{*}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)