Posted to user@flink.apache.org by Maximilian Bode <ma...@tngtech.com> on 2016/03/03 14:17:26 UTC

Jobmanager HA with Rolling Sink in HDFS

Hi everyone,

unfortunately, I am running into another problem while trying to establish exactly-once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).

When using

RollingSink<Tuple3<Integer,Integer,String>> sink = new RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
sink.setBucketer(new NonRollingBucketer());
output.addSink(sink);

and then killing the job manager, the new job manager is unable to restore the old state and throws:
---
java.lang.Exception: Could not restore checkpointed state to operators and functions
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
	... 3 more
Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
	at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
	at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
	... 4 more
---
I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using 2.4.0 – might this be the same issue?

Another thing I could think of is that the job is not configured correctly and there is some sort of timing issue. The checkpoint interval is 10 seconds; everything else was left at its default value. Then again, as the NonRollingBucketer is used, there should not be any timing issues, right?

Cheers,
 Max

[1] https://issues.apache.org/jira/browse/FLINK-2979

—
Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Managing directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Registered office: Unterföhring * Amtsgericht München (Munich district court) * HRB 135082


Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Maximilian Bode <ma...@tngtech.com>.
Hi Aljoscha,

did you by any chance get around to looking at the problem again? It seems to us that by the time restoreState() is called, the files are already in their final state and there are additional .valid-length files (hence there are neither pending nor in-progress files). Furthermore, one of the part files (part-11 in the listing from my previous mail) is missing completely. We are able to reproduce this behavior by killing a task manager. Can you make sense of that?

Cheers,
 Max


Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Maximilian Bode <ma...@tngtech.com>.
Hi Aljoscha,

thank you for the fast answer. The files in HDFS change as follows:

-before task manager is killed:
[user@host user]$ hdfs dfs -ls -R /hdfs/dir/outbound
-rw-r--r--   2 user hadoop    2435461 2016-03-03 14:51 /hdfs/dir/outbound/_part-0-0.in-progress
-rw-r--r--   2 user hadoop    2404604 2016-03-03 14:51 /hdfs/dir/outbound/_part-1-0.in-progress
-rw-r--r--   2 user hadoop    2453796 2016-03-03 14:51 /hdfs/dir/outbound/_part-10-0.in-progress
-rw-r--r--   2 user hadoop    2447794 2016-03-03 14:51 /hdfs/dir/outbound/_part-11-0.in-progress
-rw-r--r--   2 user hadoop    2453479 2016-03-03 14:51 /hdfs/dir/outbound/_part-2-0.in-progress
-rw-r--r--   2 user hadoop    2512413 2016-03-03 14:51 /hdfs/dir/outbound/_part-3-0.in-progress
-rw-r--r--   2 user hadoop    2431290 2016-03-03 14:51 /hdfs/dir/outbound/_part-4-0.in-progress
-rw-r--r--   2 user hadoop    2457820 2016-03-03 14:51 /hdfs/dir/outbound/_part-5-0.in-progress
-rw-r--r--   2 user hadoop    2430218 2016-03-03 14:51 /hdfs/dir/outbound/_part-6-0.in-progress
-rw-r--r--   2 user hadoop    2482970 2016-03-03 14:51 /hdfs/dir/outbound/_part-7-0.in-progress
-rw-r--r--   2 user hadoop    2452622 2016-03-03 14:51 /hdfs/dir/outbound/_part-8-0.in-progress
-rw-r--r--   2 user hadoop    2467733 2016-03-03 14:51 /hdfs/dir/outbound/_part-9-0.in-progress

-shortly after task manager is killed:
[user@host user]$ hdfs dfs -ls -R /hdfs/dir/outbound
-rw-r--r--   2 user hadoop   12360198 2016-03-03 14:52 /hdfs/dir/outbound/_part-0-0.pending
-rw-r--r--   2 user hadoop    9350654 2016-03-03 14:52 /hdfs/dir/outbound/_part-1-0.in-progress
-rw-r--r--   2 user hadoop    9389872 2016-03-03 14:52 /hdfs/dir/outbound/_part-10-0.in-progress
-rw-r--r--   2 user hadoop   12236015 2016-03-03 14:52 /hdfs/dir/outbound/_part-11-0.pending
-rw-r--r--   2 user hadoop   12256483 2016-03-03 14:52 /hdfs/dir/outbound/_part-2-0.pending
-rw-r--r--   2 user hadoop   12261730 2016-03-03 14:52 /hdfs/dir/outbound/_part-3-0.pending
-rw-r--r--   2 user hadoop    9418913 2016-03-03 14:52 /hdfs/dir/outbound/_part-4-0.in-progress
-rw-r--r--   2 user hadoop   12176987 2016-03-03 14:52 /hdfs/dir/outbound/_part-5-0.pending
-rw-r--r--   2 user hadoop   12165782 2016-03-03 14:52 /hdfs/dir/outbound/_part-6-0.pending
-rw-r--r--   2 user hadoop    9474037 2016-03-03 14:52 /hdfs/dir/outbound/_part-7-0.in-progress
-rw-r--r--   2 user hadoop   12136347 2016-03-03 14:52 /hdfs/dir/outbound/_part-8-0.pending
-rw-r--r--   2 user hadoop   12305943 2016-03-03 14:52 /hdfs/dir/outbound/_part-9-0.pending

-still a bit later:
[user@host user]$ hdfs dfs -ls -R /hdfs/dir/outbound
-rw-r--r--   2 user hadoop          9 2016-03-03 14:52 /hdfs/dir/outbound/_part-0-0.valid-length
-rw-r--r--   2 user hadoop    8026667 2016-03-03 14:52 /hdfs/dir/outbound/_part-0-1.pending
-rw-r--r--   2 user hadoop          9 2016-03-03 14:52 /hdfs/dir/outbound/_part-1-0.valid-length
-rw-r--r--   2 user hadoop    8097752 2016-03-03 14:52 /hdfs/dir/outbound/_part-1-1.pending
-rw-r--r--   2 user hadoop          9 2016-03-03 14:52 /hdfs/dir/outbound/_part-10-0.valid-length
-rw-r--r--   2 user hadoop    7109259 2016-03-03 14:52 /hdfs/dir/outbound/_part-10-1.pending
-rw-r--r--   2 user hadoop          0 2016-03-03 14:52 /hdfs/dir/outbound/_part-2-0.valid-length
-rw-r--r--   2 user hadoop          9 2016-03-03 14:52 /hdfs/dir/outbound/_part-3-0.valid-length
-rw-r--r--   2 user hadoop    7974655 2016-03-03 14:52 /hdfs/dir/outbound/_part-3-1.pending
-rw-r--r--   2 user hadoop          9 2016-03-03 14:52 /hdfs/dir/outbound/_part-4-0.valid-length
-rw-r--r--   2 user hadoop    5753163 2016-03-03 14:52 /hdfs/dir/outbound/_part-4-1.pending
-rw-r--r--   2 user hadoop          0 2016-03-03 14:52 /hdfs/dir/outbound/_part-5-0.valid-length
-rw-r--r--   2 user hadoop          9 2016-03-03 14:52 /hdfs/dir/outbound/_part-6-0.valid-length
-rw-r--r--   2 user hadoop    7904468 2016-03-03 14:52 /hdfs/dir/outbound/_part-6-1.pending
-rw-r--r--   2 user hadoop          9 2016-03-03 14:52 /hdfs/dir/outbound/_part-7-0.valid-length
-rw-r--r--   2 user hadoop    7477004 2016-03-03 14:52 /hdfs/dir/outbound/_part-7-1.pending
-rw-r--r--   2 user hadoop          0 2016-03-03 14:52 /hdfs/dir/outbound/_part-8-0.valid-length
-rw-r--r--   2 user hadoop          9 2016-03-03 14:52 /hdfs/dir/outbound/_part-9-0.valid-length
-rw-r--r--   2 user hadoop    7979307 2016-03-03 14:52 /hdfs/dir/outbound/_part-9-1.pending
-rw-r--r--   2 user hadoop   12360198 2016-03-03 14:52 /hdfs/dir/outbound/part-0-0
-rw-r--r--   2 user hadoop    9350654 2016-03-03 14:52 /hdfs/dir/outbound/part-1-0
-rw-r--r--   2 user hadoop    9389872 2016-03-03 14:52 /hdfs/dir/outbound/part-10-0
-rw-r--r--   2 user hadoop   12256483 2016-03-03 14:52 /hdfs/dir/outbound/part-2-0
-rw-r--r--   2 user hadoop   12261730 2016-03-03 14:52 /hdfs/dir/outbound/part-3-0
-rw-r--r--   2 user hadoop    9418913 2016-03-03 14:52 /hdfs/dir/outbound/part-4-0
-rw-r--r--   2 user hadoop   12176987 2016-03-03 14:52 /hdfs/dir/outbound/part-5-0
-rw-r--r--   2 user hadoop   12165782 2016-03-03 14:52 /hdfs/dir/outbound/part-6-0
-rw-r--r--   2 user hadoop    9474037 2016-03-03 14:52 /hdfs/dir/outbound/part-7-0
-rw-r--r--   2 user hadoop   12136347 2016-03-03 14:52 /hdfs/dir/outbound/part-8-0
-rw-r--r--   2 user hadoop   12305943 2016-03-03 14:52 /hdfs/dir/outbound/part-9-0
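
A minimal sketch, assuming the naming convention that the listings above suggest: in-progress, pending and valid-length files carry a "_" prefix and a state suffix, while finished files are named part-<subtask>-<counter>. Under that assumption, a small Java helper can report which subtasks have no finished file in a listing. The class and method names are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical helper (not part of Flink): finds subtasks without a final part file. */
final class PartFileCheck {

    /**
     * Returns the subtask indices in [0, parallelism) for which the listing
     * contains no final file named "part-<subtask>-<counter>".
     */
    static List<Integer> subtasksWithoutFinalFile(List<String> fileNames, int parallelism) {
        TreeSet<Integer> seen = new TreeSet<>();
        for (String name : fileNames) {
            // Assumption: final files have no "_" prefix and no state suffix.
            if (!name.matches("part-\\d+-\\d+")) {
                continue;
            }
            // "part-<subtask>-<counter>" splits into ["part", subtask, counter].
            String[] pieces = name.split("-");
            seen.add(Integer.parseInt(pieces[1]));
        }
        List<Integer> missing = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (!seen.contains(subtask)) {
                missing.add(subtask);
            }
        }
        return missing;
    }
}
```

Applied to the file names of the third listing with a parallelism of 12, this should report subtask 11, the one part file that never reaches its final name.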

Can you see from this what is going wrong?

Cheers,
 Max

> On 03.03.2016 at 14:50, Aljoscha Krettek <al...@apache.org> wrote:
> 
> Hi,
> did you check whether there are any files at your specified HDFS output location? If yes, which files are there?
> 
> Cheers,
> Aljoscha
>> On 03 Mar 2016, at 14:29, Maximilian Bode <ma...@tngtech.com> wrote:
>> 
>> Just for the sake of completeness: this also happens when killing a task manager and is therefore probably unrelated to job manager HA.


Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Maximilian Bode <ma...@tngtech.com>.
Hi Aljoscha,

I think we are getting closer. I can verify that using your code the same thing happens again: one of the files is missing, even though there is a .valid-length file for it. I have attached a dump from the HDFS folder, which does contain a _part-11-0.valid-length file but NO part-11-0 file.

What is more, the byte length in _part-11-0.valid-length is 3749. This corresponds to 768 lines (see part-10). When adding up everything* I get exactly 99232 lines for 100k sent records, i.e. 100000 - 99232 = 768 lines short, which is exactly the content of the missing part-11-0 file, so I think we have the culprit :)

Cheers,
 Max

* I first copyToLocal all relevant files, then
	for i in part-* ; do head -c$(cat "_$i.valid-length" | strings) "$i" > "final/$i.final" ; done
	cp *.pending final/
	wc -l final/*
   769 part-0-0.final
  7565 _part-0-1.pending
   768 part-10-0.final
  7564 _part-10-1.pending
   769 part-1-0.final
  7564 _part-11-0.pending
  7565 _part-1-1.pending
   769 part-2-0.final
  7565 _part-2-1.pending
   769 part-3-0.final
  7565 _part-3-1.pending
   769 part-4-0.final
  7565 _part-4-1.pending
   769 part-5-0.final
  7565 _part-5-1.pending
   769 part-6-0.final
  7564 _part-6-1.pending
   769 part-7-0.final
  7564 _part-7-1.pending
   769 part-8-0.final
  7564 _part-8-1.pending
   769 part-9-0.final
  7564 _part-9-1.pending
 99232 total
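
The truncation done by the shell loop above can also be sketched in Java. This is a minimal sketch for local copies of the files. It assumes the .valid-length file was written with DataOutputStream.writeUTF (a two-byte length prefix followed by the decimal length), which would explain why the loop pipes the file through strings and would fit the 9-byte .valid-length files in the earlier listings (2 bytes plus a 7-digit length), but the thread does not confirm the format. ValidLengthReader and its methods are hypothetical names, not Flink API.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

/** Hypothetical helper (not part of Flink) for local copies of the sink output. */
final class ValidLengthReader {

    /** Reads the valid length, assuming it was written with DataOutputStream.writeUTF. */
    static long readValidLength(Path validLengthFile) throws IOException {
        try (DataInputStream in = new DataInputStream(Files.newInputStream(validLengthFile))) {
            return Long.parseLong(in.readUTF());
        }
    }

    /** Returns only the first validLength bytes of the part file. */
    static byte[] readValidBytes(Path partFile, Path validLengthFile) throws IOException {
        long validLength = readValidLength(validLengthFile);
        // Reading the whole file is fine for a sketch; stream it for large files.
        byte[] all = Files.readAllBytes(partFile);
        int n = (int) Math.min(validLength, all.length);
        return Arrays.copyOf(all, n);
    }
}
```

Everything past the returned bytes was written after the last successful checkpoint and must be ignored when counting records.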



> On 09.03.2016 at 13:31, Aljoscha Krettek <al...@apache.org> wrote:
> 
> Hi Maximilian,
> I’m currently running some tests again on a cluster to try and pinpoint the problem. Just to make sure, you are using Hadoop 2.4.1 with Yarn and Kafka 0.8, correct?
> 
> In the meantime, could you maybe run a test where you completely bypass Kafka, just so we can see whether the problem is in Kafka or the RollingSink? For my tests I created this source:
> 
> public static class LongSource extends RichSourceFunction<Long> implements Checkpointed<Long> {
>   private static final long serialVersionUID = 1L;
> 
>   private long numElements;
> 
>   private int sleepInterval;
> 
>   private volatile boolean running = true;
> 
>   private long index = 0;
> 
>   public LongSource(long numElements, int sleepInterval) {
>      this.numElements = numElements;
>      this.sleepInterval = sleepInterval;
>   }
> 
>   @Override
>   public void run(SourceContext<Long> out) throws Exception {
> 
>      while (running && index < numElements) {
>         out.collect(index);
>         Thread.sleep(sleepInterval);
>         index++;
>      }
> 
>      while (running) {
>         Thread.sleep(100);
>      }
>   }
> 
>   @Override
>   public Long snapshotState(long l, long l1) throws Exception {
>      return index;
>   }
> 
>   @Override
>   public void restoreState(Long aLong) throws Exception {
>      this.index = aLong;
>   }
> 
>   @Override
>   public void cancel() {
>      running = false;
>   }
> }
> 
> It’s a fault-tolerant source that emits elements and I can specify a sleep interval so that the job is not too fast and I can kill it before it finishes.
> 
> My testing job is this, which should be quite similar to yours:
> 
> DataStream<Long> inputStream = env.addSource(new DataGenerator.LongSource(2000000, 1));
> 
> DataStream<String> result = inputStream.map(new RichMapFunction<Long, String>() {
>   LongCounter count;
>   @Override
>   public void open(Configuration parameters) throws Exception {
>      count = getRuntimeContext().getLongCounter("count");
>   }
> 
>   @Override
>   public String map(Long aLong) throws Exception {
>      count.add(1L);
>      return "" + aLong;
>   }
> });
> 
> RollingSink<String> sink = new RollingSink<>(sinkPath);
> sink.setBucketer(new NonRollingBucketer());
> result.addSink(sink);
> 
> Cheers,
> Aljoscha
>> On 09 Mar 2016, at 08:31, Maximilian Bode <ma...@tngtech.com> wrote:
>> 
>> Hi Aljoscha,
>> 
>> yeah, I should have been clearer. I did mean those accumulators, but I do not trust them for the total count (as you said, they are reset on failure). On the other hand, if they do not change for a while, it is pretty obvious that the job has ingested everything in the queue. But you are right, this is only a heuristic. In combination with the fact that the DateTimeBucketer does not create new folders, I believe this should be sufficient to decide when the job has basically finished, though.
>> 
>> So the setup is the following: The Flink job consists of a FlinkKafkaConsumer08, a map containing just an IntCounter accumulator and finally a rolling sink writing to HDFS. I start it in a per-job YARN session with n=3, s=4. Then I pour 2 million records into the Kafka queue the application is reading from. If no job/task managers are killed, the behavior is exactly as expected: the output files in HDFS grow with time and I can monitor via the accumulator exactly when every record has been ingested from Kafka. After that, I give the job a few seconds and then cancel it via the web interface. Some time later still (to give the job the chance to output the few records still hanging around), a wc -l on the output files yields exactly the expected 2 million.
>> 
>> On the other hand, if I kill a task manager while the job is in progress, one of the 12 output files seems to be missing as described before. A wc -l on only the relevant bytes as I mentioned in an earlier mail then leads to a number smaller than 2 million.
>> 
>> We are using an FsStateBackend in HDFS with a checkpoint interval of 10s.
>> 
>> Cheers,
>> Max
>> 
>>> On 08.03.2016 at 17:46, Aljoscha Krettek <al...@apache.org> wrote:
>>> 
>>> Hi,
>>> by accumulator, do you mean the ones you get from RuntimeContext.addAccumulator/getAccumulator? I’m afraid these are not fault-tolerant, which means that the count in these probably doesn’t reflect the actual number of elements that were processed. When a job fails and restarts, the accumulators should start from scratch. This makes me wonder how yours ever reach the required 2 million for the job to be considered “done”.
>>> 
>>> This keeps getting more mysterious…
>>> 
>>> By the way, what are you using as StateBackend and checkpoint interval?
>>> 
>>> Cheers,
>>> Aljoscha
>>>> On 08 Mar 2016, at 13:38, Maximilian Bode <ma...@tngtech.com> wrote:
>>>> 
>>>> Hi,
>>>> thanks for the fast answer. Answers inline.
>>>> 
>>>>> On 08.03.2016 at 13:31, Aljoscha Krettek <al...@apache.org> wrote:
>>>>> 
>>>>> Hi,
>>>>> a missing part file for one of the parallel sinks is not necessarily a problem. This can happen if that parallel instance of the sink never received data after the job successfully restarted.
>>>>> 
>>>>> Missing data, however, is a problem. Maybe I need some more information about your setup:
>>>>> 
>>>>> - When are you inspecting the part files?
>>>> Some time after the cluster is shut down
>>>>> - Do you shut down the Flink job before checking? If so, how do you shut it down?
>>>> Via 'cancel' in the Jobmanager Web Interface. Some records seem to be written only after cancelling the job, right?
>>>>> - When do you know whether all the data from Kafka was consumed by Flink and has passed through the pipeline into HDFS?
>>>> I have an accumulator in a map right before writing into HDFS. Also, the RollingSink has a DateTimeBucketer, which makes it apparent when no new data is arriving anymore, as the last bucket is from some minutes ago.
>>>>> 
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 08 Mar 2016, at 13:19, Maximilian Bode <ma...@tngtech.com> wrote:
>>>>>> 
>>>>>> Hi Aljoscha,
>>>>>> 
>>>>>> oh I see. I was under the impression this file was used internally and the output being completed at the end. Ok, so I extracted the relevant lines using
>>>>>> 	for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > "$i.final"; done
>>>>>> which seems to do the trick.
>>>>>> 
>>>>>> Unfortunately, now some records are missing again. In particular, there are the files
>>>>>> 	part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding .valid-length files
>>>>>> 	part-0-1, part-1-1, ..., part-10-1
>>>>>> in the bucket, where job parallelism=12. So it looks to us as if one of the files was not even created in the second attempt. This behavior seems to be somewhat reproducible, cf. my earlier email where the part-11 file disappeared as well.
>>>>>> 
>>>>>> Thanks again for your help.
>>>>>> 
>>>>>> Cheers,
>>>>>> Max
>>>>>> 
>>>>>>> On 08.03.2016 at 11:05, Aljoscha Krettek <al...@apache.org> wrote:
>>>>>>> 
>>>>>>> Hi,
>>>>>>> are you taking the “.valid-length” files into account? The problem with doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not possible to truncate files. So the trick we’re using is to write the length up to which a file is valid if we would normally need to truncate it. (If the job fails in the middle of writing, the output files have to be truncated to a valid position.) For example, say you have an output file part-8-0. Now, if there exists a file _part-8-0.valid-length, this file tells you up to which position the file part-8-0 is valid. So you should only read up to this point.
>>>>>>> 
>>>>>>> The name of the “.valid-length” suffix can also be configured, by the way, as can all the other stuff.
>>>>>>> 
>>>>>>> If this is not the problem then I definitely have to investigate further. I’ll also look into the Hadoop 2.4.1 build problem.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>> On 08 Mar 2016, at 10:26, Maximilian Bode <ma...@tngtech.com> wrote:
>>>>>>>> 
>>>>>>>> Hi Aljoscha,
>>>>>>>> thanks again for getting back to me. I built from your branch and the exception is not occurring anymore. The RollingSink state can be restored.
>>>>>>>> 
>>>>>>>> Still, the exactly-once guarantee does not seem to be fulfilled: there are always some extra records after killing either a task manager or the job manager. Do you have an idea where this behavior might be coming from? (I guess concrete numbers will not help greatly as there are so many parameters influencing them. Still, in our test scenario, we produce 2 million records in a Kafka queue but in the final output files there are on the order of 2.1 million records, so a 5% error. The job is running in a per-job YARN session with n=3, s=4 with a checkpointing interval of 10s.)
>>>>>>>> 
>>>>>>>> On another (maybe unrelated) note: when I pulled your branch, the Travis build did not go through for -Dhadoop.version=2.4.1. I have not looked into this further as of now, is this one of the tests known to fail sometimes?
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Max
>>>>>>>> 
>>>>>>>>> On 07.03.2016 at 17:20, Aljoscha Krettek <al...@apache.org> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Maximilian,
>>>>>>>>> sorry for the delay, we were very busy with the release last week. I had a hunch about the problem and I think I have found a fix now. The problem is in snapshot restore. When restoring, the sink tries to clean up any files that were previously in progress. If Flink restores to the same snapshot twice in a row, it will try to clean up the leftover files twice, but they are not there anymore; this causes the exception.
>>>>>>>>> 
>>>>>>>>> I have a fix in my branch: https://github.com/aljoscha/flink/tree/rolling-sink-fix
>>>>>>>>> 
>>>>>>>>> Could you maybe try if this solves your problem? Which version of Flink are you using? You would have to build from source to try it out. Alternatively I could build it and put it onto a maven snapshot repository for you to try it out.
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <al...@apache.org> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi,
>>>>>>>>>> did you check whether there are any files at your specified HDFS output location? If yes, which files are there?
>>>>>>>>>> 
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>>> On 03 Mar 2016, at 14:29, Maximilian Bode <ma...@tngtech.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Just for the sake of completeness: this also happens when killing a task manager and is therefore probably unrelated to job manager HA.
>>>>>>>>>>> 


Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Maximilian,
I’m currently running some tests again on a cluster to try and pinpoint the problem. Just to make sure, you are using Hadoop 2.4.1 with Yarn and Kafka 0.8, correct?

In the meantime, could you maybe run a test where you completely bypass Kafka, just so we can see whether the problem is in Kafka or the RollingSink? For my tests I created this source:

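import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Nested in an outer class (DataGenerator in the test job further down).
public static class LongSource extends RichSourceFunction<Long> implements Checkpointed<Long> {
   private static final long serialVersionUID = 1L;

   // Total number of elements to emit and the pause (in ms) after each one.
   private final long numElements;
   private final int sleepInterval;

   private volatile boolean running = true;

   // Next element to emit; this is the state that gets checkpointed.
   private long index = 0;

   public LongSource(long numElements, int sleepInterval) {
      this.numElements = numElements;
      this.sleepInterval = sleepInterval;
   }

   @Override
   public void run(SourceContext<Long> out) throws Exception {
      final Object lock = out.getCheckpointLock();

      while (running && index < numElements) {
         // Emit and advance under the checkpoint lock, so that a snapshot
         // never sees an element as emitted but not yet counted.
         synchronized (lock) {
            out.collect(index);
            index++;
         }
         Thread.sleep(sleepInterval);
      }

      // Idle until cancelled instead of letting the source finish.
      while (running) {
         Thread.sleep(100);
      }
   }

   @Override
   public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
      return index;
   }

   @Override
   public void restoreState(Long state) throws Exception {
      this.index = state;
   }

   @Override
   public void cancel() {
      running = false;
   }
}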

It’s a fault-tolerant source that emits elements and I can specify a sleep interval so that the job is not too fast and I can kill it before it finishes.
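
As a rough illustration of why a replaying source like this needs a sink that can cut its output back, here is a minimal plain-Java sketch with no Flink involved. The class ReplaySketch, the method run and all numbers are made up for illustration. It only assumes that the source restarts from the checkpointed index and that the sink either keeps or discards whatever it wrote after that checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

public class ReplaySketch {

    /**
     * Simulates one failure and one recovery (hypothetical helper).
     *
     * @param total                number of elements the source emits overall
     * @param checkpointedIndex    source index stored in the last completed checkpoint
     * @param emittedBeforeFailure elements that reached the sink before the failure
     *                             (must be >= checkpointedIndex)
     * @param truncate             whether the sink discards output written after the checkpoint
     * @return the elements in the sink output after recovery
     */
    static List<Long> run(long total, long checkpointedIndex, long emittedBeforeFailure, boolean truncate) {
        List<Long> sink = new ArrayList<>();

        // First attempt: elements 0 .. emittedBeforeFailure-1 are written, then the job fails.
        for (long i = 0; i < emittedBeforeFailure; i++) {
            sink.add(i);
        }

        // Recovery: optionally cut the output back to what was valid at the checkpoint.
        if (truncate) {
            sink.subList((int) checkpointedIndex, sink.size()).clear();
        }

        // Second attempt: the source replays from the checkpointed index.
        for (long i = checkpointedIndex; i < total; i++) {
            sink.add(i);
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(10, 5, 7, false).size()); // 12: elements 5 and 6 appear twice
        System.out.println(run(10, 5, 7, true).size());  // 10: exactly once
    }
}
```

Without the cut-back step, the output contains more records than were produced, which is the kind of surplus one would expect to see when the valid length of a part file is ignored.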

My testing job is this, which should be quite similar to yours:

DataStream<Long> inputStream = env.addSource(new DataGenerator.LongSource(2000000, 1));

DataStream<String> result = inputStream.map(new RichMapFunction<Long, String>() {
   LongCounter count;
   @Override
   public void open(Configuration parameters) throws Exception {
      count = getRuntimeContext().getLongCounter("count");
   }

   @Override
   public String map(Long aLong) throws Exception {
      count.add(1L);
      return "" + aLong;
   }
});

RollingSink<String> sink = new RollingSink<>(sinkPath);
sink.setBucketer(new NonRollingBucketer());
result.addSink(sink);

Cheers,
Aljoscha


Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Maximilian Bode <ma...@tngtech.com>.
Hi Aljoscha,

Yeah, I should have been clearer. I did mean those accumulators, but I am not trusting them for the total number (as you said, they are reset on failure). On the other hand, if they do not change for a while, it is pretty obvious that the job has ingested everything in the queue. But you are right, this is kind of a heuristic. In combination with the fact that the DateTimeBucketer does not create new folders, I believe this should be sufficient to decide when the job has basically finished, though.

So the setup is the following: the Flink job consists of a FlinkKafkaConsumer08, a map containing just an IntCounter accumulator and finally a rolling sink writing to HDFS. I start it in a per-job YARN session with n=3, s=4. Then I pour 2 million records into the Kafka queue the application is reading from. If no job/task managers are killed, the behavior is exactly as expected: the output files in HDFS grow with time and I can monitor via the accumulator exactly when every record has been ingested from Kafka. After that time, I give the job a few seconds and then cancel it via the web interface. Then, still some time later (to give the job the chance to output the few records still hanging around), a wc -l on the output files yields exactly the expected 2 million.

On the other hand, if I kill a task manager while the job is in progress, one of the 12 output files seems to be missing as described before. A wc -l on only the relevant bytes as I mentioned in an earlier mail then leads to a number smaller than 2 million.
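
A minimal Java sketch of that count, for part files copied to a local directory. It assumes, as in the shell one-liner from the earlier mail, that the length file is named _<part file>.valid-length and holds the valid length as decimal digits, possibly surrounded by non-digit bytes (hence the strings call there). The class and method names are hypothetical, and the whole part file is read into memory, so this is only meant for small test outputs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class ValidLengthSketch {

    /** Extracts the decimal number from the raw bytes of a length file, skipping non-digit bytes. */
    static long parseValidLength(byte[] raw) {
        StringBuilder digits = new StringBuilder();
        for (byte b : raw) {
            if (b >= '0' && b <= '9') {
                digits.append((char) b);
            }
        }
        // Throws NumberFormatException if the file contains no digits at all.
        return Long.parseLong(digits.toString());
    }

    /** Counts newline characters within the first validLength bytes of the part file content. */
    static long countValidLines(byte[] partFile, long validLength) {
        int limit = (int) Math.min(validLength, partFile.length);
        long lines = 0;
        for (int i = 0; i < limit; i++) {
            if (partFile[i] == '\n') {
                lines++;
            }
        }
        return lines;
    }

    /** Counts the valid records of one local part file, using its length file if one exists. */
    static long countRecords(Path part) throws IOException {
        // Assumed naming scheme: "_" + part file name + ".valid-length" in the same directory.
        Path lengthFile = part.resolveSibling("_" + part.getFileName() + ".valid-length");
        byte[] data = Files.readAllBytes(part);
        long valid = Files.exists(lengthFile)
                ? parseValidLength(Files.readAllBytes(lengthFile))
                : data.length; // no length file: the whole part file counts
        return countValidLines(data, valid);
    }

    public static void main(String[] args) throws IOException {
        long total = 0;
        for (String arg : args) {
            total += countRecords(Paths.get(arg));
        }
        System.out.println(total);
    }
}
```

Passing all part files of a bucket as arguments prints the total number of valid records, which can then be compared against the number of records produced into Kafka.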

We are using an FsStateBackend in HDFS with a checkpoint interval of 10s.

Cheers,
 Max
—
Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
by accumulator, do you mean the ones you get from RuntimeContext.addAccumulator/getAccumulator? I’m afraid these are not fault-tolerant, which means that the count in them probably doesn’t reflect the actual number of elements that were processed. When a job fails and restarts, the accumulators should start from scratch. This makes me wonder how yours ever reach the required 2 million for the job to be considered “done”.
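
To illustrate the point about accumulators, here is a minimal plain-Java simulation. It uses no Flink classes; the class name CounterRecoverySimulation, its parameters and the record/checkpoint model are all made up for this sketch. It assumes a replayable source, a single failure, and a restart from the last completed checkpoint: a counter that is part of the checkpointed state is restored together with the source offset, while an accumulator-style counter starts again from zero.

```java
/**
 * Plain-Java simulation (hypothetical, no Flink API involved) of two counters
 * across one failure and restart of a job that reads a replayable source.
 */
class CounterRecoverySimulation {

    /** Final values of both counters after the simulated run. */
    static final class Counts {
        final long accumulator;   // reset to zero on restart
        final long checkpointed;  // restored from the last completed checkpoint

        Counts(long accumulator, long checkpointed) {
            this.accumulator = accumulator;
            this.checkpointed = checkpointed;
        }
    }

    /**
     * @param totalRecords    number of records in the source
     * @param checkpointEvery a checkpoint completes after every this many records
     * @param failAt          the job fails once, right after this many records
     *                        were read; a negative value means no failure
     */
    static Counts run(long totalRecords, long checkpointEvery, long failAt) {
        if (checkpointEvery <= 0) {
            throw new IllegalArgumentException("checkpointEvery must be positive");
        }
        long accumulator = 0;
        long checkpointed = 0;
        long snapshotCount = 0;   // counter value stored in the last checkpoint
        long snapshotOffset = 0;  // source offset stored in the last checkpoint
        long offset = 0;
        boolean alreadyFailed = false;

        while (offset < totalRecords) {
            // Process one record.
            offset++;
            accumulator++;
            checkpointed++;

            // Complete a checkpoint: remember offset and counter together.
            if (offset % checkpointEvery == 0) {
                snapshotCount = checkpointed;
                snapshotOffset = offset;
            }

            // Simulate the single failure and the subsequent restart.
            if (!alreadyFailed && offset == failAt) {
                alreadyFailed = true;
                offset = snapshotOffset;       // source rewinds to the checkpoint
                checkpointed = snapshotCount;  // state is restored
                accumulator = 0;               // accumulator starts from scratch
            }
        }
        return new Counts(accumulator, checkpointed);
    }
}
```

For example, run(100, 10, 37) fails after record 37, rewinds to the checkpoint taken at record 30, and ends with accumulator = 70 and checkpointed = 100. Only the checkpointed counter matches the number of records in the source.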

This keeps getting more mysterious… 

By the way, what are you using as StateBackend and checkpoint interval?

Cheers,
Aljoscha
> On 08 Mar 2016, at 13:38, Maximilian Bode <ma...@tngtech.com> wrote:
> 
> Hi,
> thanks for the fast answer. Answers inline.
> 
>> On 08.03.2016 at 13:31, Aljoscha Krettek <al...@apache.org> wrote:
>> 
>> Hi,
>> a missing part file for one of the parallel sinks is not necessarily a problem. This can happen if that parallel instance of the sink never received data after the job successfully restarted.
>> 
>> Missing data, however, is a problem. Maybe I need some more information about your setup:
>> 
>> - When are you inspecting the part files?
> Some time after the cluster is shut down.
>> - Do you shut down the Flink job before checking? If so, how do you shut it down?
> Via 'cancel' in the Jobmanager web interface. Some records seem to be written only after cancelling the job, right?
>> - When do you know whether all the data from Kafka was consumed by Flink and has passed through the pipeline into HDFS?
> I have an accumulator in a map right before writing into HDFS. Also, the RollingSink has a DateTimeBucketer, which makes it apparent when no new data is arriving anymore, as the last bucket is from some minutes ago.
>> 
>> Cheers,
>> Aljoscha


Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Maximilian Bode <ma...@tngtech.com>.
Hi,
thanks for the fast answer. Answers inline.

> On 08.03.2016 at 13:31, Aljoscha Krettek <al...@apache.org> wrote:
> 
> Hi,
> a missing part file for one of the parallel sinks is not necessarily a problem. This can happen if that parallel instance of the sink never received data after the job successfully restarted.
> 
> Missing data, however, is a problem. Maybe I need some more information about your setup:
> 
> - When are you inspecting the part files?
Some time after the cluster is shut down.
> - Do you shut down the Flink job before checking? If so, how do you shut it down?
Via 'cancel' in the Jobmanager web interface. Some records seem to be written only after cancelling the job, right?
> - When do you know whether all the data from Kafka was consumed by Flink and has passed through the pipeline into HDFS?
I have an accumulator in a map right before writing into HDFS. Also, the RollingSink has a DateTimeBucketer, which makes it apparent when no new data is arriving anymore, as the last bucket is from some minutes ago.
> 
> Cheers,
> Aljoscha


Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
a missing part file for one of the parallel sinks is not necessarily a problem. This can happen if that parallel instance of the sink never received data after the job successfully restarted. 

Missing data, however, is a problem. Maybe I need some more information about your setup:

 - When are you inspecting the part files?
 - Do you shut down the Flink job before checking? If so, how do you shut it down?
 - When do you know whether all the data from Kafka was consumed by Flink and has passed through the pipeline into HDFS?

Cheers,
Aljoscha
> On 08 Mar 2016, at 13:19, Maximilian Bode <ma...@tngtech.com> wrote:
> 
> Hi Aljoscha,
> 
> oh I see. I was under the impression that this file was used internally and that the output would be completed at the end. OK, so I extracted the relevant lines using
> 	for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > "$i.final"; done
> which seems to do the trick.
> 
> Unfortunately, now some records are missing again. In particular, there are the files
> 	part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding .valid-length files
> 	part-0-1, part-1-1, ..., part-10-1
> in the bucket, where job parallelism=12. So it looks to us as if one of the files was not even created in the second attempt. This behavior seems to be somewhat reproducible, cf. my earlier email where the part-11 file disappeared as well.
> 
> Thanks again for your help.
> 
> Cheers,
>  Max
> — 
> Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 


Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Maximilian Bode <ma...@tngtech.com>.
Hi Aljoscha,

oh I see. I was under the impression that this file was used internally and that the output would be completed at the end. OK, so I extracted the relevant lines using
	for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > "$i.final"; done
which seems to do the trick.
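
The same extraction could also be sketched in Java for part files that have been copied to a local directory. This is only a sketch under assumptions: the class and method names are hypothetical, the length file is assumed to be named _<part file>.valid-length as in the command above, and it is assumed to hold the length as decimal digits, possibly surrounded by other bytes (which is why the command above pipes it through strings).

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper that keeps only the valid prefix of a local part file. */
class ValidLengthCopy {

    /** Parses the length, skipping every byte that is not an ASCII digit. */
    static long readValidLength(Path validLengthFile) throws IOException {
        StringBuilder digits = new StringBuilder();
        for (byte b : Files.readAllBytes(validLengthFile)) {
            if (b >= '0' && b <= '9') {
                digits.append((char) b);
            }
        }
        if (digits.length() == 0) {
            throw new IOException("No length found in " + validLengthFile);
        }
        return Long.parseLong(digits.toString());
    }

    /**
     * Copies the valid prefix of partFile to target and returns the number of
     * bytes written. A part file without a length file is copied in full.
     */
    static long copyValidPrefix(Path partFile, Path target) throws IOException {
        // Assumed naming scheme: "_" prefix plus ".valid-length" suffix.
        Path lengthFile =
                partFile.resolveSibling("_" + partFile.getFileName() + ".valid-length");
        long remaining = Files.exists(lengthFile)
                ? readValidLength(lengthFile)
                : Files.size(partFile);

        long copied = 0;
        byte[] buffer = new byte[8192];
        try (InputStream in = Files.newInputStream(partFile);
             OutputStream out = Files.newOutputStream(target)) {
            while (remaining > 0) {
                int n = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                if (n < 0) {
                    break; // part file is shorter than the recorded length
                }
                out.write(buffer, 0, n);
                remaining -= n;
                copied += n;
            }
        }
        return copied;
    }
}
```

With this, copyValidPrefix called on part-8-0 with part-8-0.final as the target would write only the bytes up to the recorded length into part-8-0.final.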

Unfortunately, now some records are missing again. In particular, there are the files
	part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding .valid-length files
	part-0-1, part-1-1, ..., part-10-1
in the bucket, where job parallelism=12. So it looks to us as if one of the files was not even created in the second attempt. This behavior seems to be somewhat reproducible, cf. my earlier email where the part-11 file disappeared as well.

Thanks again for your help.

Cheers,
 Max
—
Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

> On 08.03.2016 at 11:05, Aljoscha Krettek <al...@apache.org> wrote:
> 
> Hi,
> are you taking the “.valid-length” files into account? The problem with doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not possible to truncate files. So the trick we’re using is to write the length up to which a file is valid if we would normally need to truncate it. (If the job fails in the middle of writing, the output files have to be truncated to a valid position.) For example, say you have an output file part-8-0. Now, if there exists a file part-8-0.valid-length, this file tells you up to which position the file part-8-0 is valid. So you should only read up to this point.
> 
> The name of the “.valid-length” suffix can also be configured, by the way, as can all the other stuff.
> 
> If this is not the problem then I definitely have to investigate further. I’ll also look into the Hadoop 2.4.1 build problem.
> 
> Cheers,
> Aljoscha
>> On 08 Mar 2016, at 10:26, Maximilian Bode <ma...@tngtech.com> wrote:
>> 
>> Hi Aljoscha,
>> thanks again for getting back to me. I built from your branch and the exception is not occurring anymore. The RollingSink state can be restored.
>> 
>> Still, the exactly-once guarantee seems not to be fulfilled, there are always some extra records after killing either a task manager or the job manager. Do you have an idea where this behavior might be coming from? (I guess concrete numbers will not help greatly as there are so many parameters influencing them. Still, in our test scenario, we produce 2 million records in a Kafka queue but in the final output files there are on the order of 2.1 million records, so a 5% error. The job is running in a per-job YARN session with n=3, s=4 with a checkpointing interval of 10s.)
>> 
>> On another (maybe unrelated) note: when I pulled your branch, the Travis build did not go through for -Dhadoop.version=2.4.1. I have not looked into this further as of now, is this one of the tests known to fail sometimes?
>> 
>> Cheers,
>> Max
>> <travis.log>
>> —
>> Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>> 
>>> On 07.03.2016 at 17:20, Aljoscha Krettek <al...@apache.org> wrote:
>>> 
>>> Hi Maximilian,
>>> sorry for the delay, we were very busy with the release last week. I had a hunch about the problem, and I think I have found a fix now. The problem is in snapshot restore. When restoring, the sink tries to clean up any files that were previously in progress. If Flink restores to the same snapshot twice in a row, it will try to clean up the leftover files twice, but they are no longer there, which causes the exception.
>>> 
>>> I have a fix in my branch: https://github.com/aljoscha/flink/tree/rolling-sink-fix
>>> 
>>> Could you maybe try if this solves your problem? Which version of Flink are you using? You would have to build from source to try it out. Alternatively I could build it and put it onto a maven snapshot repository for you to try it out.
>>> 
>>> Cheers,
>>> Aljoscha
>>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <al...@apache.org> wrote:
>>>> 
>>>> Hi,
>>>> did you check whether there are any files at your specified HDFS output location? If yes, which files are there?
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>>> On 03 Mar 2016, at 14:29, Maximilian Bode <ma...@tngtech.com> wrote:
>>>>> 
>>>>> Just for the sake of completeness: this also happens when killing a task manager and is therefore probably unrelated to job manager HA.
>>>>> 
>>>>>> On 03 Mar 2016, at 14:17, Maximilian Bode <ma...@tngtech.com> wrote:
>>>>>> 
>>>>>> Hi everyone,
>>>>>> 
>>>>>> unfortunately, I am running into another problem trying to establish exactly once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>>>>> 
>>>>>> When using
>>>>>> 
>>>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>>>> sink.setBucketer(new NonRollingBucketer());
>>>>>> output.addSink(sink);
>>>>>> 
>>>>>> and then killing the job manager, the new job manager is unable to restore the old state throwing
>>>>>> ---
>>>>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>> 	at java.lang.Thread.run(Thread.java:744)
>>>>>> Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>>> 	... 3 more
>>>>>> Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>> 	at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>>> 	at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>>> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>>> 	... 4 more
>>>>>> ---
>>>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using 2.4.0 – might this be the same issue?
>>>>>> 
>>>>>> Another thing I could think of is that the job is not configured correctly and there is some sort of timing issue. The checkpoint interval is 10 seconds, everything else was left at default value. Then again, as the NonRollingBucketer is used, there should not be any timing issues, right?
>>>>>> 
>>>>>> Cheers,
>>>>>> Max
>>>>>> 
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>>>> 
>>>>>> —
>>>>>> Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>> Registered office: Unterföhring * Amtsgericht München * HRB 135082
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 


Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
are you taking the “.valid-length” files into account? The problem with doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not possible to truncate files. So the trick we’re using is to write out the length up to which a file is valid in cases where we would normally need to truncate it. (If the job fails in the middle of writing, the output files have to be truncated to a valid position.) For example, say you have an output file part-8-0. If there is also a file part-8-0.valid-length, it tells you up to which position part-8-0 is valid, so you should only read up to this point.

The “.valid-length” suffix can also be configured, by the way, as can the other file name prefixes and suffixes the sink uses.
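
To make the reading rule above concrete, here is a minimal sketch. It is not Flink code. It uses local files via java.nio as a stand-in for HDFS, and it assumes that the marker file stores the valid length as a decimal string written with DataOutput.writeUTF; please check that against the sink version you run. The marker file name follows the part-8-0 example above, and the real name depends on the configured prefix and suffix. The names ValidLengthReader, validLength, readValid and demo are made up for illustration.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class ValidLengthReader {

    /** Number of leading bytes of partFile that count as valid output. */
    static long validLength(Path partFile, Path markerFile) throws IOException {
        long size = Files.size(partFile);
        if (!Files.exists(markerFile)) {
            return size; // no marker: the whole file is valid
        }
        try (DataInputStream in = new DataInputStream(Files.newInputStream(markerFile))) {
            // Assumption: the marker stores the length as a decimal string via writeUTF.
            return Math.min(Long.parseLong(in.readUTF()), size);
        }
    }

    /** Reads only the valid prefix of partFile into memory. */
    static byte[] readValid(Path partFile, Path markerFile) throws IOException {
        byte[] valid = new byte[Math.toIntExact(validLength(partFile, markerFile))];
        try (DataInputStream in = new DataInputStream(Files.newInputStream(partFile))) {
            in.readFully(valid);
        }
        return valid;
    }

    /** Writes four records, marks the first two as valid, and reads them back. */
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("valid-length-demo");
            Path part = dir.resolve("part-8-0");
            Path marker = dir.resolve("part-8-0.valid-length");
            Files.write(part, "r1\nr2\nr3\nr4\n".getBytes(StandardCharsets.UTF_8));
            try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(marker))) {
                out.writeUTF("6"); // "r1\nr2\n" is 6 bytes long
            }
            return new String(readValid(part, marker), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

A downstream job that counts records would apply the same rule per part file: anything after the valid length belongs to a failed attempt and should be ignored. Reading the whole prefix into a byte array only works for small files; for real output a bounded stream would be the better choice.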

If this is not the problem then I definitely have to investigate further. I’ll also look into the Hadoop 2.4.1 build problem.

Cheers,
Aljoscha


Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Maximilian Bode <ma...@tngtech.com>.
Hi Aljoscha,
thanks again for getting back to me. I built from your branch and the exception is not occurring anymore. The RollingSink state can be restored.

Still, the exactly-once guarantee does not seem to hold: there are always some extra records after killing either a task manager or the job manager. Do you have an idea where this behavior might be coming from? (I guess concrete numbers will not help much, as there are so many parameters influencing them. Still, in our test scenario, we produce 2 million records in a Kafka queue, but the final output files contain on the order of 2.1 million records, so a 5% error. The job is running in a per-job YARN session with n=3, s=4 and a checkpointing interval of 10s.)
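
For reference, the 5% figure is simply the surplus relative to the number of records produced. A small sketch of that arithmetic, using the rounded counts from above; the class and method names are made up for illustration.

```java
import java.util.Locale;

final class SurplusCheck {

    /** Fraction of extra records in the output, relative to the records produced. */
    static double surplusFraction(long produced, long written) {
        if (produced <= 0) {
            throw new IllegalArgumentException("produced must be positive");
        }
        return (written - produced) / (double) produced;
    }

    public static void main(String[] args) {
        // Rounded counts from the test scenario: 2 million in, about 2.1 million out.
        double surplus = surplusFraction(2_000_000L, 2_100_000L);
        System.out.println(String.format(Locale.ROOT, "surplus: %.1f%%", 100 * surplus));
    }
}
```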

On another (maybe unrelated) note: when I pulled your branch, the Travis build did not go through for -Dhadoop.version=2.4.1. I have not looked into this further yet. Is this one of the tests known to fail sometimes?

Cheers,
 Max

—
Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Registered office: Unterföhring * Amtsgericht München * HRB 135082



Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Maximilian Bode <ma...@tngtech.com>.
Hi Aljoscha,

thank you very much. I will check whether this fixes the problem and get back to you. I am using 1.0.0 as of today :)

Cheers,
 Max
—
Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Registered office: Unterföhring * Amtsgericht München * HRB 135082



Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Maximilian,
sorry for the delay, we were very busy with the release last week. I had a hunch about the problem, and I think I have found a fix now. The problem is in snapshot restore. When restoring, the sink tries to clean up any files that were previously in progress. If Flink restores to the same snapshot twice in a row, it tries to clean up the leftover files twice, but by the second attempt they are no longer there, which causes the exception.

I have a fix in my branch: https://github.com/aljoscha/flink/tree/rolling-sink-fix
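
As a rough illustration of the idea only (this is not the code in that branch), a cleanup step that is safe to run twice could look like the sketch below. It uses local files via java.nio instead of HDFS. The file names, the RepeatableCleanup class and the Outcome values are hypothetical and chosen for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class RepeatableCleanup {

    enum Outcome { MOVED_IN_PROGRESS, MOVED_PENDING, ALREADY_CLEAN }

    /** Moves a leftover file to its final name; a repeated call is a no-op. */
    static Outcome cleanUp(Path inProgress, Path pending, Path finalFile) {
        try {
            if (Files.exists(pending)) {
                Files.move(pending, finalFile, StandardCopyOption.REPLACE_EXISTING);
                return Outcome.MOVED_PENDING;
            }
            if (Files.exists(inProgress)) {
                Files.move(inProgress, finalFile, StandardCopyOption.REPLACE_EXISTING);
                return Outcome.MOVED_IN_PROGRESS;
            }
            // Neither file exists: an earlier restore of the same snapshot
            // already cleaned up, so there is nothing to do and no reason to throw.
            return Outcome.ALREADY_CLEAN;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Simulates restoring twice from the same snapshot. */
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("restore-demo");
            Path inProgress = dir.resolve("part-5-0.in-progress");
            Path pending = dir.resolve("part-5-0.pending");
            Path finalFile = dir.resolve("part-5-0");
            Files.write(inProgress, new byte[] {1, 2, 3});
            Outcome first = cleanUp(inProgress, pending, finalFile);
            Outcome second = cleanUp(inProgress, pending, finalFile);
            return first + "," + second;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off in this sketch is that a missing file is treated as already handled, which could hide a real problem if the file disappeared for another reason. Logging that case would be a sensible addition.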

Could you check whether this solves your problem? Which version of Flink are you using? You would have to build from source to try it out. Alternatively, I could build it and put it into a Maven snapshot repository for you.

Cheers,
Aljoscha


Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
did you check whether there are any files at your specified HDFS output location? If yes, which files are there?

Cheers,
Aljoscha


Re: Jobmanager HA with Rolling Sink in HDFS

Posted by Maximilian Bode <ma...@tngtech.com>.
Just for the sake of completeness: this also happens when killing a task manager and is therefore probably unrelated to job manager HA.
