Posted to commits@hudi.apache.org by "Danny Chen (Jira)" <ji...@apache.org> on 2022/10/30 00:23:00 UTC

[jira] [Resolved] (HUDI-4741) Deadlock when restarting failed TM in AbstractStreamWriteFunction

     [ https://issues.apache.org/jira/browse/HUDI-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Danny Chen resolved HUDI-4741.
------------------------------

> Deadlock when restarting failed TM in AbstractStreamWriteFunction
> -----------------------------------------------------------------
>
>                 Key: HUDI-4741
>                 URL: https://issues.apache.org/jira/browse/HUDI-4741
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>            Reporter: voon
>            Assignee: voon
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.13.0
>
>         Attachments: coordinator_operator_review-old operator.png
>
>
> h1. Summary of Events
>  # The TM stops sending heartbeats to the JM (this can be triggered by killing a container), so the JM kills the TM/container
>  # The JM restarts the container, but the restart is not handled properly, which results in a deadlock
>  # The deadlock causes instantToWrite() to loop for 10 minutes (the default Flink checkpoint timeout), which ends in an instant initialization timeout error
>  # The JM is restarted
>  # The JM restores its state from the last successful checkpoint
>  # The issue in HUDI-4907 occurs
>  
> h1. Code for reproducing
> h2. Flink SQL Code
> {code:sql}
> CREATE TABLE input_table (
>     `val`               STRING
>     ,`event_time`       TIMESTAMP(3)
>     ,`partition`        BIGINT
>     ,`offset`           BIGINT
> ) WITH (
>     'connector' = 'datagen',
>     'fields.val.length' = '99999',
>     'rows-per-second' = '15000'
> );
>
> CREATE TABLE test_hudi
> (
>     `val`               STRING
>     ,`event_time`       TIMESTAMP(3)
>     ,`partition`        BIGINT
>     ,`offset`           BIGINT
>     ,`dt`               STRING
>     ,`hh`               STRING
> ) PARTITIONED BY (dt, hh)
> WITH (
>     'connector' = 'hudi',
>     'path' = 'hdfs://jm_tm_sync_error/',
>     'table.type' = 'COPY_ON_WRITE',
>     'write.operation' = 'insert',
>     'hoodie.parquet.small.file.limit' = '104857600',
>     'hoodie.parquet.max.file.size' = '268435456',
>     'hoodie.datasource.write.recordkey.field' = 'partition,offset',
>     'hoodie.datasource.write.hive_style_partitioning' = 'true',
>     'hoodie.datasource.write.partitionpath.field' = 'dt,hh',
>     'write.bulk_insert.sort_input' = 'false',
>     'index.bootstrap.enabled' = 'false',
>     'index.state.ttl' = '60',
>     'index.type' = 'FLINK_STATE',
>     'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
>     'write.tasks' = '8',
>     'hive_sync.enable' = 'false'
> );
>
> INSERT INTO test_hudi
> SELECT  `val`
>         ,`event_time`
>         ,`partition`
>         ,`offset`
>         ,DATE_FORMAT(event_time, 'yyyy-MM-dd')
>         ,DATE_FORMAT(event_time, 'HH')
> FROM input_table; {code}
>  
> h2. Advanced Properties
> {code:java}
> execution.checkpointing.interval=60000ms {code}
>  
> h2. Job Profile Properties
> {code:java}
> flink.version=1.13.14
> default.parallelism=8
> restart.from.savepoint=true
> sql.job.mode=normal
> running.mode=streaming
> slots.per.tm=2
> cpu.per.tm=2vcore
> memory.per.tm=6G
> jvm.heap.ratio=70% {code}
>  
>  
> h1. Issue: a failed TM restarted in a new container causes a deadlock
>  # When a TM fails, starting and restoring a replacement TM in a new container creates a deadlock:
>  ** The TM is waiting for the JM to create a new _INFLIGHT_ instant, and
>  ** The JM is waiting for the TM to send a successful WriteMetadataEvent
>  # The deadlock causes one of the errors below:
>  ** org.apache.hudi.exception.HoodieException: Timeout(601000ms) while waiting for instant initialize
>  ** org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
>  # This triggers org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
>  # The JM tries to restore itself from the last successful checkpoint
>  # This leads to HUDI-4907
> h2. Root cause
> When the TM is restored, *`AbstractStreamWriteFunction#initializeState()`* attempts to restore the state of the TM. At this stage, *`this.currentInstant`* is initialized by invoking {*}`lastPendingInstant()`{*}, which loads the ckp metadata path and returns an _INFLIGHT_ instant.
>  
> When {*}`instantToWrite()`{*} is then invoked, *`instant.equals(this.currentInstant)`* is always true, because the local *`instant`* is obtained from the same {*}`lastPendingInstant()`{*} call. Since the state of the ckp metadata path never changes, {*}`lastPendingInstant()`{*} keeps returning the same value for both *`instant`* and {*}`this.currentInstant`{*}, so the current implementation stays stuck in the loop until the timeout fires.
>  
> The state never changes because the JM is waiting for the TM to finish writing the batch for the current _INFLIGHT_ instant, while the TM is waiting for the JM to create a new _INFLIGHT_ instant; hence the deadlock.
>  
> The short-term fix is to enforce a global failover every time there is a failure.
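
The polling loop described under "Root cause" can be illustrated with a small standalone sketch. This is not the Hudi implementation: the class InstantToWriteSketch, the method signature, the Supplier standing in for lastPendingInstant(), the instant strings and the short timeouts are all assumptions made for illustration. Only the loop condition and the wording of the timeout message are modeled on the description and the error quoted above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class InstantToWriteSketch {

  /**
   * Hypothetical stand-in for the polling loop: keep asking for the last
   * pending instant until it differs from currentInstant, or give up once
   * timeoutMs has elapsed.
   */
  static String instantToWrite(String currentInstant,
                               Supplier<String> lastPendingInstant,
                               long timeoutMs,
                               long pollMs) {
    long waitedMs = 0;
    String instant = lastPendingInstant.get();
    while (instant == null || instant.equals(currentInstant)) {
      if (waitedMs >= timeoutMs) {
        // Message wording modeled on the HoodieException quoted in the issue.
        throw new IllegalStateException(
            "Timeout(" + waitedMs + "ms) while waiting for instant initialize");
      }
      try {
        Thread.sleep(pollMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for instant", e);
      }
      waitedMs += pollMs;
      instant = lastPendingInstant.get();
    }
    return instant;
  }

  public static void main(String[] args) {
    // Restored TM: currentInstant was read from the same source the loop
    // polls, and that source never changes, so the loop can only time out.
    Supplier<String> unchanged = () -> "20221030002300"; // made-up instant time
    String restoredInstant = unchanged.get();
    try {
      instantToWrite(restoredInstant, unchanged, 50, 10);
    } catch (IllegalStateException e) {
      System.out.println("stuck: " + e.getMessage());
    }

    // Normal case: the coordinator starts a new instant after a few polls,
    // so the loop exits with the new value.
    AtomicInteger polls = new AtomicInteger();
    Supplier<String> advancing = () -> polls.incrementAndGet() < 3 ? "001" : "002";
    System.out.println("next instant: " + instantToWrite("001", advancing, 1000, 10));
  }
}
```

In the first case the only exit from the loop is the timeout, which corresponds to the TM side of the deadlock; the JM side (waiting for the WriteMetadataEvent) is not modeled here.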



--
This message was sent by Atlassian Jira
(v8.20.10#820010)