You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jürgen Thomann <ju...@innogames.com> on 2017/04/27 08:27:56 UTC

Behavior of the cancel command

Hi,

I had some time ago problems with writing data to Hadoop with the 
BucketingSink and losing data in case of cancel with savepoint because 
flush/sync command was interrupted. I tried changing Hadoop settings as 
suggested but had no luck at the end and looked into the Flink code. If 
I understand the code correctly it behaves the following way:

1. Start a Watchdog thread if we have a cancellation timeout set
2. invoke cancel on the sink/task, but do not wait for it to finish
3. destroy buffer pool and a release resources
4. send initial interrupt to the sink/task
5. call join on the sink/task and ignore InterruptedException
6. let the watchdog send more interrupts if needed and throw fatal error 
if timeout is reached

In my case the BucketingSink does not has enough time to flush 
everything before the initial interrupt is sent and some files are not 
closed properly which causes the missing data in Hadoop in my understanding.

Is my understanding correct and if yes, do you know a way to get around 
this behavior to let the close function finish the sync for all files?

Best,
J�rgen

Re: Behavior of the cancel command

Posted by Aljoscha Krettek <al...@apache.org>.
Thanks for looking into this, Jürgen!

I opened a Jira issue: https://issues.apache.org/jira/browse/FLINK-6427 <https://issues.apache.org/jira/browse/FLINK-6427>

> On 29. Apr 2017, at 09:15, Jürgen Thomann <ju...@innogames.com> wrote:
> 
> Hi Aljoscha,
> 
> In my case the valid-length file created contains a value which e.g. says 100 MB are valid to read for exactly once but the file with the data is only 95 MB large. As I understand it the valid-length file contains the length of the file at the checkpoint. This does also not happen for all files (3 HDFS sinks each with a parallelism of 2). For some parts the file size and the value in the valid-length file match exactly. 
> 
> After looking now over the checkpoint code in BucketingSink I looked into the hsync behavior again and found the following page: http://stackoverflow.com/questions/32231105/why-is-hsync-not-flushing-my-hdfs-file <http://stackoverflow.com/questions/32231105/why-is-hsync-not-flushing-my-hdfs-file>
> After this I downloaded the file with the hdfs dfs tool and actually the file is now even larger than the valid-length file. I checked this against the things I did before (Impala and hive select count query, and Hue download of files and wc -l) and this 3 ways result in the same amount of lines but hdfs dfs -cat <filename> | wc -l gives a much larger value. 
> 
> So my conclusion would be that the data is written and not exactly lost as I thought, but for my use case not visible because the files are not properly closed during cancel and the namenode is not aware of the flushed data. So I could imagine 2 ways out of this: 1. implement the hsync as stated at the Stack Overflow page or 2. ensure that files are properly closed during cancel.
> 
> Best,
> Jürgen
> 
> On 28.04.2017 17:38, Aljoscha Krettek wrote:
>> Hi Jürgen,
>> Is there missing data with respect to what should have been written at the time of the cancel or when the last checkpoint (or in that case, the savepoint) was performed. I’m asking because the cancel command is only sent out once the savepoint has been completed, as can be seen at [1]. If the savepoint is complete this also means that the snapshot method of the BucketingSink must have done it’s work, i.e. that it also flushed all files, which is done in [2]. There’s always the possibility of a bug, however, so we’ll have to look into this together.
>> 
>> Best,
>> Aljoscha
>> 
>> [1] https://github.com/apache/flink/blob/c22efce098c14e8f08bad1e0065dbd02df6e4dbb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L607-L607 <https://github.com/apache/flink/blob/c22efce098c14e8f08bad1e0065dbd02df6e4dbb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L607-L607>
>> 
>> [2] https://github.com/apache/flink/blob/b4c60a942fe07e355dd49ed2aab3c0a7ae94285d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java <https://github.com/apache/flink/blob/b4c60a942fe07e355dd49ed2aab3c0a7ae94285d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java>
>> 
>>> On 27. Apr 2017, at 10:27, Jürgen Thomann <juergen.thomann@innogames.com <ma...@innogames.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> I had some time ago problems with writing data to Hadoop with the BucketingSink and losing data in case of cancel with savepoint because flush/sync command was interrupted. I tried changing Hadoop settings as suggested but had no luck at the end and looked into the Flink code. If I understand the code correctly it behaves the following way:
>>> 
>>> 1. Start a Watchdog thread if we have a cancellation timeout set
>>> 2. invoke cancel on the sink/task, but do not wait for it to finish
>>> 3. destroy buffer pool and a release resources
>>> 4. send initial interrupt to the sink/task
>>> 5. call join on the sink/task and ignore InterruptedException
>>> 6. let the watchdog send more interrupts if needed and throw fatal error if timeout is reached
>>> 
>>> In my case the BucketingSink does not has enough time to flush everything before the initial interrupt is sent and some files are not closed properly which causes the missing data in Hadoop in my understanding.
>>> 
>>> Is my understanding correct and if yes, do you know a way to get around this behavior to let the close function finish the sync for all files?
>>> 
>>> Best,
>>> Jürgen
>> 


Re: Behavior of the cancel command

Posted by Jürgen Thomann <ju...@innogames.com>.
Hi Aljoscha,

In my case the valid-length file created contains a value which e.g. 
says 100 MB are valid to read for exactly once but the file with the 
data is only 95 MB large. As I understand it the valid-length file 
contains the length of the file at the checkpoint. This does also not 
happen for all files (3 HDFS sinks each with a parallelism of 2). For 
some parts the file size and the value in the valid-length file match 
exactly.

After looking now over the checkpoint code in BucketingSink I looked 
into the hsync behavior again and found the following page: 
http://stackoverflow.com/questions/32231105/why-is-hsync-not-flushing-my-hdfs-file
After this I downloaded the file with the hdfs dfs tool and actually the 
file is now even larger than the valid-length file. I checked this 
against the things I did before (Impala and hive select count query, and 
Hue download of files and wc -l) and this 3 ways result in the same 
amount of lines but hdfs dfs -cat <filename> | wc -l gives a much larger 
value.

So my conclusion would be that the data is written and not exactly lost 
as I thought, but for my use case not visible because the files are not 
properly closed during cancel and the namenode is not aware of the 
flushed data. So I could imagine 2 ways out of this: 1. implement the 
hsync as stated at the Stack Overflow page or 2. ensure that files are 
properly closed during cancel.

Best,
J�rgen

On 28.04.2017 17:38, Aljoscha Krettek wrote:
> Hi J�rgen,
> Is there missing data with respect to what should have been written at 
> the time of the cancel or when the last checkpoint (or in that case, 
> the savepoint) was performed. I\u2019m asking because the cancel command is 
> only sent out once the savepoint has been completed, as can be seen at 
> [1]. If the savepoint is complete this also means that the snapshot 
> method of the BucketingSink must have done it\u2019s work, i.e. that it 
> also flushed all files, which is done in [2]. There\u2019s always the 
> possibility of a bug, however, so we\u2019ll have to look into this together.
>
> Best,
> Aljoscha
>
> [1] 
> https://github.com/apache/flink/blob/c22efce098c14e8f08bad1e0065dbd02df6e4dbb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L607-L607
>
> [2] 
> https://github.com/apache/flink/blob/b4c60a942fe07e355dd49ed2aab3c0a7ae94285d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
>
>> On 27. Apr 2017, at 10:27, J�rgen Thomann 
>> <juergen.thomann@innogames.com 
>> <ma...@innogames.com>> wrote:
>>
>> Hi,
>>
>> I had some time ago problems with writing data to Hadoop with the 
>> BucketingSink and losing data in case of cancel with savepoint 
>> because flush/sync command was interrupted. I tried changing Hadoop 
>> settings as suggested but had no luck at the end and looked into the 
>> Flink code. If I understand the code correctly it behaves the 
>> following way:
>>
>> 1. Start a Watchdog thread if we have a cancellation timeout set
>> 2. invoke cancel on the sink/task, but do not wait for it to finish
>> 3. destroy buffer pool and a release resources
>> 4. send initial interrupt to the sink/task
>> 5. call join on the sink/task and ignore InterruptedException
>> 6. let the watchdog send more interrupts if needed and throw fatal 
>> error if timeout is reached
>>
>> In my case the BucketingSink does not has enough time to flush 
>> everything before the initial interrupt is sent and some files are 
>> not closed properly which causes the missing data in Hadoop in my 
>> understanding.
>>
>> Is my understanding correct and if yes, do you know a way to get 
>> around this behavior to let the close function finish the sync for 
>> all files?
>>
>> Best,
>> J�rgen
>

Re: Behavior of the cancel command

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Jürgen,
Is there missing data with respect to what should have been written at the time of the cancel or when the last checkpoint (or in that case, the savepoint) was performed. I’m asking because the cancel command is only sent out once the savepoint has been completed, as can be seen at [1]. If the savepoint is complete this also means that the snapshot method of the BucketingSink must have done it’s work, i.e. that it also flushed all files, which is done in [2]. There’s always the possibility of a bug, however, so we’ll have to look into this together.

Best,
Aljoscha

[1] https://github.com/apache/flink/blob/c22efce098c14e8f08bad1e0065dbd02df6e4dbb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L607-L607 <https://github.com/apache/flink/blob/c22efce098c14e8f08bad1e0065dbd02df6e4dbb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L607-L607>

[2] https://github.com/apache/flink/blob/b4c60a942fe07e355dd49ed2aab3c0a7ae94285d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java

> On 27. Apr 2017, at 10:27, Jürgen Thomann <ju...@innogames.com> wrote:
> 
> Hi,
> 
> I had some time ago problems with writing data to Hadoop with the BucketingSink and losing data in case of cancel with savepoint because flush/sync command was interrupted. I tried changing Hadoop settings as suggested but had no luck at the end and looked into the Flink code. If I understand the code correctly it behaves the following way:
> 
> 1. Start a Watchdog thread if we have a cancellation timeout set
> 2. invoke cancel on the sink/task, but do not wait for it to finish
> 3. destroy buffer pool and a release resources
> 4. send initial interrupt to the sink/task
> 5. call join on the sink/task and ignore InterruptedException
> 6. let the watchdog send more interrupts if needed and throw fatal error if timeout is reached
> 
> In my case the BucketingSink does not has enough time to flush everything before the initial interrupt is sent and some files are not closed properly which causes the missing data in Hadoop in my understanding.
> 
> Is my understanding correct and if yes, do you know a way to get around this behavior to let the close function finish the sync for all files?
> 
> Best,
> Jürgen