Posted to dev@hudi.apache.org by Александр Трушев <tr...@gmail.com> on 2022/06/14 09:42:46 UTC

HoodieTable removes data file right before the end of Flink job

Hello everyone, I found some strange behavior of a Flink job and would like to
share it with you.

*How to reproduce:*
1) Assume the following Flink SQL:

create table t1 (uuid string, name string) with (
  'connector' = 'hudi',
  'write.batch.size' = '0.0000000001' -- trigger flush after each tuple
);
insert into t1
  select cast(uuid as string), cast(name as string)
  from (values ('id1', 'Julian')) as v(uuid, name);


2) Then try to read:

select * from t1;

result: empty

Do you think this is the expected result?
Even though checkpoints are disabled, I expected to see the written tuple
(id1, Julian).

*Details:*

While the insertion is running, the data file with the tuple (id1, Julian) exists.
However, when the job completes, the data file is removed and the log says:
org.apache.hudi.table.HoodieTable  - Removing duplicate data files created
due to spark retries before committing...

The root of the problem is in
org.apache.hudi.sink.StreamWriteOperatorCoordinator#handleEventFromOperator
(simplified listing):

void handleEventFromOperator(event) {
  if (event.isEndInput()) {
    // handled inline on the coordinator (operator) thread
    handleEndInputEvent(event);
  } else {
    // offloaded to the coordinator's single-thread executor
    executor.execute(() -> handleWriteMetaEvent(event));
  }
}

The following interleaving of operations is possible:
1) OperatorThread: executor.execute(() -> handleWriteMetaEvent(WriteEvent(id1, Julian)))
2) OperatorThread: handleEndInputEvent(EndInputEvent)  // WriteEvent(id1, Julian) has not been processed by the ExecutorThread yet
3) OperatorThread: HoodieTable.finalizeWrite()  // removes the data file, since its write metadata has not been registered yet
4) ExecutorThread: handleWriteMetaEvent(WriteEvent(id1, Julian))

Here is an example [1] where HoodieTable.finalizeWrite() happened before
handleWriteMetaEvent(WriteEvent(id1, Julian)).
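
To make the interleaving concrete, here is a small standalone Java sketch
(not Hudi code; class and message names are made up) that mimics the two
threads: the write metadata event is queued on a single-thread executor,
while the end-input handling runs inline on the calling thread and can
therefore finish first:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EndInputRaceDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    // 1) OperatorThread hands the write metadata event to the executor ...
    executor.execute(() -> {
      sleepQuietly(100); // simulate the executor picking the task up late
      System.out.println("ExecutorThread: handleWriteMetaEvent(WriteEvent(id1, Julian))");
    });

    // 2-3) ... but the end-input event is handled inline on the OperatorThread,
    // so the finalize step runs before the write metadata has been recorded.
    System.out.println("OperatorThread: handleEndInputEvent -> finalizeWrite() removes the data file");

    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.SECONDS);
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}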

I believe this is a bug introduced by the changes [2] in
org.apache.hudi.sink.StreamWriteOperatorCoordinator#handleEventFromOperator,
which made handleEndInputEvent(event) run on the OperatorThread instead
of the ExecutorThread.

*Possible solution:*

If I'm right that this behavior is unexpected, I'd like to open a ticket and a PR
with a fix along these lines:

void handleEventFromOperator(event) {
  if (event.isEndInput()) {
    executor.waitAllTasksCompleted() // <-------------------
    handleEndInputEvent(event);
  } else {
    executor.execute(() -> handleWriteMetaEvent(event));
  }
}
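
For illustration only, here is one way such a barrier could look, assuming the
coordinator wraps a single-thread ExecutorService (the names WriteEventExecutor
and waitAllTasksCompleted are hypothetical, not existing Hudi API). Because a
single-thread executor runs tasks in submission order, blocking on a no-op
submitted now guarantees that every previously queued write metadata event has
been processed:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper around the coordinator's single-thread executor.
final class WriteEventExecutor {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  void execute(Runnable task) {
    executor.execute(task);
  }

  // Barrier: a no-op submitted now completes only after every task queued before it.
  void waitAllTasksCompleted(long timeout, TimeUnit unit) throws Exception {
    executor.submit(() -> { }).get(timeout, unit);
  }
}

With such a barrier in place, handleEndInputEvent(event) would only run after all
pending handleWriteMetaEvent calls, so finalizeWrite() would see the write
metadata for (id1, Julian).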

[1]
https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/results?buildId=9252&view=logs&j=3b6e910d-b98f-5de6-b9cb-1e5ff571f5de&t=30b5aae4-0ea0-5566-42d0-febf71a7061a&l=18884
[2]
https://github.com/apache/hudi/pull/4561/files#diff-b04b4d3e697f040d71e4fbe2b2492e188f74b8b6804b1aea3f6de897cbebec17R256

Aleksandr Trushev

Re: HoodieTable removes data file right before the end of Flink job

Posted by Danny Chan <da...@apache.org>.
Thanks for the awesome analysis, you are right: after patch [2] the
end-input event and the metadata event may lose their execution order,
which causes the problem here.
Feel free to file a JIRA ticket to fix it :)

Best,
Danny
