Posted to user@flink.apache.org by Chakravarthy varaga <ch...@gmail.com> on 2016/09/23 09:33:57 UTC

Flink Checkpoint runs slow for low load stream

Hi Aljoscha & Fabian,

    I have a streaming application with two stream sources, as below.

     KeyedStream<String, String> ks1 = ds1.keyBy("*");
     KeyedStream<Tuple2<String, V>, String> ks2 =
         ds2.flatMap(/* split T into k-v pairs */).keyBy(0);

     ks1.connect(ks2).flatMap(X);
     // X is a CoFlatMapFunction that inserts elements from ks2 into (and
     // removes them from) a key-value state member. Elements from ks1 are
     // matched against that state. The CoFlatMapFunction operator maintains
     // a ValueState<Tuple2<Long, Long>>.

     // ks1 streams about 100K events/sec from a Kafka topic.
     // ks2 streams about 1 event every 10 minutes. Precisely when the first
     // event is consumed from this stream, checkpoints immediately start
     // taking 2 minutes.
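The operator X described above can be pictured, outside of Flink, as a per-key map that the slow stream (ks2) writes to and the fast stream (ks1) reads from. The following is only a plain-Java sketch under assumptions: the class and method names are hypothetical, a HashMap stands in for Flink's keyed ValueState, and the Tuple2<Long, Long> value is modeled as a long[2]. It is not Flink's CoFlatMapFunction API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the CoFlatMapFunction X (not Flink's API).
// A HashMap stands in for the keyed ValueState<Tuple2<Long, Long>>,
// whose value is modeled here as a long[2].
class KeyedStateMatcher {
    private final Map<String, long[]> state = new HashMap<>();

    // Input from ks2: store the value for the key, or remove it if value is null.
    void onControl(String key, long[] value) {
        if (value == null) {
            state.remove(key);
        } else {
            state.put(key, value);
        }
    }

    // Input from ks1: emit a match only if state exists for the key.
    List<String> onData(String key) {
        List<String> out = new ArrayList<>();
        long[] v = state.get(key);
        if (v != null) {
            out.add(key + "->(" + v[0] + "," + v[1] + ")");
        }
        return out;
    }

    // Raw payload of the state: two longs (16 bytes) per key, ignoring overhead.
    long approxPayloadBytes() {
        return state.size() * 16L;
    }

    public static void main(String[] args) {
        KeyedStateMatcher x = new KeyedStateMatcher();
        System.out.println(x.onData("a")); // prints: []
        x.onControl("a", new long[] {1L, 2L});
        System.out.println(x.onData("a")); // prints: [a->(1,2)]
        x.onControl("a", null);
        System.out.println(x.onData("a")); // prints: []
    }
}
```

Since each key holds just two longs, the state stays tiny even with many keys, which fits the KB-sized state reported later in this thread.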

    The Flink version is 1.1.2.

I tried checkpointing every 10 seconds using an FsStateBackend. What I
notice is that the checkpoint duration is almost 2 minutes in many cases,
while in the other cases it frequently varies from 100 ms to 1.5 minutes.
I'm attaching a snapshot of the dashboard for your reference.
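One way to narrow down where such a checkpoint spends its time is to split each task's checkpoint into two phases: the time until the barrier has reached the task, and the time the task then spends snapshotting its state. Barriers flow in-band with the records, so backpressure, and for a two-input operator such as a connected stream the wait for the barrier on both inputs, shows up in the first phase. The sketch below assumes you have already read three timestamps per task from the logs or the dashboard; the class and field names are hypothetical and do not correspond to a Flink API or log format.

```java
// Hypothetical per-task breakdown of one checkpoint; all times in milliseconds.
final class CheckpointPhases {
    final long triggerMs;        // when the checkpoint was triggered
    final long barrierArrivedMs; // when the barrier had arrived on all inputs of the task
    final long snapshotDoneMs;   // when the task finished snapshotting its state

    CheckpointPhases(long triggerMs, long barrierArrivedMs, long snapshotDoneMs) {
        this.triggerMs = triggerMs;
        this.barrierArrivedMs = barrierArrivedMs;
        this.snapshotDoneMs = snapshotDoneMs;
    }

    // Time the barrier needed to travel to (and be aligned at) this task.
    long barrierDelayMs() {
        return barrierArrivedMs - triggerMs;
    }

    // Time spent writing the task's state snapshot.
    long snapshotMs() {
        return snapshotDoneMs - barrierArrivedMs;
    }

    // Which of the two phases dominated this checkpoint for this task.
    String dominantPhase() {
        return barrierDelayMs() >= snapshotMs() ? "barrier travel/alignment" : "state snapshot";
    }

    public static void main(String[] args) {
        // Hypothetical numbers: the barrier needs ~2 minutes, the snapshot 100 ms.
        CheckpointPhases p = new CheckpointPhases(0L, 115_000L, 115_100L);
        System.out.println(p.barrierDelayMs() + " ms / " + p.snapshotMs() + " ms -> " + p.dominantPhase());
        // prints: 115000 ms / 100 ms -> barrier travel/alignment
    }
}
```

With the hypothetical numbers in main, the barrier needs almost two minutes to arrive while the snapshot takes 100 ms, which would point to slow barrier propagation rather than large state.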

     Is this an issue with Flink checkpointing?

 Best Regards
CVP

Re: Flink Checkpoint runs slow for low load stream

Posted by vinay patil <vi...@gmail.com>.
Thanks, Stephan, for your input.

We are seeing the checkpointing issue in other projects as well, which have
no window or encryption logic (using Flink 1.1.1).

As you suggested, I will try using RocksDB and run the pipeline on EMR to
provide more details.

Regards,
Vinay Patil

On Tue, Sep 27, 2016 at 1:43 PM, Stephan Ewen wrote:

> @vinay - Flink needs to store all pending windows in the checkpoint, i.e.,
> windows that have elements but have not yet fired or been purged.
>
> I guess client-side encryption can add to the delay.
> If you use RocksDB asynchronous snapshots (1.1.x), this delay should
> be hidden.
>
> Greetings,
> Stephan
>
>
> On Tue, Sep 27, 2016 at 5:20 PM, vinay patil wrote:
>
>> Hi Stephan,
>>
>> Ok, I think that may be taking a lot of time. When you say that window
>> operators store everything, do you mean that all the input to the window
>> is stored in the state backend?
>>
>> For example, the input to my apply function is an Iterable<DTO> that can
>> contain multiple elements, and each DTO has roughly 50 fields.
>>
>> So do you mean that the complete DTO will be stored in the state backend?
>> If yes, then it's probably better to use RocksDB as the state backend.
>>
>> Also, I am using AWS client-side encryption to write encrypted data to
>> S3, so maybe that is also taking some time.
>>
>> What do you think?
>>
>> Regards,
>> Vinay Patil
>>
>> On Tue, Sep 27, 2016 at 3:51 AM, Stephan Ewen wrote:
>>
>>> @vinay - Window operators store everything in the state backend.
>>>
>>> On Mon, Sep 26, 2016 at 7:34 PM, vinay patil wrote:
>>>
>>>> I am not sure about that; I will run the pipeline on the cluster and
>>>> share the details.
>>>> Since a window is a stateful operator, it will store only the key part
>>>> in the state backend and not the value, right?
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Mon, Sep 26, 2016 at 12:13 PM, Stephan Ewen wrote:
>>>>
>>>>> @vinay - In your case, is it large state that causes the slower checkpoints?
>>>>>
>>>>> On Mon, Sep 26, 2016 at 6:17 PM, vinay patil wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am also facing this issue. In my case the data flows continuously
>>>>>> from the Kafka source, and when I increase the checkpoint interval to
>>>>>> 60000 ms, the data gets written to the S3 sink.
>>>>>>
>>>>>> Is it because some operator takes more time to process? In my case I
>>>>>> am using a time window of 1 sec.
>>>>>>
>>>>>> Regards,
>>>>>> Vinay Patil
>>>>>>
>>>>>> On Mon, Sep 26, 2016 at 10:08 AM, Chakravarthy varaga wrote:
>>>>>>
>>>>>>> Hi Stephan,
>>>>>>>
>>>>>>>     Please find my responses below.
>>>>>>>
>>>>>>>   - What source are you using for the slow input?
>>>>>>>     [CVP] - As pointed out in my first mail, both streams are Kafka
>>>>>>>     streams.
>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>     [CVP] - I have enabled checkpointing on the
>>>>>>>     StreamExecutionEnvironment as below:
>>>>>>>
>>>>>>>         final StreamExecutionEnvironment streamEnv =
>>>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>         // checkpoint data is written to files under this path
>>>>>>>         streamEnv.setStateBackend(
>>>>>>>             new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>>         // trigger a checkpoint every 10000 ms (10 s)
>>>>>>>         streamEnv.enableCheckpointing(10000);
>>>>>>>
>>>>>>>     In terms of the state stored, the KS1 stream has a payload of
>>>>>>>     100K events/second, while KS2 has about 1 event per 10 minutes.
>>>>>>>     Basically, the operators perform flatMaps on 8 fields of a tuple
>>>>>>>     (all fields are primitives). If you look at the state sizes in the
>>>>>>>     dashboard, they are in KB.
>>>>>>>   - Can you check in the log whether the state snapshot actually
>>>>>>>     takes that long, or whether it simply takes long for the checkpoint
>>>>>>>     barriers to travel through the stream due to a lot of backpressure?
>>>>>>>     [CVP] - There is no backpressure, at least according to the
>>>>>>>     sampling shown in the Flink dashboard; 100K events/second is a low
>>>>>>>     load by Flink's benchmarks. I did not quite understand the
>>>>>>>     distinction between barriers and the state snapshot. I have
>>>>>>>     attached the TaskManager log (DEBUG level) in case it interests you.
>>>>>>>
>>>>>>>     I have attached the checkpoint times from the dashboard as a .png.
>>>>>>>     If you look at checkpoint IDs 28, 29 and 30, you'll see that each
>>>>>>>     of these checkpoints takes more than a minute. Before these
>>>>>>>     checkpoints, the KS2 stream had not received any events. As soon as
>>>>>>>     an event (which should be only bytes in size) was produced, the
>>>>>>>     checkpoints slowed down, and every checkpoint thereafter took a
>>>>>>>     minute or more.
>>>>>>>
>>>>>>>     This log was collected from a standalone Flink cluster with 1
>>>>>>>     JobManager and 2 TaskManagers. One TaskManager was running this
>>>>>>>     application with checkpointing (parallelism=1).
>>>>>>>
>>>>>>>     Please let me know if you need further info.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen wrote:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>>> information?
>>>>>>>>
>>>>>>>>   - What source are you using for the slow input?
>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>   - Can you check in the log whether the state snapshot actually
>>>>>>>> takes that long, or whether it simply takes long for the checkpoint
>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske wrote:
>>>>>>>>
>>>>>>>>> Hi CVP,
>>>>>>>>>
>>>>>>>>> I'm not very familiar with the internals of the checkpointing
>>>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>> flink_job_Plan.png (42K)
>>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/0/flink_job_Plan.png>
>>>>>>> Flink-Checkpoint-Times.png (65K)
>>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/1/Flink-Checkpoint-Times.png>
>>>>>>> flink-qchavar-taskmanager-1-elxa1h67k32.log (442K)
>>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/2/flink-qchavar-taskmanager-1-elxa1h67k32.log>

Re: Flink Checkpoint runs slow for low load stream

Posted by Stephan Ewen <se...@apache.org>.
@vinay - Flink needs to store all pending windows in the checkpoint, i.e.,
windows that have elements but have not yet fired or been purged.
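The point about pending windows can be illustrated with a toy model in plain Java. It is not Flink's WindowOperator; it assumes tumbling windows of a fixed size, non-negative timestamps, and hypothetical names. Elements are buffered per window and key until the watermark reaches the window end, so a checkpoint taken in between has to contain every buffered element, which matches the answer earlier in the thread that window operators store their full input in the state backend.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of a window operator's state (not Flink's WindowOperator).
// In this toy, a tumbling window [start, start + sizeMs) fires once the
// watermark reaches its end; until then all its elements are buffered.
class PendingWindows<T> {
    private final long sizeMs;
    // window start -> (key -> buffered elements of that key's window)
    private final TreeMap<Long, Map<String, List<T>>> windows = new TreeMap<>();

    PendingWindows(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Buffer an element in its window (assumes non-negative timestamps).
    void add(String key, long timestampMs, T element) {
        long start = timestampMs - (timestampMs % sizeMs);
        windows.computeIfAbsent(start, s -> new HashMap<>())
               .computeIfAbsent(key, k -> new ArrayList<>())
               .add(element);
    }

    // Fire and purge every window whose end is <= the watermark.
    // Returns the number of (key, window) panes that fired.
    int fireUpTo(long watermarkMs) {
        int fired = 0;
        while (!windows.isEmpty() && windows.firstKey() + sizeMs <= watermarkMs) {
            fired += windows.pollFirstEntry().getValue().size();
        }
        return fired;
    }

    // Number of elements a checkpoint taken right now would have to store.
    int elementsInSnapshot() {
        int n = 0;
        for (Map<String, List<T>> perKey : windows.values()) {
            for (List<T> buffered : perKey.values()) {
                n += buffered.size();
            }
        }
        return n;
    }
}
```

For example, with 1000 ms windows, elements at 100 ms and 900 ms for key "k" and one at 1500 ms for key "j" are all part of a snapshot until the watermark reaches 1000; after that, only the element of the second window remains.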

I guess client-side encryption can add to the delay.
If you use RocksDB asynchronous snapshots (1.1.x), this delay should be
hidden.
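The general idea behind asynchronous snapshots can be sketched in plain Java: the processing thread only takes a cheap point-in-time copy of the state and hands the expensive write (for example, encrypting and uploading to S3) to a background thread, so processing continues while the checkpoint is persisted. This is a simplified model under assumptions: a HashMap copy stands in for the backend's own snapshot mechanism, the names are hypothetical, and it does not describe how Flink's RocksDB backend is implemented internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Simplified model of an asynchronous snapshot (not Flink's RocksDB backend):
// the caller's thread only copies the state; the slow "persist" step
// (e.g. encrypting and uploading to S3) runs on a background thread.
class AsyncSnapshotter {
    private final Map<String, Long> state = new HashMap<>();
    private final ExecutorService io = Executors.newSingleThreadExecutor();

    void update(String key, long value) {
        state.put(key, value);
    }

    // Synchronous part: a point-in-time copy. Asynchronous part: persisting it.
    CompletableFuture<String> snapshot(Function<Map<String, Long>, String> persist) {
        Map<String, Long> copy = new HashMap<>(state);
        return CompletableFuture.supplyAsync(() -> persist.apply(copy), io);
    }

    void shutdown() {
        io.shutdown();
    }

    public static void main(String[] args) {
        AsyncSnapshotter s = new AsyncSnapshotter();
        s.update("a", 1L);
        CompletableFuture<String> done = s.snapshot(m -> "persisted " + m);
        s.update("a", 2L);               // processing continues while the snapshot is written
        System.out.println(done.join()); // prints: persisted {a=1}
        s.shutdown();
    }
}
```

The snapshot reflects the state at the moment it was taken, even though the value was updated before the background write finished; that is what lets the slow write stay off the processing path.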

Greetings,
Stephan



Re: Flink Checkpoint runs slow for low load stream

Posted by vinay patil <vi...@gmail.com>.
Hi Stephan,

Ok, I think that may be taking a lot of time. When you say that window
operators store everything, do you mean that all the input to the window
is stored in the state backend?

For example, the input to my apply function is an Iterable<DTO> that can
contain multiple elements, and each DTO has roughly 50 fields.

So do you mean that the complete DTO will be stored in the state backend?
If yes, then it's probably better to use RocksDB as the state backend.
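To get a feel for whether buffering whole DTOs matters, a back-of-the-envelope estimate is: state to checkpoint is roughly the number of elements buffered in not-yet-fired windows times the serialized bytes per element. The helper below only does that arithmetic; it ignores serializer overhead, compression and encryption, and the rate, window length and bytes-per-field values in main are hypothetical placeholders, not measurements from this thread.

```java
// Back-of-the-envelope estimate of window state written per checkpoint.
final class WindowStateEstimate {
    private WindowStateEstimate() {}

    // events per second * seconds of buffered window content * bytes per element
    static long pendingBytes(long eventsPerSec, double pendingWindowSec, long bytesPerElement) {
        return Math.round(eventsPerSec * pendingWindowSec) * bytesPerElement;
    }

    // Serialized element size if every field takes roughly the same number of bytes.
    static long bytesPerElement(int fields, int avgBytesPerField) {
        return (long) fields * avgBytesPerField;
    }

    public static void main(String[] args) {
        long perDto = bytesPerElement(50, 20);          // hypothetical: 50 fields x ~20 bytes
        long bytes = pendingBytes(10_000, 1.0, perDto); // hypothetical: 10k events/s, 1 s buffered
        System.out.println(bytes);                      // prints: 10000000
    }
}
```

Plugging in real rates and a measured serialized DTO size gives a rough upper bound to compare against the state sizes shown in the dashboard.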

Also, I am using AWS client-side encryption to write encrypted data to
S3, so maybe that is also taking some time.

What do you think?

Regards,
Vinay Patil

On Tue, Sep 27, 2016 at 3:51 AM, Stephan Ewen [via Apache Flink User
Mailing List archive.] <ml...@n4.nabble.com> wrote:

> @vinay - Window operators store everything in the state backend.
>
> On Mon, Sep 26, 2016 at 7:34 PM, vinay patil <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=9189&i=0>> wrote:
>
>> I am not sure about that, I will run the pipeline on cluster and share
>> the details
>> Since window is a stateful operator , it will store only the key part in
>> the state backend and not the value , right ?
>>
>> Regards,
>> Vinay Patil
>>
>> On Mon, Sep 26, 2016 at 12:13 PM, Stephan Ewen [via Apache Flink User
>> Mailing List archive.] <[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=9182&i=0>> wrote:
>>
>>> @vinay - Is it in your case large state that causes slower checkpoints?
>>>
>>> On Mon, Sep 26, 2016 at 6:17 PM, vinay patil <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node&node=9181&i=0>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am also facing this issue, in my case the data is flowing
>>>> continuously from the Kafka source, when I increase the checkpoint interval
>>>> to 60000, the data gets written to S3 sink.
>>>>
>>>> Is it because some operator is taking more time for processing, like in
>>>> my case I am using a time window of 1sec.
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Mon, Sep 26, 2016 at 10:08 AM, Chakravarthy varaga [via Apache Flink
>>>> User Mailing List archive.] <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node&node=9179&i=0>> wrote:
>>>>
>>>>> Hi Stefan,
>>>>>
>>>>>     Please find my responses below.
>>>>>
>>>>>     - What source are you using for the slow input?
>>>>> *     [CVP] - Both stream as pointed out in my first mail, are Kafka
>>>>> Streams*
>>>>>   - How large is the state that you are checkpointing?
>>>>>
>>>>>     [CVP] - I have enabled checkpointing on the StreamExecutionEnvironment
>>>>>     as below:
>>>>>
>>>>>         final StreamExecutionEnvironment streamEnv =
>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         streamEnv.setStateBackend(
>>>>>             new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>         // trigger a checkpoint every 10000 ms (10 s)
>>>>>         streamEnv.enableCheckpointing(10000);
>>>>>
>>>>>     In terms of the state stored: the KS1 stream carries about 100K
>>>>>     events/second, while KS2 has about 1 event every 10 minutes.
>>>>>     Basically, the operators perform flatMaps on tuples with 8 fields
>>>>>     (all fields are primitives). The state sizes shown in the dashboard
>>>>>     are in the KB range.
>>>>>   - Can you try to see in the log if actually the state snapshot takes
>>>>> that long, or if it simply takes long for the checkpoint barriers to
>>>>> travel through the stream due to a lot of backpressure?
>>>>>     [CVP] - There is no backpressure, at least according to the sampled
>>>>>     computation in the Flink dashboard. 100K/second is a low load by
>>>>>     Flink's benchmark standards. I did not quite understand the
>>>>>     distinction between barrier travel and the state snapshot. I have
>>>>>     attached the TaskManager log (DEBUG level) in case it helps.
>>>>>
>>>>>     I have attached the checkpoint times from the dashboard as a .png.
>>>>>     If you look at checkpoint IDs 28, 29 and 30, you'll see that each of
>>>>>     these checkpoints takes more than a minute. Before these checkpoints,
>>>>>     the KS2 stream had not received any events. As soon as one event (a
>>>>>     tiny event, on the order of bytes) was generated, the checkpoints
>>>>>     slowed down, and every checkpoint thereafter took a minute or more.
>>>>>
>>>>>     This log was collected from a standalone Flink cluster with 1
>>>>>     JobManager and 2 TaskManagers. One TaskManager was running this
>>>>>     application with checkpointing (parallelism=1).
>>>>>
>>>>>     Please let me know if you need further info.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>> information?
>>>>>>
>>>>>>   - What source are you using for the slow input?
>>>>>>   - How large is the state that you are checkpointing?
>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>> takes that long, or if it simply takes long for the checkpoint barriers to
>>>>>> travel through the stream due to a lot of backpressure?
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske wrote:
>>>>>>
>>>>>>> Hi CVP,
>>>>>>>
>>>>>>> I'm not very familiar with the internals of the checkpointing
>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga:
>>>>>>>
>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>
>>>>>>>>     I have a stream application that has 2 stream sources, as below.
>>>>>>>>
>>>>>>>>      // keyBy(String...) and keyBy(int...) return KeyedStream<T, Tuple>
>>>>>>>>      KeyedStream<String, Tuple> ks1 = ds1.keyBy("*");
>>>>>>>>      KeyedStream<Tuple2<String, V>, Tuple> ks2 =
>>>>>>>>          ds2.flatMap(/* split T into k-v pairs */).keyBy(0);
>>>>>>>>
>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>      // X is a CoFlatMapFunction that inserts elements from ks2 into,
>>>>>>>>      // and removes them from, a key-value state member. Elements from
>>>>>>>>      // ks1 are matched against that state. The CoFlatMapFunction
>>>>>>>>      // operator maintains ValueState<Tuple2<Long, Long>>.
>>>>>>>>
>>>>>>>>      // ks1 streams about 100K events/sec from a Kafka topic.
>>>>>>>>      // ks2 streams about 1 event every 10 minutes. As soon as the
>>>>>>>>      // first event is consumed from this stream, checkpoints
>>>>>>>>      // immediately start taking 2 minutes.
>>>>>>>>
>>>>>>>>     The Flink version is 1.1.2.
>>>>>>>>
>>>>>>>> I configured checkpointing every 10 seconds with an FsStateBackend.
>>>>>>>> I notice that the checkpoint duration is almost 2 minutes in many
>>>>>>>> cases, while in the other cases it frequently varies between 100 ms
>>>>>>>> and 1.5 minutes. I'm attaching a snapshot of the dashboard for your
>>>>>>>> reference.
>>>>>>>>
>>>>>>>>      Is this an issue with Flink checkpointing?
>>>>>>>>
>>>>>>>>  Best Regards
>>>>>>>> CVP
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>> Attachments:
>>>>> flink_job_Plan.png (42K)
>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/0/flink_job_Plan.png>
>>>>> Flink-Checkpoint-Times.png (65K)
>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/1/Flink-Checkpoint-Times.png>
>>>>> flink-qchavar-taskmanager-1-elxa1h67k32.log (442K)
>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/2/flink-qchavar-taskmanager-1-elxa1h67k32.log>
>>>>>
>
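The matching logic CVP describes for operator X can be modelled without Flink to see what ends up in keyed state. This is a minimal sketch under stated assumptions, not CVP's code: a `HashMap` stands in for Flink's per-key `ValueState<Tuple2<Long, Long>>` (each value modelled as a two-element `long[]`), a `null` value on the slow input is a hypothetical "remove" convention, and the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Flink-free model of the co-flatMap "X" from this thread (hypothetical names).
// The HashMap stands in for Flink's keyed ValueState<Tuple2<Long, Long>>;
// each value is modelled as a two-element long[].
class KeyedMatcherModel {
    private final Map<String, long[]> stateByKey = new HashMap<>();

    // Slow input (like ks2): a non-null value inserts or overwrites the entry
    // for the key; null removes it (a hypothetical "remove" convention).
    void onControlEvent(String key, long[] valueOrNull) {
        if (valueOrNull == null) {
            stateByKey.remove(key);
        } else {
            stateByKey.put(key, valueOrNull.clone());
        }
    }

    // Fast input (like ks1): emit the event only if its key currently has state.
    List<String> onDataEvent(String key, String event) {
        List<String> out = new ArrayList<>();
        long[] value = stateByKey.get(key);
        if (value != null) {
            out.add(event + " matched [" + value[0] + ", " + value[1] + "]");
        }
        return out;
    }

    // Number of keys holding state: roughly what a checkpoint has to copy.
    int stateEntries() {
        return stateByKey.size();
    }
}
```

Only keys that have received an event on the slow stream hold state, which fits the KB-sized state CVP reports: a data event for key `"a"` yields nothing until a control event stores `{1, 2}` for `"a"`, after which it is emitted as `"e2 matched [1, 2]"`, and a `null` control event clears the key again.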




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Checkpoint-runs-slow-for-low-load-stream-tp9147p9211.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
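Stephan's third question separates two possible causes of a long checkpoint: the time for the checkpoint barriers to reach an operator, and the time the operator needs to snapshot its state. For a two-input operator such as the ks1/ks2 co-flatMap, Flink aligns barriers: once the barrier arrives on one input, that input is held back until the barrier of the same checkpoint arrives on the other input, and only then is the state snapshotted. The Java sketch below is a simplified timing model of that decomposition, not Flink code; the class name and all millisecond inputs are hypothetical.

```java
// Simplified timing model of an aligned checkpoint at a multi-input operator.
// Not Flink code: the class name is hypothetical and all times are in ms.
final class AlignedCheckpointTiming {
    final long barrierTravelMs; // trigger -> last barrier reaches the operator
    final long alignmentMs;     // first barrier -> last barrier (input held back)
    final long snapshotMs;      // time spent writing the operator's state
    final long totalMs;         // approximate end-to-end time for this operator

    private AlignedCheckpointTiming(long barrierTravelMs, long alignmentMs, long snapshotMs) {
        this.barrierTravelMs = barrierTravelMs;
        this.alignmentMs = alignmentMs;
        this.snapshotMs = snapshotMs;
        // The snapshot can only start once the barriers of all inputs have arrived.
        this.totalMs = barrierTravelMs + snapshotMs;
    }

    static AlignedCheckpointTiming of(long triggerMs, long[] barrierArrivalMs, long snapshotMs) {
        if (barrierArrivalMs.length == 0 || snapshotMs < 0) {
            throw new IllegalArgumentException("need at least one input and snapshotMs >= 0");
        }
        long first = Long.MAX_VALUE;
        long last = Long.MIN_VALUE;
        for (long arrival : barrierArrivalMs) {
            if (arrival < triggerMs) {
                throw new IllegalArgumentException("barrier cannot arrive before the trigger");
            }
            first = Math.min(first, arrival);
            last = Math.max(last, arrival);
        }
        return new AlignedCheckpointTiming(last - triggerMs, last - first, snapshotMs);
    }
}
```

For example, with a trigger at 0 ms, barriers arriving at 20 ms and 110 000 ms, and a 50 ms snapshot, `of(0, new long[] {20, 110_000}, 50)` gives `alignmentMs = 109980` and `totalMs = 110050`: in such a case the snapshot itself is negligible and the wait for the late barrier dominates. These inputs are illustrative, not measurements from this thread.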

Re: Flink Checkpoint runs slow for low load stream

Posted by Stephan Ewen <se...@apache.org>.
@vinay - Window operators store everything in the state backend.
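To illustrate this point, here is a toy Java model of a keyed tumbling-window operator's state. It is a hypothetical sketch, not Flink's `WindowOperator`: every pending window (one that has received elements but has not fired yet) keeps the buffered element values, not just the key, so all of them would be part of a checkpoint. It assumes a window function that needs the raw elements; with incremental aggregation (e.g. a reduce function) Flink would keep one aggregated value per key and window instead. The firing rule follows event-time tumbling windows, where the window `[start, start + size)` fires once the watermark reaches `start + size - 1`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of a keyed tumbling-window operator's state (hypothetical class,
// not Flink's WindowOperator). Everything in `pending` would be checkpointed.
final class TumblingWindowStateModel<K, V> {
    private final long sizeMs;
    // key -> (window start -> buffered element values), sorted by window start
    private final Map<K, TreeMap<Long, List<V>>> pending = new HashMap<>();

    TumblingWindowStateModel(long sizeMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.sizeMs = sizeMs;
    }

    // Assign the element to the window [start, start + sizeMs) and buffer its value.
    void add(K key, long timestampMs, V value) {
        long start = timestampMs - Math.floorMod(timestampMs, sizeMs);
        pending.computeIfAbsent(key, k -> new TreeMap<>())
               .computeIfAbsent(start, s -> new ArrayList<>())
               .add(value);
    }

    // Fire and purge every window whose max timestamp (start + size - 1) is
    // <= the watermark; returns how many buffered elements were released.
    int fireUpTo(long watermarkMs) {
        int released = 0;
        Iterator<Map.Entry<K, TreeMap<Long, List<V>>>> keys = pending.entrySet().iterator();
        while (keys.hasNext()) {
            TreeMap<Long, List<V>> windows = keys.next().getValue();
            Iterator<Map.Entry<Long, List<V>>> it = windows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, List<V>> window = it.next();
                if (window.getKey() + sizeMs - 1 > watermarkMs) {
                    break; // later windows of this key end even later
                }
                released += window.getValue().size();
                it.remove();
            }
            if (windows.isEmpty()) {
                keys.remove();
            }
        }
        return released;
    }

    // Number of element values a checkpoint taken now would have to store.
    int pendingElements() {
        int n = 0;
        for (TreeMap<Long, List<V>> windows : pending.values()) {
            for (List<V> values : windows.values()) {
                n += values.size();
            }
        }
        return n;
    }
}
```

With a 1000 ms window, adding two elements for key `"a"` at 100 ms and 900 ms and one for `"b"` at 1500 ms leaves 3 pending elements; `fireUpTo(999)` releases the 2 elements of window `[0, 1000)` and leaves 1 pending.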

On Mon, Sep 26, 2016 at 7:34 PM, vinay patil <vi...@gmail.com>
wrote:

> I am not sure about that; I will run the pipeline on a cluster and share the
> details.
> Since a window is a stateful operator, it will store only the key part in
> the state backend and not the value, right?
>
> Regards,
> Vinay Patil

Re: Flink Checkpoint runs slow for low load stream

Posted by vinay patil <vi...@gmail.com>.
I am not sure about that; I will run the pipeline on a cluster and share the
details.
Since a window is a stateful operator, it will store only the key part in
the state backend and not the value, right?

Regards,
Vinay Patil

On Mon, Sep 26, 2016 at 12:13 PM, Stephan Ewen wrote:

> @vinay - Is it in your case large state that causes slower checkpoints?




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Checkpoint-runs-slow-for-low-load-stream-tp9147p9182.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Flink Checkpoint runs slow for low load stream

Posted by Stephan Ewen <se...@apache.org>.
@vinay - Is it in your case large state that causes slower checkpoints?
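One rough way to judge whether state is "large" is to estimate the serialized bytes per key and multiply by the number of keys. The sketch below assumes a simple hypothetical encoding (`DataOutputStream.writeUTF` for the key followed by two `long`s, mirroring a `Tuple2<Long, Long>` value); Flink's own serializers use a different format, so the result is only an order-of-magnitude estimate.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Back-of-the-envelope state size estimate. The encoding (writeUTF key + two
// longs) is a hypothetical stand-in, not Flink's serializer format.
final class StateSizeEstimate {
    // Serialized size of one key plus a Tuple2<Long, Long>-like value.
    static int serializedEntryBytes(String key, long f0, long f1) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(key);   // 2-byte length prefix + modified UTF-8 bytes
            out.writeLong(f0);   // 8 bytes
            out.writeLong(f1);   // 8 bytes
        } catch (IOException e) {
            throw new IllegalStateException("in-memory write failed", e);
        }
        return bytes.size();
    }

    // Total estimate for a number of distinct keys with similar entries.
    static long estimateTotalBytes(long distinctKeys, int bytesPerEntry) {
        return distinctKeys * bytesPerEntry;
    }
}
```

For an 8-character ASCII key such as `"key-0001"`, `writeUTF` writes 2 length bytes plus 8 bytes and the two longs add 16, so `serializedEntryBytes` returns 26; 1,000 such keys come to roughly 26 KB.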

On Mon, Sep 26, 2016 at 6:17 PM, vinay patil <vi...@gmail.com>
wrote:

> Hi,
>
> I am also facing this issue. In my case, data flows continuously from the
> Kafka source, and when I increase the checkpoint interval to 60000 ms, the
> data gets written to the S3 sink.
>
> Is it because some operator takes more time to process? In my case, I am
> using a time window of 1 sec.
>
> Regards,
> Vinay Patil
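The interplay between the checkpoint interval and slow checkpoints can be reasoned about with a small scheduling model. The Java sketch below is a simplification under stated assumptions, not Flink's `CheckpointCoordinator`: triggers fire every `intervalMs`, at most one checkpoint is in flight, a trigger that fires while one is in flight is simply skipped (Flink may instead queue it), and every checkpoint takes the same `durationMs`. It counts how many checkpoints complete within a time horizon.

```java
// Simplified checkpoint scheduling model (hypothetical, not Flink's
// CheckpointCoordinator). Assumptions: a trigger every intervalMs, at most one
// checkpoint in flight, triggers that fire while one is in flight are skipped,
// and every checkpoint takes exactly durationMs.
final class CheckpointScheduleModel {
    static int completedWithin(long intervalMs, long durationMs, long horizonMs) {
        if (intervalMs <= 0 || durationMs < 0) {
            throw new IllegalArgumentException("intervalMs must be > 0 and durationMs >= 0");
        }
        int completed = 0;
        long busyUntil = Long.MIN_VALUE; // end time of the checkpoint in flight
        for (long trigger = intervalMs; trigger <= horizonMs; trigger += intervalMs) {
            if (trigger < busyUntil) {
                continue; // a checkpoint is still running: skip this trigger
            }
            long end = trigger + durationMs;
            if (end > horizonMs) {
                break; // this and every later checkpoint would finish too late
            }
            busyUntil = end;
            completed++;
        }
        return completed;
    }
}
```

Over a 10-minute horizon (600 000 ms) with 120 000 ms checkpoints, both `completedWithin(10_000, 120_000, 600_000)` and `completedWithin(60_000, 120_000, 600_000)` return 4 under these assumptions: once checkpoints take about two minutes, the duration rather than the configured interval limits how often they complete. With 1 000 ms checkpoints the same calls return 59 and 9.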
>
> On Mon, Sep 26, 2016 at 10:08 AM, Chakravarthy varaga [via Apache Flink
> User Mailing List archive.] <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=9179&i=0>> wrote:
>
>> Hi Stefan,
>>
>>     Please find my responses below.
>>
>>     - What source are you using for the slow input?
>> *     [CVP] - Both stream as pointed out in my first mail, are Kafka
>> Streams*
>>   - How large is the state that you are checkpointing?
>>
>> *[CVP] - I have enabled checkpointing on the StreamEnvironment as below.*
>>
>>
>>
>> *         final StreamExecutionEnvironment streamEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> streamEnv.setStateBackend(new
>> FsStateBackend("file:///tmp/flink/checkpoints"));
>> streamEnv.enableCheckpointing(10000);*
>>
>>
>> *      In terms of the state stored, the KS1 stream has payload of 100K
>> events/second, while KS2 have about 1 event / 10 minutes... basically the
>> operators perform flatmaps on 8 fields of tuple (all fields are
>> primitives). If you look at the states' sizes in dashboard they are in
>> Kb...*
>>   - Can you try to see in the log if actually the state snapshot takes
>> that long, or if it simply takes long for the checkpoint barriers to
>> travel through the stream due to a lot of backpressure?
>>     [CVP] -There are no back pressure atleast from the sample
>> computation in the flink dashboard. 100K/second is low load for flink's
>> benchmarks. I could not quite get the barriers vs snapshot state. I have
>> attached the Task Manager log (DEBUG) info if that will interest you.
>>
>>      I have attached the checkpoints times' as .png from the dashboard.
>> Basically if you look at checkpoint IDs 28 & 29 &30- you'd see that the
>> checkpoints take more than a minute in each case. Before these checkpoints,
>> the KS2 stream did not have any events. As soon as an event(should be in
>> bytes) was generated, the checkpoints went slow and subsequently a minute
>> more for every checkpoint thereafter.
>>
>>    This log was collected from the standalone flink cluster with 1 job
>> manager & 2 TMs. 1 TM was running this application with checkpointing
>> (parallelism=1)
>>
>>     Please let me know if you need further info.,
>>
>>
>> *flink_job_Plan.png* (42K)
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/0/flink_job_Plan.png>
>> *Flink-Checkpoint-Times.png* (65K)
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/1/Flink-Checkpoint-Times.png>
>> *flink-qchavar-taskmanager-1-elxa1h67k32.log* (442K)
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/2/flink-qchavar-taskmanager-1-elxa1h67k32.log>
>>

Re: Flink Checkpoint runs slow for low load stream

Posted by vinay patil <vi...@gmail.com>.
Hi,

I am also facing this issue. In my case the data is flowing continuously
from the Kafka source, and when I increase the checkpoint interval to
60000 ms, the data gets written to the S3 sink.

Is it because some operator is taking more time for processing? In my
case I am using a time window of 1 sec.

Regards,
Vinay Patil

On Mon, Sep 26, 2016 at 10:08 AM, Chakravarthy varaga [via Apache Flink
User Mailing List archive.] <ml...@n4.nabble.com> wrote:

> Hi Stephan,
>
>     Please find my responses below.
>
>   - What source are you using for the slow input?
>     [CVP] - Both streams, as pointed out in my first mail, are Kafka
> streams.
>   - How large is the state that you are checkpointing?
>
>     [CVP] - I have enabled checkpointing on the StreamExecutionEnvironment
> as below:
>
>          final StreamExecutionEnvironment streamEnv =
>              StreamExecutionEnvironment.getExecutionEnvironment();
>          streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
>          streamEnv.enableCheckpointing(10000); // checkpoint every 10 s
>
>       In terms of the state stored, the KS1 stream has a payload of 100K
> events/second, while KS2 has about 1 event / 10 minutes. Basically the
> operators perform flatMaps on 8 fields of a tuple (all fields are
> primitives). If you look at the state sizes in the dashboard, they are in
> KB.
>   - Can you try to see in the log if actually the state snapshot takes
> that long, or if it simply takes long for the checkpoint barriers to
> travel through the stream due to a lot of backpressure?
>     [CVP] - There is no backpressure, at least from the sample computation
> in the Flink dashboard. 100K/second is a low load by Flink's benchmarks. I
> did not quite understand the barriers vs. snapshot state question. I have
> attached the TaskManager log (DEBUG) in case it interests you.
>
>      I have attached the checkpoint times from the dashboard as a .png.
> Basically, if you look at checkpoint IDs 28, 29 & 30, you'd see that the
> checkpoints take more than a minute in each case. Before these checkpoints,
> the KS2 stream did not have any events. As soon as an event (which should
> be only a few bytes) was generated, the checkpoints went slow, and every
> checkpoint thereafter took a minute or more.
>
>    This log was collected from the standalone Flink cluster with 1 job
> manager & 2 TMs. 1 TM was running this application with checkpointing
> (parallelism=1).
>
>     Please let me know if you need further info.
>
>
> *flink_job_Plan.png* (42K)
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/0/flink_job_Plan.png>
> *Flink-Checkpoint-Times.png* (65K)
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/1/Flink-Checkpoint-Times.png>
> *flink-qchavar-taskmanager-1-elxa1h67k32.log* (442K)
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/2/flink-qchavar-taskmanager-1-elxa1h67k32.log>
>

Re: Flink Checkpoint runs slow for low load stream

Posted by Yassine MARZOUGUI <y....@mindlytix.com>.
Hi all,

I am experiencing a similar problem, but with HDFS as a source instead of
Kafka. I have a streaming pipeline as follows:
1 - continuously read a folder from HDFS
2 - filter duplicates (using keyBy(x -> x) and keeping a state per key
indicating whether it has been seen)
3 - schedule some future actions on the stream using a ProcessFunction and
processing-time timers (elements are kept in a MapState)
4 - write results back to HDFS using a BucketingSink.
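Step 2 above (dropping duplicates by keying each element on itself) can be sketched in plain Java. This is a minimal illustration, not Flink code: a HashMap stands in for the per-key state the job would keep in Flink (for example one ValueState<Boolean> per key, which is an assumption about how the pipeline is written), and the class and method names are hypothetical. It also shows why the dedup state is bounded by the number of distinct elements read so far.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch of a keyBy(x -> x) duplicate filter.
 * The HashMap is a hypothetical stand-in for Flink's keyed state.
 */
public class DedupSketch {
    // One "seen" flag per key; since the element is its own key, one entry per distinct element.
    private final Map<String, Boolean> seenState = new HashMap<>();

    /** Returns true if the element is new (and should be emitted), false if it is a duplicate. */
    public boolean processElement(String element) {
        // putIfAbsent returns null only when the key had no flag yet.
        return seenState.putIfAbsent(element, Boolean.TRUE) == null;
    }

    /** Number of keys held in state: grows with the number of distinct elements. */
    public int stateSize() {
        return seenState.size();
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        List<String> out = new ArrayList<>();
        for (String element : new String[] {"a", "b", "a", "c", "b"}) {
            if (dedup.processElement(element)) {
                out.add(element);
            }
        }
        System.out.println(out + " keys=" + dedup.stateSize()); // [a, b, c] keys=3
    }
}
```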

I am using the RocksDBStateBackend and Flink 1.3-SNAPSHOT (commit: 9fb074c).

Currently the source contains just one file of 1 GB, so that's the maximum
state that the job might hold. I noticed that the backpressure on
operators #1 and #2 is high, and the split reader has only read 60 MB out
of the 1 GB source file. I suspect this is because the ProcessFunction is
slow (on purpose). However, it looks like this affected the checkpoints, which
are failing after the timeout (which is set to 2 hours).
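One possible mechanism linking high backpressure to slow checkpoints, and the one behind the barriers vs. backpressure question raised earlier in this thread, is that checkpoint barriers flow in-band with the records and cannot overtake the backlog queued in front of them. The plain-Java model below is a simplification under that assumption (it is not Flink's network stack, and the numbers are hypothetical): a barrier reaches a slow operator only after every record ahead of it has been processed. In a real job the backlog is bounded by the network buffers between operators, so treat the result as a rough estimate only.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Simplified model (an assumption for illustration, not Flink internals):
 * a barrier sits in a FIFO channel behind already-buffered records, so its
 * travel time is roughly (records ahead of it) / (operator throughput).
 */
public class BarrierDelaySketch {
    private static final String BARRIER = "BARRIER";

    /**
     * Fills a channel with bufferedRecords records followed by one barrier and
     * drains it with an operator handling recordsPerSecond records per second.
     * Returns the simulated seconds until the operator sees the barrier.
     */
    static double secondsUntilBarrier(int bufferedRecords, double recordsPerSecond) {
        Deque<String> channel = new ArrayDeque<>();
        for (int i = 0; i < bufferedRecords; i++) {
            channel.addLast("record-" + i);
        }
        channel.addLast(BARRIER); // injected behind the existing backlog

        int processedBeforeBarrier = 0;
        while (!channel.isEmpty()) {
            String next = channel.pollFirst();
            if (BARRIER.equals(next)) {
                break; // only now can this operator take its part of the snapshot
            }
            processedBeforeBarrier++; // every record ahead of the barrier must be handled first
        }
        return processedBeforeBarrier / recordsPerSecond;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 10,000 records queued ahead of the barrier.
        System.out.println(secondsUntilBarrier(10_000, 100.0) + " s");     // slow operator: 100.0 s
        System.out.println(secondsUntilBarrier(10_000, 100_000.0) + " s"); // fast operator: 0.1 s
    }
}
```

The model deliberately ignores the snapshot itself; it only shows that, under backpressure, the time before an operator can even start its snapshot may dominate the checkpoint duration.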

In the JobManager logs I keep getting warnings:

2017-04-23 19:32:38,827 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Received late message for now expired checkpoint attempt 8 from
210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.

Is the high backpressure the cause of the checkpoints being so slow? If
yes, is there a way to disable the backpressure mechanism, since the records
will be buffered in the RocksDB state after all, which is backed by disk?

Any help is appreciated. Thank you.

Best,
Yassine

On Jan 5, 2017 12:25, "Chakravarthy varaga" <ch...@gmail.com>
wrote:

> BRILLIANT !!!
>
> Checkpoint times are consistent with 1.1.4...
>
> Thanks for your formidable support !
>
> Best Regards
> CVP

Re: Flink Checkpoint runs slow for low load stream

Posted by Chakravarthy varaga <ch...@gmail.com>.
BRILLIANT !!!

Checkpoint times are consistent with 1.1.4...

Thanks for your formidable support !

Best Regards
CVP

On Wed, Jan 4, 2017 at 5:33 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi CVP,
>
> we recently released Flink 1.1.4, i.e., the next bugfix release of the
> 1.1.x series with major robustness improvements [1].
> You might want to give 1.1.4 a try as well.
>
> Best, Fabian
>
> [1] http://flink.apache.org/news/2016/12/21/release-1.1.4.html
>
> 2017-01-04 16:51 GMT+01:00 Chakravarthy varaga <ch...@gmail.com>:
>
>> Hi Stephan, All,
>>
>>      I just got a chance to check whether 1.1.3 fixes the slow checkpointing
>> on the FS backend. It seems to have been fixed. Thanks for the fix.
>>
>>      While testing this with varying checkpoint intervals, there seem to be
>> spikes of slow checkpoints every 30-40 seconds for an interval of 15
>> secs. Such a checkpoint lasts about 300 ms, as opposed to 10-20 ms.
>>      Basically, 15 secs seems to be the nominal value so far; any interval
>> below this makes the spikes occur too often. For us, living with a 15 sec
>> recovery is doable, and we eventually catch up on recovery!
>>
>> Best Regards
>> CVP
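The trade-off CVP describes (a 15 sec interval implying a 15 sec recovery window) can be put into rough numbers. The sketch below is a back-of-the-envelope approximation, not a Flink API: after a failure the Kafka sources rewind to the offsets stored in the last completed checkpoint, so in the worst case roughly (interval + checkpoint duration) worth of input is re-read. Restart time, any minimum pause between checkpoints, and catch-up speed are ignored; the 100K events/s figure is taken from the busy stream described at the start of this thread.

```java
/**
 * Back-of-the-envelope estimate (an approximation, not Flink code) of how much
 * input is replayed after a failure: roughly everything that arrived since the
 * last completed checkpoint was triggered.
 */
public class RecoveryReplaySketch {

    /** Approximate worst-case number of events replayed after a failure. */
    static long worstCaseReplayEvents(long intervalMillis, long checkpointMillis, long eventsPerSecond) {
        return (intervalMillis + checkpointMillis) * eventsPerSecond / 1000;
    }

    public static void main(String[] args) {
        // 15 s interval, a 300 ms "spike" checkpoint, ~100K events/s on the busy stream.
        long events = worstCaseReplayEvents(15_000, 300, 100_000);
        System.out.println(events + " events to replay"); // 1530000 events to replay
    }
}
```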
>>
>> On Tue, Oct 4, 2016 at 6:20 PM, Chakravarthy varaga <
>> chakravarthyvp@gmail.com> wrote:
>>
>>> Thanks for your prompt response Stephan.
>>>
>>>     I'd wait for Flink 1.1.3 !!!
>>>
>>> Best Regards
>>> Varaga
>>>
>>> On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> The plan to release 1.1.3 is asap ;-)
>>>>
>>>> We are waiting for the last backported patches to get in, then release
>>>> testing and release.
>>>>
>>>> If you want to test it today, you would need to manually build the
>>>> release-1.1 branch.
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>> On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <
>>>> chakravarthyvp@gmail.com> wrote:
>>>>
>>>>> Hi Gordon,
>>>>>
>>>>>      Do I need to clone and build release-1.1 branch to test this?
>>>>>      I currently use the Flink 1.1.2 runtime. When is it planned to be
>>>>> released in 1.1.3?
>>>>>
>>>>> Best Regards
>>>>> Varaga
>>>>>
>>>>> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <
>>>>> tzulitai@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Helping out here: this is the PR for async Kafka offset committing -
>>>>>> https://github.com/apache/flink/pull/2574.
>>>>>> It has already been merged into the master and release-1.1 branches,
>>>>>> so you can try out the changes now if you’d like.
>>>>>> The change should also be included in the 1.1.3 release, which the
>>>>>> Flink community is discussing to release soon.
>>>>>>
>>>>>> Will definitely be helpful if you can provide feedback afterwards!
>>>>>>
>>>>>> Best Regards,
>>>>>> Gordon
>>>>>>
>>>>>>
>>>>>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
>>>>>> chakravarthyvp@gmail.com) wrote:
>>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>>     Is the async Kafka offset commit released in 1.1.3?
>>>>>>
>>>>>> Varaga
>>>>>>
>>>>>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Stephan,
>>>>>>>
>>>>>>>      That would be great. Let me know once the fix is done and which
>>>>>>> snapshot version to use; I'll check and get back to you then.
>>>>>>>      Can you also share the JIRA that tracks the issue?
>>>>>>>
>>>>>>>      With regard to the offset commit issue, I'm not sure how to
>>>>>>> proceed here. I'll probably use your fix first and see if the problem
>>>>>>> recurs.
>>>>>>>
>>>>>>> Thanks much
>>>>>>> Varaga
>>>>>>>
>>>>>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <se...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> @CVP
>>>>>>>>
>>>>>>>> In your case, Flink stores only the Kafka offsets (a few bytes)
>>>>>>>> and the custom state (e) in checkpoints.
>>>>>>>>
>>>>>>>> Here is an illustration of the checkpoint and what is stored (from
>>>>>>>> the Flink docs).
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>>>>>>>
>>>>>>>>
>>>>>>>> I am quite puzzled why the offset committing problem occurs only
>>>>>>>> for one input, and not for the other.
>>>>>>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>>>>>>> Could you try out a snapshot version to see if that fixes your
>>>>>>>> problem?
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>>>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Stephan,
>>>>>>>>>
>>>>>>>>>      Thanks a million for your detailed explanation. I appreciate
>>>>>>>>> it.
>>>>>>>>>
>>>>>>>>>      -  The *ZooKeeper bundled with Kafka 0.9.0.1* was used to
>>>>>>>>> start ZooKeeper. There is only 1 instance (standalone) of ZooKeeper running
>>>>>>>>> on my localhost (Ubuntu 14.04).
>>>>>>>>>      -  There is only 1 Kafka broker (*version: 0.9.0.1*).
>>>>>>>>>
>>>>>>>>>      With regard to the Flink cluster, there's only 1 JM & 2 TMs
>>>>>>>>> started with no HA. I presume this does not use ZooKeeper anyway, as it
>>>>>>>>> runs as a standalone cluster.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>      BTW, the Kafka connector version that I use is the one suggested
>>>>>>>>> on the Flink connectors page:
>>>>>>>>>
>>>>>>>>>         <dependency>
>>>>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>>>>             <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>>>>>>             <version>1.1.1</version>
>>>>>>>>>         </dependency>
>>>>>>>>>
>>>>>>>>>      Do you see any issues with versions?
>>>>>>>>>
>>>>>>>>>      1) Do you have benchmarks wrt., to checkpointing in flink?
>>>>>>>>>
>>>>>>>>>      2) There isn't a detailed explanation of what state is stored
>>>>>>>>> as part of the checkpointing process. For example, if I have a pipeline like
>>>>>>>>> source -> map -> keyBy -> map -> sink, my assumption on what's
>>>>>>>>> stored is:
>>>>>>>>>
>>>>>>>>>          a) The source stream's custom watermarked records
>>>>>>>>>
>>>>>>>>>          b) Intermediate states of each of the transformations in
>>>>>>>>> the pipeline
>>>>>>>>>
>>>>>>>>>          c) Delta of records stored from the previous sink
>>>>>>>>>
>>>>>>>>>          d) Custom states (say, ValueState as in my case) -
>>>>>>>>> essentially, this is what I care about storing.
>>>>>>>>>          e) All of my operators
>>>>>>>>>
>>>>>>>>>       Is my understanding right?
>>>>>>>>>
>>>>>>>>>      3) Is there a way in Flink to checkpoint only d) as stated
>>>>>>>>> above?
>>>>>>>>>
>>>>>>>>>      4) Can checkpointing be applied only to streams and certain
>>>>>>>>> operators (say, I wish to store aggregated values that are part of the transformation)?
>>>>>>>>>
>>>>>>>>> Best Regards
>>>>>>>>> CVP
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <se...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks, the logs were very helpful!
>>>>>>>>>>
>>>>>>>>>> TL;DR - The offset committing to ZooKeeper is very slow and
>>>>>>>>>> prevents proper starting of checkpoints.
>>>>>>>>>>
>>>>>>>>>> Here is what is happening in detail:
>>>>>>>>>>
>>>>>>>>>>   - There is a long time (many seconds) between the point when the
>>>>>>>>>> TaskManager receives the "trigger checkpoint" message and the point when
>>>>>>>>>> the KafkaSource actually starts the checkpoint, for one of the Kafka
>>>>>>>>>> inputs (the other is fine).
>>>>>>>>>>   - The only way this delay can be introduced is if another
>>>>>>>>>> checkpoint-related operation (such as trigger() or notifyComplete()) is
>>>>>>>>>> still in progress when the checkpoint is started. Flink does not perform
>>>>>>>>>> concurrent checkpoint operations on a single operator, to ease the
>>>>>>>>>> concurrency model for users.
>>>>>>>>>>   - The operation that is still in progress must be the
>>>>>>>>>> committing of the offsets (to ZooKeeper or Kafka). That also explains why
>>>>>>>>>> this only happens once one side receives the first record. Before that,
>>>>>>>>>> there is nothing to commit.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> What Flink should fix:
>>>>>>>>>>   - The KafkaConsumer should run the commit operations
>>>>>>>>>> asynchronously, to not block the "notifyCheckpointComplete()" method.
>>>>>>>>>>
>>>>>>>>>> What you can fix:
>>>>>>>>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka input
>>>>>>>>>> works well, the other does not. Do they go against different sets of
>>>>>>>>>> brokers, or different ZooKeepers? Is the metadata for one input bad?
>>>>>>>>>>   - In the next Flink version, you may opt out of committing
>>>>>>>>>> offsets to Kafka/ZooKeeper altogether. It is not important for Flink's
>>>>>>>>>> checkpoints anyway.
>>>>>>>>>>
>>>>>>>>>> Greetings,
>>>>>>>>>> Stephan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>>>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Stefan,
>>>>>>>>>>>
>>>>>>>>>>>     Please find my responses below.
>>>>>>>>>>>
>>>>>>>>>>>     - What source are you using for the slow input?
>>>>>>>>>>>      [CVP] - Both streams, as pointed out in my first mail, are
>>>>>>>>>>> Kafka streams.
>>>>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>>>>
>>>>>>>>>>> [CVP] - I have enabled checkpointing on the StreamEnvironment
>>>>>>>>>>> as below.
>>>>>>>>>>>
>>>>>>>>>>>          final StreamExecutionEnvironment streamEnv =
>>>>>>>>>>>                  StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>          streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>>>>>>          streamEnv.enableCheckpointing(10000);
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>       In terms of the state stored, the KS1 stream has a payload
>>>>>>>>>>> of 100K events/second, while KS2 has about 1 event / 10 minutes...
>>>>>>>>>>> basically the operators perform flatmaps on 8 fields of a tuple (all fields
>>>>>>>>>>> are primitives). If you look at the state sizes in the dashboard, they are in
>>>>>>>>>>> KB...
>>>>>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>>>>>     [CVP] - There is no backpressure, at least from the sample
>>>>>>>>>>> computation in the Flink dashboard. 100K/second is a low load by Flink's
>>>>>>>>>>> benchmarks. I could not quite get the barriers vs. snapshot state distinction.
>>>>>>>>>>> I have attached the TaskManager log (DEBUG) in case it interests you.
>>>>>>>>>>>
>>>>>>>>>>>      I have attached the checkpoint times as a .png from the
>>>>>>>>>>> dashboard. Basically, if you look at checkpoint IDs 28, 29 & 30, you'd
>>>>>>>>>>> see that the checkpoints take more than a minute in each case. Before these
>>>>>>>>>>> checkpoints, the KS2 stream did not have any events. As soon as an
>>>>>>>>>>> event (should be in bytes) was generated, the checkpoints went slow and
>>>>>>>>>>> took more than a minute for every checkpoint thereafter.
>>>>>>>>>>>
>>>>>>>>>>>    This log was collected from the standalone Flink cluster with
>>>>>>>>>>> 1 JobManager & 2 TMs. 1 TM was running this application with checkpointing
>>>>>>>>>>> (parallelism=1).
>>>>>>>>>>>
>>>>>>>>>>>     Please let me know if you need further info.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <se...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi!
>>>>>>>>>>>>
>>>>>>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>>>>>>> information?
>>>>>>>>>>>>
>>>>>>>>>>>>   - What source are you using for the slow input?
>>>>>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>>>>>   - Can you try to see in the log if actually the state
>>>>>>>>>>>> snapshot takes that long, or if it simply takes long for the checkpoint
>>>>>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>>>>>>
>>>>>>>>>>>> Greetings,
>>>>>>>>>>>> Stephan
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <
>>>>>>>>>>>> fhueske@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi CVP,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm not very familiar with the internals of the
>>>>>>>>>>>>> checkpointing system, but maybe Stephan (in CC) has an idea what's going on
>>>>>>>>>>>>> here.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     I have a stream application that has 2 stream sources as
>>>>>>>>>>>>>> below.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>>>>>>>>      KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>>>>>>>> ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>>>>>>>      //X is a CoFlatMapFunction that inserts and removes
>>>>>>>>>>>>>> elements from ks2 into a key-value state member. Elements from ks1 are
>>>>>>>>>>>>>> matched against that state. The CoFlatMapFunction operator maintains
>>>>>>>>>>>>>> ValueState<Tuple2<Long, Long>>;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      //ks1 is streaming about 100K events/sec from a Kafka topic
>>>>>>>>>>>>>>      //ks2 is streaming about 1 event every 10 minutes...
>>>>>>>>>>>>>> Precisely when the 1st event is consumed from this stream, the checkpoint takes
>>>>>>>>>>>>>> 2 minutes straight away.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     The version of Flink is 1.1.2.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I tried checkpointing every 10 secs using an
>>>>>>>>>>>>>> FsStateBackend... What I notice is that the checkpoint duration is almost 2
>>>>>>>>>>>>>> minutes in many cases, while in the other cases it frequently varies from
>>>>>>>>>>>>>> 100 ms to 1.5 minutes. I'm attaching the snapshot of the dashboard for
>>>>>>>>>>>>>> your reference.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      Is this an issue with Flink checkpointing?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  Best Regards
>>>>>>>>>>>>>> CVP
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
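
Stephan's diagnosis in the quoted messages above (a slow, synchronous offset commit inside notifyCheckpointComplete() delays the start of the next checkpoint, because Flink does not run checkpoint-related operations concurrently on one operator) can be illustrated with a small, self-contained Java toy model. This is a sketch under stated assumptions, not Flink's FlinkKafkaConsumer code: the ToyKafkaSource class, its single checkpointLock, the simulated 500 ms commit and the measureSnapshotDelay() helper are all hypothetical. The async variant only mirrors the idea behind the fix discussed in this thread (running the commit off the checkpoint path).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Toy model (hypothetical, not Flink code): checkpoint calls on one operator share one lock. */
public class CheckpointLockDemo {

    /** Hypothetical stand-in for a Kafka source operator. */
    static class ToyKafkaSource {
        private final Object checkpointLock = new Object();
        private final ExecutorService committer = Executors.newSingleThreadExecutor();
        private final boolean asyncCommit;
        private final long commitMillis; // simulated latency of a slow ZooKeeper commit
        private long currentOffset = 0;
        final CountDownLatch notifyEntered = new CountDownLatch(1); // demo hook only

        ToyKafkaSource(boolean asyncCommit, long commitMillis) {
            this.asyncCommit = asyncCommit;
            this.commitMillis = commitMillis;
        }

        void emitRecord() {
            synchronized (checkpointLock) {
                currentOffset++;
            }
        }

        /** Snapshot for a checkpoint: in this toy, the source only stores its offset. */
        long snapshotState() {
            synchronized (checkpointLock) {
                return currentOffset;
            }
        }

        /** Called once a checkpoint is complete; commits the offset externally. */
        void notifyCheckpointComplete(long offset) {
            synchronized (checkpointLock) {
                notifyEntered.countDown();
                if (asyncCommit) {
                    committer.submit(() -> slowCommit(offset)); // lock is released right away
                } else {
                    slowCommit(offset); // lock is held for the whole commit
                }
            }
        }

        private void slowCommit(long offset) {
            try {
                Thread.sleep(commitMillis); // pretend to write the offset to ZooKeeper
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        void close() {
            committer.shutdownNow();
        }
    }

    /** Milliseconds the next snapshot has to wait while a commit is in flight. */
    static long measureSnapshotDelay(boolean asyncCommit, long commitMillis) {
        ToyKafkaSource source = new ToyKafkaSource(asyncCommit, commitMillis);
        source.emitRecord();
        long completedOffset = source.snapshotState();
        Thread notifier = new Thread(() -> source.notifyCheckpointComplete(completedOffset));
        try {
            notifier.start();
            source.notifyEntered.await(); // the commit path now owns the lock
            long start = System.nanoTime();
            source.snapshotState();       // the next checkpoint's snapshot
            long waitedMs = (System.nanoTime() - start) / 1_000_000;
            notifier.join();
            return waitedMs;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            source.close();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync commit:  snapshot waited " + measureSnapshotDelay(false, 500) + " ms");
        System.out.println("async commit: snapshot waited " + measureSnapshotDelay(true, 500) + " ms");
    }
}
```

Running main() should show the synchronous variant waiting for roughly the whole simulated commit before the snapshot can proceed, while the asynchronous variant waits close to zero; exact timings depend on the machine. The point being illustrated is that the external commit is only a side effect: the checkpoint itself only needs the offset returned by snapshotState().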

Re: Flink Checkpoint runs slow for low load stream

Posted by Fabian Hueske <fh...@gmail.com>.
Hi CVP,

we recently released Flink 1.1.4, the next bugfix release of the 1.1.x
series, with major robustness improvements [1].
You might want to give 1.1.4 a try as well.

Best, Fabian

[1] http://flink.apache.org/news/2016/12/21/release-1.1.4.html

2017-01-04 16:51 GMT+01:00 Chakravarthy varaga <ch...@gmail.com>:

> Hi Stephan, All,
>
>      I just got a chance to test whether 1.1.3 fixes the slow checkpointing on the FS
> backend. It seems to have been fixed. Thanks for the fix.
>
>      While testing this with varying checkpoint intervals, there seem to
> be spikes of slow checkpoints every 30/40 seconds for an interval of 15
> secs. The checkpoint time lasts about 300 ms as opposed to 10/20 ms.
>      Basically, 15 secs seems to be the nominal value so far; anything below
> this interval triggers the spikes too often. For us, living with a 15 sec
> recovery is doable, and we eventually catch up on recovery!
>
> Best Regards
> CVP
>

Re: Flink Checkpoint runs slow for low load stream

Posted by Chakravarthy varaga <ch...@gmail.com>.
Hi Stephan, All,

     I just got a chance to test whether 1.1.3 fixes the slow checkpointing on the FS
backend. It seems to have been fixed. Thanks for the fix.

     While testing this with varying checkpoint intervals, there seem to
be spikes of slow checkpoints every 30/40 seconds for an interval of 15
secs. The checkpoint time lasts about 300 ms as opposed to 10/20 ms.
     Basically, 15 secs seems to be the nominal value so far; anything below
this interval triggers the spikes too often. For us, living with a 15 sec
recovery is doable, and we eventually catch up on recovery!

Best Regards
CVP
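
The trade-off described above can be put into rough numbers. The Java sketch below is a back-of-the-envelope estimate added for illustration, not something stated in the thread. It assumes that sources snapshot their offsets when a checkpoint is triggered, that checkpoints are triggered at a fixed interval, and that each checkpoint takes less time than the interval; it ignores restart time. Under those assumptions, a failure just before checkpoint n+1 completes forces a replay from checkpoint n's offsets, i.e. up to (interval + checkpoint duration) worth of input. The class and method names are hypothetical.

```java
/** Back-of-the-envelope estimate (hypothetical helper); assumptions are in the accompanying text. */
public class ReplayEstimate {

    /**
     * Upper bound on records re-read after a failure that happens just before
     * checkpoint n+1 completes: replay starts at checkpoint n's offsets, which
     * were taken intervalMs + durationMs earlier (assumes durationMs < intervalMs).
     */
    static long worstCaseReplayedRecords(long recordsPerSecond, long intervalMs, long durationMs) {
        return recordsPerSecond * (intervalMs + durationMs) / 1000;
    }

    public static void main(String[] args) {
        // ~100K events/sec on ks1, 15 s interval, ~300 ms slow checkpoints (figures from the thread)
        System.out.println(worstCaseReplayedRecords(100_000, 15_000, 300));
    }
}
```

With those figures it prints 1530000, i.e. roughly 1.5 million events that may have to be re-read from Kafka after a failure.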

On Tue, Oct 4, 2016 at 6:20 PM, Chakravarthy varaga <
chakravarthyvp@gmail.com> wrote:

> Thanks for your prompt response Stephan.
>
>     I'll wait for Flink 1.1.3!!!
>
> Best Regards
> Varaga
>
> On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> The plan is to release 1.1.3 asap ;-)
>>
>> We are waiting for the last backported patches to get in, then come release
>> testing and the release.
>>
>> If you want to test it today, you would need to manually build the
>> release-1.1 branch.
>>
>> Best,
>> Stephan
>>
>>
>> On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <
>> chakravarthyvp@gmail.com> wrote:
>>
>>> Hi Gordon,
>>>
>>>      Do I need to clone and build the release-1.1 branch to test this?
>>>      I currently use the Flink 1.1.2 runtime. When is 1.1.3 planned to be
>>> released?
>>>
>>> Best Regards
>>> Varaga
>>>
>>> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> Helping out here: this is the PR for async Kafka offset committing -
>>>> https://github.com/apache/flink/pull/2574.
>>>> It has already been merged into the master and release-1.1 branches, so
>>>> you can try out the changes now if you’d like.
>>>> The change should also be included in the 1.1.3 release, which the
>>>> Flink community is discussing releasing soon.
>>>>
>>>> It will definitely be helpful if you can provide feedback afterwards!
>>>>
>>>> Best Regards,
>>>> Gordon
>>>>
>>>>
>>>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
>>>> chakravarthyvp@gmail.com) wrote:
>>>>
>>>> Hi Stephan,
>>>>
>>>>     Is the async Kafka offset commit released in 1.1.3?
>>>>
>>>> Varaga
>>>>
>>>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
>>>> chakravarthyvp@gmail.com> wrote:
>>>>
>>>>> Hi Stephan,
>>>>>
>>>>>      That would be great. Let me know once the fix is done and which
>>>>> snapshot version to use; I'll check and get back to you then.
>>>>>      Can you also share the JIRA that tracks the issue?
>>>>>
>>>>>      With regard to the offset commit issue, I'm not sure how to
>>>>> proceed here. I'll probably use your fix first and see if the problem
>>>>> recurs.
>>>>>
>>>>> Thanks much
>>>>> Varaga
>>>>>
>>>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <se...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> @CVP
>>>>>>
>>>>>> In your case, Flink stores only the Kafka offsets (a few bytes) and
>>>>>> the custom state (d) in checkpoints.
>>>>>>
>>>>>> Here is an illustration of the checkpoint and what is stored (from
>>>>>> the Flink docs).
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>>>>>
>>>>>>
>>>>>> I am quite puzzled why the offset committing problem occurs only for
>>>>>> one input, and not for the other.
>>>>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>>>>> Could you try out a snapshot version to see if that fixes your
>>>>>> problem?
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>>      Thanks a million for your detailed explanation. I appreciate it.
>>>>>>>
>>>>>>>      -  ZooKeeper was started using the *ZooKeeper bundled with
>>>>>>> Kafka 0.9.0.1*. There is only 1 (standalone) ZooKeeper instance running
>>>>>>> on my localhost (Ubuntu 14.04)
>>>>>>>      -  There is only 1 Kafka broker (*version: 0.9.0.1*)
>>>>>>>
>>>>>>>      With regard to the Flink cluster, there's only 1 JM & 2 TMs started,
>>>>>>> with no HA. I presume this does not use ZooKeeper anyway, as it runs as a
>>>>>>> standalone cluster.
>>>>>>>
>>>>>>>
>>>>>>>      BTW, the Kafka connector version that I use is the one suggested on
>>>>>>> the Flink connectors page:
>>>>>>>
>>>>>>>         <dependency>
>>>>>>>           <groupId>org.apache.flink</groupId>
>>>>>>>           <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>>>>           <version>1.1.1</version>
>>>>>>>         </dependency>
>>>>>>>
>>>>>>>      Do you see any issues with the versions?
>>>>>>>
>>>>>>>      1) Do you have benchmarks with respect to checkpointing in Flink?
>>>>>>>
>>>>>>>      2) There isn't a detailed explanation of what state is stored
>>>>>>> as part of the checkpointing process. For example, if I have a pipeline like
>>>>>>> source -> map -> keyBy -> map -> sink, my assumption on what's
>>>>>>> stored is:
>>>>>>>
>>>>>>>          a) The source stream's custom watermarked records
>>>>>>>
>>>>>>>          b) Intermediate states of each of the transformations in
>>>>>>> the pipeline
>>>>>>>
>>>>>>>          c) Delta of records stored from the previous sink
>>>>>>>
>>>>>>>          d) Custom states (say, ValueState as in my case) -
>>>>>>> essentially, this is what I care about storing.
>>>>>>>          e) All of my operators
>>>>>>>
>>>>>>>       Is my understanding right?
>>>>>>>
>>>>>>>      3) Is there a way in Flink to checkpoint only d) as stated above?
>>>>>>>
>>>>>>>      4) Can checkpointing be applied only to streams and certain
>>>>>>> operators (say, I wish to store aggregated values that are part of the transformation)?
>>>>>>>
>>>>>>> Best Regards
>>>>>>> CVP
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <se...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks, the logs were very helpful!
>>>>>>>>
>>>>>>>> TL;DR - The offset committing to ZooKeeper is very slow and
>>>>>>>> prevents proper starting of checkpoints.
>>>>>>>>
>>>>>>>> Here is what is happening in detail:
>>>>>>>>
>>>>>>>>   - There is a long time (many seconds) between the point when the
>>>>>>>> TaskManager receives the "trigger checkpoint" message and the point when
>>>>>>>> the KafkaSource actually starts the checkpoint, for one of the Kafka
>>>>>>>> inputs (the other is fine).
>>>>>>>>   - The only way this delay can be introduced is if another
>>>>>>>> checkpoint-related operation (such as trigger() or notifyComplete()) is
>>>>>>>> still in progress when the checkpoint is started. Flink does not perform
>>>>>>>> concurrent checkpoint operations on a single operator, to ease the
>>>>>>>> concurrency model for users.
>>>>>>>>   - The operation that is still in progress must be the committing
>>>>>>>> of the offsets (to ZooKeeper or Kafka). That also explains why this only
>>>>>>>> happens once one side receives the first record. Before that, there is
>>>>>>>> nothing to commit.
>>>>>>>>
>>>>>>>>
>>>>>>>> What Flink should fix:
>>>>>>>>   - The KafkaConsumer should run the commit operations
>>>>>>>> asynchronously, to not block the "notifyCheckpointComplete()" method.
>>>>>>>>
>>>>>>>> What you can fix:
>>>>>>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka input
>>>>>>>> works well, the other does not. Do they go against different sets of
>>>>>>>> brokers, or different ZooKeepers? Is the metadata for one input bad?
>>>>>>>>   - In the next Flink version, you may opt out of committing
>>>>>>>> offsets to Kafka/ZooKeeper altogether. It is not important for Flink's
>>>>>>>> checkpoints anyway.
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Stefan,
>>>>>>>>>
>>>>>>>>>     Please find my responses below.
>>>>>>>>>
>>>>>>>>>     - What source are you using for the slow input?
>>>>>>>>>      [CVP] - Both streams, as pointed out in my first mail, are
>>>>>>>>> Kafka streams.
>>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>>
>>>>>>>>> [CVP] - I have enabled checkpointing on the StreamEnvironment as
>>>>>>>>> below.
>>>>>>>>>
>>>>>>>>>          final StreamExecutionEnvironment streamEnv =
>>>>>>>>>                  StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>          streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>>>>          streamEnv.enableCheckpointing(10000);
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>       In terms of the state stored, the KS1 stream has a payload of
>>>>>>>>> 100K events/second, while KS2 has about 1 event / 10 minutes... basically
>>>>>>>>> the operators perform flatmaps on 8 fields of a tuple (all fields are
>>>>>>>>> primitives). If you look at the state sizes in the dashboard, they are in
>>>>>>>>> KB...
>>>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>>>     [CVP] - There is no backpressure, at least from the sample
>>>>>>>>> computation in the Flink dashboard. 100K/second is a low load by Flink's
>>>>>>>>> benchmarks. I could not quite get the barriers vs. snapshot state distinction.
>>>>>>>>> I have attached the TaskManager log (DEBUG) in case it interests you.
>>>>>>>>>
>>>>>>>>>      I have attached the checkpoint times as a .png from the
>>>>>>>>> dashboard. Basically, if you look at checkpoint IDs 28, 29 & 30, you'd
>>>>>>>>> see that the checkpoints take more than a minute in each case. Before these
>>>>>>>>> checkpoints, the KS2 stream did not have any events. As soon as an
>>>>>>>>> event (should be in bytes) was generated, the checkpoints went slow and
>>>>>>>>> took more than a minute for every checkpoint thereafter.
>>>>>>>>>
>>>>>>>>>    This log was collected from the standalone Flink cluster with 1
>>>>>>>>> JobManager & 2 TMs. 1 TM was running this application with checkpointing
>>>>>>>>> (parallelism=1).
>>>>>>>>>
>>>>>>>>>     Please let me know if you need further info.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <se...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi!
>>>>>>>>>>
>>>>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>>>>> information?
>>>>>>>>>>
>>>>>>>>>>   - What source are you using for the slow input?
>>>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>>>>> takes that long, or if it simply takes long for the checkpoint barriers to
>>>>>>>>>> travel through the stream due to a lot of backpressure?
>>>>>>>>>>
>>>>>>>>>> Greetings,
>>>>>>>>>> Stephan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <fhueske@gmail.com
>>>>>>>>>> > wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi CVP,
>>>>>>>>>>>
>>>>>>>>>>> I'm not very familiar with the internals of the checkpointing
>>>>>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>>>>>
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>
>>>>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>>>>
>>>>>>>>>>>>     I have a stream application that has 2 stream sources, as
>>>>>>>>>>>> below.
>>>>>>>>>>>>
>>>>>>>>>>>>      KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>>>>>>      KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>>>>>>          ds2.flatMap(/* split T into k-v pairs */).keyBy(0);
>>>>>>>>>>>>
>>>>>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>>>>>      // X is a CoFlatMapFunction that inserts elements from ks2 into
>>>>>>>>>>>>      // (and removes them from) a key-value state member. Elements from
>>>>>>>>>>>>      // ks1 are matched against that state. The CoFlatMapFunction
>>>>>>>>>>>>      // operator maintains a ValueState<Tuple2<Long, Long>>.
>>>>>>>>>>>>
>>>>>>>>>>>>      // ks1 is streaming about 100K events/sec from a Kafka topic.
>>>>>>>>>>>>      // ks2 is streaming about 1 event every 10 minutes. Precisely when
>>>>>>>>>>>>      // the 1st event is consumed from this stream, checkpoints start
>>>>>>>>>>>>      // taking 2 minutes straight away.
>>>>>>>>>>>>
>>>>>>>>>>>>     The Flink version is 1.1.2.
>>>>>>>>>>>>
>>>>>>>>>>>> I tried checkpointing every 10 seconds using an FsStateBackend. What I
>>>>>>>>>>>> notice is that the checkpoint duration is almost 2 minutes in many cases,
>>>>>>>>>>>> while in other cases it frequently varies from 100 ms to 1.5 minutes. I'm
>>>>>>>>>>>> attaching a snapshot of the dashboard for your reference.
>>>>>>>>>>>>
>>>>>>>>>>>>      Is this an issue with Flink checkpointing?
>>>>>>>>>>>>
>>>>>>>>>>>>  Best Regards
>>>>>>>>>>>> CVP
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Flink Checkpoint runs slow for low load stream

Posted by Chakravarthy varaga <ch...@gmail.com>.
Thanks for your prompt response Stephan.

    I'd wait for Flink 1.1.3 !!!

Best Regards
Varaga

On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <se...@apache.org> wrote:

> The plan to release 1.1.3 is asap ;-)
>
> We are waiting for the last backported patches to get in, then release
> testing and the release.
>
> If you want to test it today, you would need to manually build the
> release-1.1 branch.
>
> Best,
> Stephan

Re: Flink Checkpoint runs slow for low load stream

Posted by Stephan Ewen <se...@apache.org>.
The plan to release 1.1.3 is asap ;-)

We are waiting for the last backported patches to get in, then release
testing and the release.

If you want to test it today, you would need to manually build the
release-1.1 branch.

Best,
Stephan
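Stephan's analysis in this thread is that a slow, synchronous offset commit inside notifyCheckpointComplete() holds up the next checkpoint trigger, because Flink does not run checkpoint operations on a single operator concurrently. The toy model below is plain JDK Java, not Flink or connector code; the class CheckpointLockDemo and its methods are hypothetical. It sketches that effect under the assumption that all checkpoint callbacks of one operator share a single lock, and compares it with handing the commit to a background thread, which is roughly the idea behind asynchronous offset committing.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Toy model, not Flink code: all checkpoint callbacks of one operator share a
 * single lock, so a slow synchronous commit in notifyCheckpointComplete()
 * delays the next triggerCheckpoint(). All names are hypothetical.
 */
public class CheckpointLockDemo {

    private final Object checkpointLock = new Object();
    private final ExecutorService committer = Executors.newSingleThreadExecutor();
    private final boolean asyncCommit;
    private final long commitMillis;

    public CheckpointLockDemo(boolean asyncCommit, long commitMillis) {
        this.asyncCommit = asyncCommit;
        this.commitMillis = commitMillis;
    }

    /** Stand-in for a slow offset commit to ZooKeeper/Kafka. */
    private void slowCommit() {
        sleepQuietly(commitMillis);
    }

    public void notifyCheckpointComplete() {
        synchronized (checkpointLock) {
            if (asyncCommit) {
                committer.submit(this::slowCommit); // returns at once, lock is released
            } else {
                slowCommit();                       // lock is held for the whole commit
            }
        }
    }

    /** Returns how many milliseconds the trigger waited for the lock. */
    public long triggerCheckpoint() {
        long start = System.nanoTime();
        synchronized (checkpointLock) {
            // the state snapshot would be taken here
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Completes one checkpoint on another thread, then triggers the next one. */
    public static long measureTriggerDelay(boolean asyncCommit, long commitMillis) {
        CheckpointLockDemo op = new CheckpointLockDemo(asyncCommit, commitMillis);
        Thread completer = new Thread(op::notifyCheckpointComplete);
        completer.start();
        sleepQuietly(200); // give the completer time to take the lock first
        long waited = op.triggerCheckpoint();
        try {
            completer.join();
            op.committer.shutdown();
            op.committer.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return waited;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync commit:  trigger waited ~" + measureTriggerDelay(false, 1000) + " ms");
        System.out.println("async commit: trigger waited ~" + measureTriggerDelay(true, 1000) + " ms");
    }
}
```

With a 1-second commit, the synchronous variant should make the trigger wait for most of that second, while the asynchronous variant should let it proceed almost immediately; exact numbers depend on thread scheduling.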


On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <
chakravarthyvp@gmail.com> wrote:

> Hi Gordon,
>
>      Do I need to clone and build the release-1.1 branch to test this?
>      I currently use the Flink 1.1.2 runtime. When is 1.1.3 planned to be
> released?
>
> Best Regards
> Varaga

Re: Flink Checkpoint runs slow for low load stream

Posted by Chakravarthy varaga <ch...@gmail.com>.
Hi Gordon,

     Do I need to clone and build the release-1.1 branch to test this?
     I currently use the Flink 1.1.2 runtime. When is 1.1.3 planned to be
released?

Best Regards
Varaga
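Stephan's Sep 27 reply in this thread says that, for this job, a checkpoint holds only the Kafka offsets and the custom state, which is his answer to question 2) about records, intermediate results and operators. The toy model below is plain JDK Java, not Flink's real snapshot format; CheckpointContentsSketch, Snapshot and the long[2] stand-in for Tuple2<Long, Long> are hypothetical. It sketches why such a checkpoint stays in the KB range: it copies a few offsets and one small value per key, not the records flowing through the pipeline.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model, not Flink's snapshot format: a checkpoint of this job modelled as
 * per-partition Kafka offsets of both sources plus the keyed
 * ValueState<Tuple2<Long, Long>> (as long[2]). All names are hypothetical.
 */
public class CheckpointContentsSketch {

    // Live state that keeps changing while records are processed.
    public final Map<Integer, Long> ks1Offsets = new HashMap<>();  // partition -> next offset to read
    public final Map<Integer, Long> ks2Offsets = new HashMap<>();
    public final Map<String, long[]> keyedState = new HashMap<>(); // key -> {f0, f1}

    /** Copy of the state taken when a checkpoint is triggered. */
    public static final class Snapshot {
        public final long checkpointId;
        public final Map<Integer, Long> ks1Offsets;
        public final Map<Integer, Long> ks2Offsets;
        public final Map<String, long[]> keyedState;

        Snapshot(long checkpointId, Map<Integer, Long> ks1, Map<Integer, Long> ks2,
                 Map<String, long[]> state) {
            this.checkpointId = checkpointId;
            this.ks1Offsets = Collections.unmodifiableMap(new HashMap<>(ks1));
            this.ks2Offsets = Collections.unmodifiableMap(new HashMap<>(ks2));
            Map<String, long[]> copy = new HashMap<>();
            state.forEach((key, value) -> copy.put(key, value.clone())); // deep-copy the mutable arrays
            this.keyedState = Collections.unmodifiableMap(copy);
        }
    }

    public Snapshot snapshot(long checkpointId) {
        return new Snapshot(checkpointId, ks1Offsets, ks2Offsets, keyedState);
    }

    public static void main(String[] args) {
        CheckpointContentsSketch op = new CheckpointContentsSketch();
        op.ks1Offsets.put(0, 1_000_000L);
        op.ks2Offsets.put(0, 1L);
        op.keyedState.put("k1", new long[] {42L, 7L});

        Snapshot cp = op.snapshot(28);

        // Processing continues after the snapshot; the checkpoint copy is unaffected.
        op.ks1Offsets.put(0, 1_100_000L);
        op.keyedState.get("k1")[0] = 99L;

        // prints: checkpoint 28: ks1 offset=1000000, k1.f0=42
        System.out.println("checkpoint " + cp.checkpointId
                + ": ks1 offset=" + cp.ks1Offsets.get(0)
                + ", k1.f0=" + cp.keyedState.get("k1")[0]);
    }
}
```

The deep copy of each long[] matters in the sketch: without it, later updates to the live state would leak into an already-taken checkpoint.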

On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi,
>
> Helping out here: this is the PR for async Kafka offset committing -
> https://github.com/apache/flink/pull/2574.
> It has already been merged into the master and release-1.1 branches, so
> you can try out the changes now if you’d like.
> The change should also be included in the 1.1.3 release, which the Flink
> community is planning to release soon.
>
> It will definitely be helpful if you can provide feedback afterwards!
>
> Best Regards,
> Gordon
>
>
> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
> chakravarthyvp@gmail.com) wrote:
>
> Hi Stephan,
>
>     Is the async Kafka offset commit released in 1.1.3?
>
> Varaga
>
> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
> chakravarthyvp@gmail.com> wrote:
>
>> Hi Stephan,
>>
>>      That would be great. Let me know once the fix is done and which
>> snapshot version to use; I'll check and get back to you.
>>      Can you also share the JIRA that tracks the issue?
>>
>>      With regard to the offset commit issue, I'm not sure how to
>> proceed here. Probably I'll use your fix first and see if the problem
>> recurs.
>>
>> Thanks much
>> Varaga
>>
>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> @CVP
>>>
>>> In your case, Flink's checkpoints store only the Kafka offsets (a few
>>> bytes) and the custom state (e).
>>>
>>> Here is an illustration of the checkpoint and what is stored (from the
>>> Flink docs):
>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>>
>>>
>>> I am quite puzzled why the offset committing problem occurs only for one
>>> input, and not for the other.
>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>> Could you try out a snapshot version to see if that fixes your problem?
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>>
>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>>> chakravarthyvp@gmail.com> wrote:
>>>
>>>> Hi Stephan,
>>>>
>>>>      Thanks a million for your detailed explanation. I appreciate it.
>>>>
>>>>      -  The ZooKeeper bundled with Kafka 0.9.0.1 was used to start
>>>> ZooKeeper. There is only 1 instance (standalone) of ZooKeeper running on my
>>>> localhost (Ubuntu 14.04).
>>>>      -  There is only 1 Kafka broker (version: 0.9.0.1).
>>>>
>>>>      With regard to the Flink cluster, there's only 1 JM & 2 TMs, started
>>>> with no HA. I presume this does not use ZooKeeper anyway, as it runs as a
>>>> standalone cluster.
>>>>
>>>>
>>>>      BTW, the Kafka connector version that I use is the one suggested on
>>>> the Flink connectors page:
>>>>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>         <version>1.1.1</version>
>>>>     </dependency>
>>>>
>>>>      Do you see any issues with the versions?
>>>>
>>>>      1) Do you have benchmarks with respect to checkpointing in Flink?
>>>>
>>>>      2) There isn't a detailed explanation of what state is stored as
>>>> part of the checkpointing process. For example, if I have a pipeline like
>>>> source -> map -> keyBy -> map -> sink, my assumption about what's stored
>>>> is:
>>>>
>>>>          a) The source stream's custom watermarked records
>>>>          b) Intermediate states of each of the transformations in the
>>>>             pipeline
>>>>          c) Delta of records stored from the previous sink
>>>>          d) Custom state (say, ValueState as in my case). Essentially,
>>>>             this is what I care about storing.
>>>>          e) All of my operators
>>>>
>>>>       Is my understanding right?
>>>>
>>>>      3) Is there a way in Flink to checkpoint only d), as stated above?
>>>>
>>>>      4) Can you apply checkpointing only to certain streams and operators
>>>> (say, if I wish to store the aggregated values that are part of the
>>>> transformation)?
>>>>
>>>> Best Regards
>>>> CVP
>>>>
>>>>
>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> Thanks, the logs were very helpful!
>>>>>
>>>>> TL;DR - The offset committing to ZooKeeper is very slow and prevents
>>>>> proper starting of checkpoints.
>>>>>
>>>>> Here is what is happening in detail:
>>>>>
>>>>>   - There is a long time (many seconds) between the point when the
>>>>> TaskManager receives the "trigger checkpoint" message and the point when
>>>>> the KafkaSource actually starts the checkpoint, for one of the Kafka inputs
>>>>> (the other is fine).
>>>>>   - The only way this delay can be introduced is if another
>>>>> checkpoint-related operation (such as trigger() or notifyComplete()) is
>>>>> still in progress when the checkpoint is started. Flink does not perform
>>>>> concurrent checkpoint operations on a single operator, to ease the
>>>>> concurrency model for users.
>>>>>   - The operation that is still in progress must be the committing of
>>>>> the offsets (to ZooKeeper or Kafka). That also explains why this only
>>>>> happens once one side receives the first record. Before that, there is
>>>>> nothing to commit.
>>>>>
>>>>>
>>>>> What Flink should fix:
>>>>>   - The KafkaConsumer should run the commit operations asynchronously,
>>>>> so as not to block the "notifyCheckpointComplete()" method.
>>>>>
>>>>> What you can fix:
>>>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka input works
>>>>> well, the other does not. Do they go against different sets of brokers, or
>>>>> different ZooKeepers? Is the metadata for one input bad?
>>>>>   - In the next Flink version, you may opt out of committing offsets
>>>>> to Kafka/ZooKeeper altogether. It is not important for Flink's
>>>>> checkpoints anyway.
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>>     Please find my responses below.
>>>>>>
>>>>>>     - What source are you using for the slow input?
>>>>>> *     [CVP] - Both stream as pointed out in my first mail, are Kafka
>>>>>> Streams*
>>>>>>   - How large is the state that you are checkpointing?
>>>>>>
>>>>>> *[CVP] - I have enabled checkpointing on the StreamEnvironment as
>>>>>> below.*
>>>>>>
>>>>>>
>>>>>>
>>>>>> *         final StreamExecutionEnvironment streamEnv =
>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> streamEnv.setStateBackend(new
>>>>>> FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>> streamEnv.enableCheckpointing(10000); *
>>>>>>
>>>>>>
>>>>>> *      In terms of the state stored, the KS1 stream has payload of
>>>>>> 100K events/second, while KS2 have about 1 event / 10 minutes... basically
>>>>>> the operators perform flatmaps on 8 fields of tuple (all fields are
>>>>>> primitives). If you look at the states' sizes in dashboard they are in
>>>>>> Kb... *
>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>     [CVP] -There are no back pressure atleast from the sample
>>>>>> computation in the flink dashboard. 100K/second is low load for flink's
>>>>>> benchmarks. I could not quite get the barriers vs snapshot state. I have
>>>>>> attached the Task Manager log (DEBUG) info if that will interest you.
>>>>>>
>>>>>>      I have attached the checkpoints times' as .png from the
>>>>>> dashboard. Basically if you look at checkpoint IDs 28 & 29 &30- you'd
>>>>>> see that the checkpoints take more than a minute in each case. Before these
>>>>>> checkpoints, the KS2 stream did not have any events. As soon as an
>>>>>> event(should be in bytes) was generated, the checkpoints went slow and
>>>>>> subsequently a minute more for every checkpoint thereafter.
>>>>>>
>>>>>>    This log was collected from the standalone flink cluster with 1
>>>>>> job manager & 2 TMs. 1 TM was running this application with checkpointing
>>>>>> (parallelism=1)
>>>>>>
>>>>>>     Please let me know if you need further info.,
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>> information?
>>>>>>>
>>>>>>>   - What source are you using for the slow input?
>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>> takes that long, or if it simply takes long for the checkpoint barriers to
>>>>>>> travel through the stream due to a lot of backpressure?
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <fh...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi CVP,
>>>>>>>>
>>>>>>>> I'm not so much familiar with the internals of the checkpointing
>>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>
>>>>>>>>>     I have a stream application that has 2 stream source as below.
>>>>>>>>>
>>>>>>>>>      KeyedStream<String, String> *ks1* = ds1.keyBy("*") ;
>>>>>>>>>      KeyedStream<Tuple2<String, V>, String> *ks2* =
>>>>>>>>> ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>
>>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>>      //X is a CoFlatMapFunction that inserts and removes elements
>>>>>>>>> from ks2 into a key-value state member. Elements from ks1 are matched
>>>>>>>>> against that state. the CoFlatMapFunction operator maintains
>>>>>>>>> ValueState<Tuple2<Long, Long>>;
>>>>>>>>>
>>>>>>>>>      //ks1 is streaming about 100K events/sec from kafka topic
>>>>>>>>>      //ks2 is streaming about 1 event every 10 minutes...
>>>>>>>>> Precisely when the 1st event is consumed from this stream, checkpoint takes
>>>>>>>>> 2 minutes straight away.
>>>>>>>>>
>>>>>>>>>     The version of flink is 1.1.2.
>>>>>>>>>
>>>>>>>>> I tried to use checkpoint every 10 Secs using a FsStateBackend...
>>>>>>>>> What I notice is that the checkpoint duration is almost 2 minutes for many
>>>>>>>>> cases, while for the other cases it varies from 100 ms to 1.5 minutes
>>>>>>>>> frequently. I'm attaching the snapshot of the dashboard for your reference.
>>>>>>>>>
>>>>>>>>>      Is this an issue with flink checkpointing?
>>>>>>>>>
>>>>>>>>>  Best Regards
>>>>>>>>> CVP
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

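Items a) and c) in the list of assumptions above suppose that records themselves are written into the checkpoint. In the barrier-based scheme described on the stream_checkpointing page linked in this thread, that is not the case: a two-input operator (such as the connected ks1/ks2 flatMap) holds back an input once its barrier arrives, waits for the barrier on the other input, snapshots only its own state, and then continues. The following is a toy model of that alignment step in plain Java, not Flink's implementation; the class and method names are hypothetical, channel 0 stands for ks1 and channel 1 for ks2, and only one checkpoint is in flight at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Toy model (not Flink code) of checkpoint-barrier alignment in a two-input
 * operator. Channel 0 ~ ks1, channel 1 ~ ks2. One checkpoint at a time.
 */
public class BarrierAlignmentSketch {

    private final boolean[] barrierSeen = new boolean[2];
    private final List<Deque<String>> held = new ArrayList<>();
    private final List<String> state = new ArrayList<>();           // stands in for operator state
    private final List<List<String>> snapshots = new ArrayList<>(); // one copy of state per checkpoint

    public BarrierAlignmentSketch() {
        held.add(new ArrayDeque<>());
        held.add(new ArrayDeque<>());
    }

    public void onRecord(int channel, String value) {
        if (barrierSeen[channel]) {
            held.get(channel).add(value); // this input is already past the barrier: hold it back
        } else {
            state.add(value);
        }
    }

    public void onBarrier(int channel) {
        barrierSeen[channel] = true;
        if (barrierSeen[0] && barrierSeen[1]) {
            // Aligned: copy only the operator state; held-back records are not stored.
            snapshots.add(new ArrayList<>(state));
            barrierSeen[0] = false;
            barrierSeen[1] = false;
            for (Deque<String> queue : held) { // resume with what was held back
                while (!queue.isEmpty()) {
                    state.add(queue.poll());
                }
            }
        }
    }

    public List<String> state() { return state; }

    public List<List<String>> snapshots() { return snapshots; }

    public static void main(String[] args) {
        BarrierAlignmentSketch op = new BarrierAlignmentSketch();
        op.onRecord(0, "a1");
        op.onBarrier(0);      // ks1 delivers the barrier first
        op.onRecord(0, "a2"); // held back until ks2's barrier arrives
        op.onRecord(1, "b1");
        op.onBarrier(1);      // both barriers seen: snapshot taken here
        System.out.println(op.snapshots()); // prints [[a1, b1]]
        System.out.println(op.state());     // prints [a1, b1, a2]
    }
}
```

The held-back record (a2 here) is not part of the snapshot; after a failure it is simply read again from Kafka, starting at the offsets stored in the checkpoint.
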
Re: Flink Checkpoint runs slow for low load stream

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

Helping out here: this is the PR for async Kafka offset committing - https://github.com/apache/flink/pull/2574.
It has already been merged into the master and release-1.1 branches, so you can try out the changes now if you’d like.
The change should also be included in the 1.1.3 release, which the Flink community is discussing releasing soon.

It would definitely be helpful if you could provide feedback afterwards!

Best Regards,
Gordon
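The blocking effect that the async commit change addresses can be reproduced in a small toy model (plain Java, not the Kafka connector's code). As Stephan explained in this thread, checkpoint-related calls on one operator never run concurrently; here that is modeled with a single lock, and the slow ZooKeeper/Kafka commit is a hypothetical sleep of commitMillis. All class and method names are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Toy model (not Flink or Kafka connector code): all checkpoint-related calls
 * on one operator run under a single lock, so a slow synchronous offset commit
 * inside notifyCheckpointComplete() delays the next triggerCheckpoint().
 */
public class AsyncCommitSketch {

    private final Object checkpointLock = new Object();
    private final ExecutorService committer = Executors.newSingleThreadExecutor();
    private final CountDownLatch notifierEnteredLock = new CountDownLatch(1);
    private final boolean asyncCommit;
    private final long commitMillis;

    public AsyncCommitSketch(boolean asyncCommit, long commitMillis) {
        this.asyncCommit = asyncCommit;
        this.commitMillis = commitMillis;
    }

    // Hypothetical stand-in for a slow offset commit to ZooKeeper/Kafka.
    private void commitOffsets() {
        try {
            Thread.sleep(commitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void notifyCheckpointComplete() {
        synchronized (checkpointLock) {
            notifierEnteredLock.countDown();
            if (asyncCommit) {
                committer.submit(this::commitOffsets); // hand off and release the lock at once
            } else {
                commitOffsets(); // keeps the lock for the whole commit
            }
        }
    }

    /** Returns how many milliseconds the trigger waited for the lock. */
    public long triggerCheckpoint() {
        long start = System.nanoTime();
        synchronized (checkpointLock) {
            // the actual snapshot of offsets and state would happen here
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Completes one checkpoint, then triggers the next; returns the trigger delay in ms. */
    public static long measure(boolean asyncCommit, long commitMillis) {
        AsyncCommitSketch op = new AsyncCommitSketch(asyncCommit, commitMillis);
        Thread notifier = new Thread(op::notifyCheckpointComplete);
        notifier.start();
        try {
            op.notifierEnteredLock.await(); // start the trigger only after the notifier has entered the lock
            long delay = op.triggerCheckpoint();
            notifier.join();
            return delay;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            op.committer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("synchronous commit, trigger waited  ~" + measure(false, 500) + " ms");
        System.out.println("asynchronous commit, trigger waited ~" + measure(true, 500) + " ms");
    }
}
```

With a synchronous commit the trigger waits roughly as long as the commit takes; with the commit handed to a separate thread it proceeds almost immediately. Exact timings depend on the machine, so the printed numbers are indicative only.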


On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (chakravarthyvp@gmail.com) wrote:

Hi Stephan,

    Is the async Kafka offset commit released in 1.1.3?

Varaga


Re: Flink Checkpoint runs slow for low load stream

Posted by Chakravarthy varaga <ch...@gmail.com>.
Hi Stephan,

    Is the async Kafka offset commit released in 1.1.3?

Varaga

On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:

> Hi Stephan,
>
>      That would be great. Let me know once the fix is done and which
> snapshot version to use; I'll check and get back to you then.
>      Can you also share the JIRA issue that tracks this?
>
>      Regarding the offset commit issue, I'm not sure how to
> proceed here. I'll probably try your fix first and see whether the problem
> recurs.
>
> Thanks much
> Varaga

Re: Flink Checkpoint runs slow for low load stream

Posted by Chakravarthy varaga <ch...@gmail.com>.
Hi Stephan,

     That would be great. Let me know once the fix is done and which
snapshot version to use; I'll check and get back to you then.
     Can you also share the JIRA issue that tracks this?

     Regarding the offset commit issue, I'm not sure how to proceed
here. I'll probably try your fix first and see whether the problem recurs.

Thanks much
Varaga

On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <se...@apache.org> wrote:

> @CVP
>
> In your case, Flink stores only the Kafka offsets (a few bytes) and the
> custom state (d) in checkpoints.
>
> Here is an illustration of the checkpoint and what is stored (from the
> Flink docs).
> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>
>
> I am quite puzzled why the offset committing problem occurs only for one
> input, and not for the other.
> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
> Could you try out a snapshot version to see if that fixes your problem?
>
> Greetings,
> Stephan

Re: Flink Checkpoint runs slow for low load stream

Posted by Stephan Ewen <se...@apache.org>.
@CVP

In your case, Flink stores only the Kafka offsets (a few bytes) and the custom
state (e) in checkpoints.

Here is an illustration of the checkpoint and what is stored (from the
Flink docs).
https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
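As a rough illustration of the point above, here is a toy model in plain Java. It has no Flink dependency, and every class, method and topic name in it is hypothetical. It models what an aligned checkpoint holds for a job like this one:

- The sources contribute one Kafka offset per partition.
- The stateful operator contributes a copy of its keyed ValueState.
- Stateless operators and the records themselves contribute nothing. On recovery, the sources rewind to the stored offsets and the records are replayed.

This is a sketch of the idea, not Flink's internal implementation.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model, not Flink code: what an aligned checkpoint holds for a job like
// kafka source -> flatMap -> keyBy -> coFlatMap (with ValueState) -> sink.
final class CheckpointContentsSketch {

    // Hypothetical snapshot: one offset per Kafka partition plus the keyed state.
    static final class Snapshot {
        final Map<String, Long> sourceOffsets = new LinkedHashMap<>();
        final Map<String, long[]> keyedState = new HashMap<>();
    }

    static Snapshot take(Map<String, Long> currentOffsets,
                         Map<String, long[]> valueStatePerKey) {
        Snapshot s = new Snapshot();
        // Sources: only their read position in each partition (a few bytes each).
        s.sourceOffsets.putAll(currentOffsets);
        // Stateful operator: a copy of each key's value (Tuple2<Long, Long> as long[2]).
        for (Map.Entry<String, long[]> e : valueStatePerKey.entrySet()) {
            s.keyedState.put(e.getKey(), e.getValue().clone());
        }
        // Stateless operators and the records themselves are not stored: on
        // recovery the sources rewind to the saved offsets and replay.
        return s;
    }

    // Rough payload estimate that ignores keys and metadata:
    // 8 bytes per offset, 16 bytes per Tuple2<Long, Long> value.
    static long approxPayloadBytes(Snapshot s) {
        return 8L * s.sourceOffsets.size() + 16L * s.keyedState.size();
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("ks1-topic/0", 1_234_567L); // hypothetical topic/partition names
        offsets.put("ks2-topic/0", 3L);
        Map<String, long[]> state = new HashMap<>();
        state.put("someKey", new long[] {42L, 7L});
        Snapshot s = take(offsets, state);
        System.out.println(approxPayloadBytes(s) + " bytes"); // prints "32 bytes"
    }
}
```

The copy in take() matters because the live state keeps changing after the snapshot is taken.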


I am quite puzzled why the offset committing problem occurs only for one
input, and not for the other.
I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
Could you try out a snapshot version to see if that fixes your problem?

Greetings,
Stephan



On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
chakravarthyvp@gmail.com> wrote:

> Hi Stephan,
>
>      Thanks a million for your detailed explanation. I appreciate it.
>
>      -  ZooKeeper was started using the *ZooKeeper bundled with Kafka
> 0.9.0.1*. There is only 1 (standalone) ZooKeeper instance, running on my
> localhost (Ubuntu 14.04).
>      -  There is only 1 Kafka broker (*version 0.9.0.1*).
>
>      The Flink cluster has only 1 JM and 2 TMs, started without HA. I
> presume it does not use ZooKeeper anyway, since it runs as a standalone
> cluster.
>
>
>      BTW, the Kafka connector version that I use is the one suggested on
> the Flink connectors page:
>
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>            <version>1.1.1</version>
>        </dependency>
>
>      Do you see any issues with these versions?
>
>      1) Do you have benchmarks for checkpointing in Flink?
>
>      2) There isn't a detailed explanation of what state is stored as part
> of the checkpointing process. For example, if I have a pipeline like
> *source -> map -> keyBy -> map -> sink*, my assumption of what is stored is:
>
> *         a) The source stream's custom watermarked records*
>
> *         b) Intermediate states of each of the transformations in the
> pipeline*
>
> *         c) Delta of records stored from the previous sink*
>
> *         d) Custom state (say, ValueState as in my case) - essentially
> this is what I care about storing.*
>
> *         e) All of my operators*
>
>       Is my understanding right?
>
>      3) Is there a way in Flink to checkpoint only d) as stated above?
>
>      4) Can checkpointing be applied only to certain streams and operators
> (say, if I wish to store aggregated values computed as part of a
> transformation)?
>
> Best Regards
> CVP
>

Re: Flink Checkpoint runs slow for low load stream

Posted by Chakravarthy varaga <ch...@gmail.com>.
Hi Stephan,

     Thanks a million for your detailed explanation. I appreciate it.

     -  ZooKeeper was started using the *ZooKeeper bundled with Kafka
0.9.0.1*. There is only 1 (standalone) ZooKeeper instance, running on my
localhost (Ubuntu 14.04).
     -  There is only 1 Kafka broker (*version 0.9.0.1*).

     The Flink cluster has only 1 JM and 2 TMs, started without HA. I presume
it does not use ZooKeeper anyway, since it runs as a standalone cluster.


     BTW, the Kafka connector version that I use is the one suggested on the
Flink connectors page:

       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
           <version>1.1.1</version>
       </dependency>

     Do you see any issues with these versions?

     1) Do you have benchmarks for checkpointing in Flink?

     2) There isn't a detailed explanation of what state is stored as part
of the checkpointing process. For example, if I have a pipeline like
*source -> map -> keyBy -> map -> sink*, my assumption of what is stored is:

*         a) The source stream's custom watermarked records*

*         b) Intermediate states of each of the transformations in the
pipeline*

*         c) Delta of records stored from the previous sink*

*         d) Custom state (say, ValueState as in my case) - essentially this
is what I care about storing.*

*         e) All of my operators*

      Is my understanding right?

     3) Is there a way in Flink to checkpoint only d) as stated above?

     4) Can checkpointing be applied only to certain streams and operators
(say, if I wish to store aggregated values computed as part of a
transformation)?

Best Regards
CVP


On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <se...@apache.org> wrote:

> Thanks, the logs were very helpful!
>
> TL;DR - Committing offsets to ZooKeeper is very slow and prevents
> checkpoints from starting properly.
>
> Here is what is happening in detail:
>
>   - For one of the Kafka inputs (the other is fine), a long time (many
> seconds) passes between the point when the TaskManager receives the
> "trigger checkpoint" message and the point when the KafkaSource actually
> starts the checkpoint.
>   - The only way this delay can be introduced is if another
> checkpoint-related operation (such as trigger() or notifyComplete()) is
> still in progress when the checkpoint is started. To ease the concurrency
> model for users, Flink does not perform concurrent checkpoint operations
> on a single operator.
>   - The operation that is still in progress must be the committing of the
> offsets (to ZooKeeper or Kafka). That also explains why this only happens
> once one side receives the first record. Before that, there is nothing to
> commit.
>
>
> What Flink should fix:
>   - The KafkaConsumer should run the commit operations asynchronously, so
> as not to block the "notifyCheckpointComplete()" method.
>
> What you can fix:
>   - Have a look at your Kafka/ZooKeeper setup. One Kafka input works well,
> the other does not. Do they go against different sets of brokers, or
> different ZooKeepers? Is the metadata for one input bad?
>   - In the next Flink version, you may opt out of committing offsets to
> Kafka/ZooKeeper altogether. It is not needed for Flink's checkpoints
> anyway.
>
> Greetings,
> Stephan

Re: Flink Checkpoint runs slow for low load stream

Posted by Stephan Ewen <se...@apache.org>.
Thanks, the logs were very helpful!

TL;DR - Committing offsets to ZooKeeper is very slow and prevents checkpoints
from starting properly.

Here is what is happening in detail:

  - For one of the Kafka inputs (the other is fine), a long time (many
seconds) passes between the point when the TaskManager receives the "trigger
checkpoint" message and the point when the KafkaSource actually starts the
checkpoint.
  - The only way this delay can be introduced is if another checkpoint-related
operation (such as trigger() or notifyComplete()) is still in progress when
the checkpoint is started. To ease the concurrency model for users, Flink
does not perform concurrent checkpoint operations on a single operator.
  - The operation that is still in progress must be the committing of the
offsets (to ZooKeeper or Kafka). That also explains why this only happens
once one side receives the first record. Before that, there is nothing to
commit.


What Flink should fix:
  - The KafkaConsumer should run the commit operations asynchronously, so as
not to block the "notifyCheckpointComplete()" method.
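The diagnosis above can be reproduced in a simplified plain-Java model. The class and method names below are hypothetical stand-ins, not Flink's KafkaConsumer or StreamTask code. In the model:

- Both checkpoint calls synchronize on one per-operator lock, mirroring "no concurrent checkpoint operations on a single operator".
- A sleep stands in for a slow ZooKeeper offset commit.

With a blocking commit, the next trigger waits for almost the whole commit. Handing the commit to a background thread, as suggested above, removes that wait.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model, not Flink's real classes: checkpoint calls on one operator
// share a lock, so a slow commit in notifyCheckpointComplete() delays the next trigger.
final class CommitBlockingSketch {
    private final Object checkpointLock = new Object();
    private final ExecutorService committer = Executors.newSingleThreadExecutor();
    private final boolean asyncCommit;
    private final long commitMillis; // stand-in for a slow ZooKeeper/Kafka offset commit

    CommitBlockingSketch(boolean asyncCommit, long commitMillis) {
        this.asyncCommit = asyncCommit;
        this.commitMillis = commitMillis;
    }

    private void commitOffsets() {
        try {
            Thread.sleep(commitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void notifyCheckpointComplete(long checkpointId) {
        synchronized (checkpointLock) {
            if (asyncCommit) {
                committer.execute(this::commitOffsets); // hand off, release the lock at once
            } else {
                commitOffsets(); // the lock is held for the whole commit
            }
        }
    }

    // Returns how many milliseconds the trigger waited for the lock.
    long triggerCheckpoint(long checkpointId) {
        long start = System.nanoTime();
        synchronized (checkpointLock) {
            // Snapshotting the source offsets would happen here; it is cheap.
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        }
    }

    static long measureTriggerDelay(boolean asyncCommit, long commitMillis) {
        CommitBlockingSketch op = new CommitBlockingSketch(asyncCommit, commitMillis);
        Thread notifier = new Thread(() -> op.notifyCheckpointComplete(1L));
        try {
            notifier.start();
            Thread.sleep(50); // let checkpoint 1's completion take the lock first
            long waited = op.triggerCheckpoint(2L);
            notifier.join();
            return waited;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            op.committer.shutdown(); // finishes a submitted commit, accepts no more
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking commit: trigger waited ~"
                + measureTriggerDelay(false, 500L) + " ms");
        System.out.println("async commit:    trigger waited ~"
                + measureTriggerDelay(true, 500L) + " ms");
    }
}
```

The 50 ms head start only makes the ordering deterministic for the demo. In a real job, the overlap comes from checkpoints being triggered while the previous commit is still running.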

What you can fix:
  - Have a look at your Kafka/ZooKeeper setup. One Kafka input works well,
the other does not. Do they go against different sets of brokers, or
different ZooKeepers? Is the metadata for one input bad?
  - In the next Flink version, you may opt out of committing offsets to
Kafka/ZooKeeper altogether. It is not needed for Flink's checkpoints
anyway.

Greetings,
Stephan


On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
chakravarthyvp@gmail.com> wrote:

> Hi Stephan,
>
>     Please find my responses below.
>
>     - What source are you using for the slow input?
> *     [CVP] - As pointed out in my first mail, both streams are Kafka
> streams.*
>   - How large is the state that you are checkpointing?
>
> *[CVP] - I have enabled checkpointing on the StreamExecutionEnvironment as
> below:*
>
>          final StreamExecutionEnvironment streamEnv =
>              StreamExecutionEnvironment.getExecutionEnvironment();
>          streamEnv.setStateBackend(
>              new FsStateBackend("file:///tmp/flink/checkpoints"));
>          streamEnv.enableCheckpointing(10000); // checkpoint interval: 10 s
>
>
> *      In terms of the state stored: the KS1 stream carries about 100K
> events/second, while KS2 has about 1 event every 10 minutes. Basically, the
> operators perform flatMaps on tuples of 8 fields (all fields are
> primitives). If you look at the state sizes in the dashboard, they are in
> the KB range.*
>   - Can you check in the log whether the state snapshot actually takes
> that long, or whether it simply takes long for the checkpoint barriers to
> travel through the stream due to a lot of backpressure?
>     [CVP] - There is no backpressure, at least according to the sampling
> in the Flink dashboard. 100K/second is a low load by Flink's benchmarks. I
> did not quite understand the difference between barrier travel time and
> snapshot time. I have attached the TaskManager log (DEBUG) in case it is
> of interest.
>
>      I have attached the checkpoint times from the dashboard as a .png.
> If you look at checkpoint IDs 28, 29 and 30, you'll see that each of
> these checkpoints takes more than a minute. Before these checkpoints, the
> KS2 stream did not have any events. As soon as an event (a few bytes in
> size) was generated, the checkpoints became slow, taking more than a
> minute every time thereafter.
>
>    This log was collected from a standalone Flink cluster with 1 JobManager
> and 2 TMs. 1 TM was running this application with checkpointing
> (parallelism=1).
>
>     Please let me know if you need further info.

Re: Flink Checkpoint runs slow for low load stream

Posted by Chakravarthy varaga <ch...@gmail.com>.
Hi Stephan,

    Please find my responses below.

    - What source are you using for the slow input?
*     [CVP] - As pointed out in my first mail, both streams are Kafka
streams.*
  - How large is the state that you are checkpointing?

*[CVP] - I have enabled checkpointing on the StreamExecutionEnvironment as
below:*

         final StreamExecutionEnvironment streamEnv =
             StreamExecutionEnvironment.getExecutionEnvironment();
         streamEnv.setStateBackend(
             new FsStateBackend("file:///tmp/flink/checkpoints"));
         streamEnv.enableCheckpointing(10000); // checkpoint interval: 10 s


*      In terms of the state stored: the KS1 stream carries about 100K
events/second, while KS2 has about 1 event every 10 minutes. Basically, the
operators perform flatMaps on tuples of 8 fields (all fields are
primitives). If you look at the state sizes in the dashboard, they are in
the KB range.*
  - Can you check in the log whether the state snapshot actually takes that
long, or whether it simply takes long for the checkpoint barriers to travel
through the stream due to a lot of backpressure?
    [CVP] - There is no backpressure, at least according to the sampling in
the Flink dashboard. 100K/second is a low load by Flink's benchmarks. I did
not quite understand the difference between barrier travel time and snapshot
time. I have attached the TaskManager log (DEBUG) in case it is of interest.

     I have attached the checkpoint times from the dashboard as a .png.
If you look at checkpoint IDs 28, 29 and 30, you'll see that each of these
checkpoints takes more than a minute. Before these checkpoints, the KS2
stream did not have any events. As soon as an event (a few bytes in size)
was generated, the checkpoints became slow, taking more than a minute every
time thereafter.

   This log was collected from a standalone Flink cluster with 1 JobManager
and 2 TMs. 1 TM was running this application with checkpointing
(parallelism=1).

    Please let me know if you need further info.



On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Let's try to figure that one out. Can you give us a bit more information?
>
>   - What source are you using for the slow input?
>   - How large is the state that you are checkpointing?
>   - Can you check in the log whether the state snapshot actually takes
> that long, or whether it simply takes long for the checkpoint barriers to
> travel through the stream due to a lot of backpressure?
>
> Greetings,
> Stephan

Re: Flink Checkpoint runs slow for low load stream

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Let's try to figure that one out. Can you give us a bit more information?

  - What source are you using for the slow input?
  - How large is the state that you are checkpointing?
  - Can you check in the log whether the state snapshot actually takes that
long, or whether it simply takes long for the checkpoint barriers to travel
through the stream due to a lot of backpressure?
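The last question asks you to split a checkpoint's total time into two parts:

- the delay before an operator starts its snapshot (for example, barriers held up by backpressure, or another checkpoint operation still holding the operator), and
- the time spent actually writing the snapshot.

Here is a minimal sketch of that split. It uses made-up timestamps; in practice they would be read from the TaskManager DEBUG log. The helper names are hypothetical and are not a Flink API.

```java
// Illustration only: hypothetical helpers, not a Flink API. Timestamps are in ms.
final class CheckpointTimingSketch {

    // From the checkpoint trigger until the operator starts its snapshot.
    static long startDelayMs(long triggeredAt, long snapshotStartedAt) {
        return snapshotStartedAt - triggeredAt;
    }

    // Time the operator spends actually writing its state.
    static long snapshotMs(long snapshotStartedAt, long snapshotFinishedAt) {
        return snapshotFinishedAt - snapshotStartedAt;
    }

    // Names whichever part dominates the checkpoint duration.
    static String diagnose(long startDelayMs, long snapshotMs) {
        if (startDelayMs > snapshotMs) {
            return "waiting before the snapshot starts (e.g. delayed barriers)";
        }
        return "writing the state snapshot";
    }

    public static void main(String[] args) {
        // Hypothetical timestamps for one checkpoint of a small-state job.
        long triggered = 0L, started = 95_000L, finished = 95_040L;
        long delay = startDelayMs(triggered, started);
        long snap = snapshotMs(started, finished);
        System.out.println("start delay " + delay + " ms, snapshot " + snap
                + " ms -> most time spent " + diagnose(delay, snap));
    }
}
```

With kilobytes of state, the snapshot part should be small. A large total therefore points at the time before the snapshot starts.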

Greetings,
Stephan



On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi CVP,
>
> I'm not very familiar with the internals of the checkpointing system,
> but maybe Stephan (in CC) has an idea what's going on here.
>
> Best, Fabian

Re: Flink Checkpoint runs slow for low load stream

Posted by Fabian Hueske <fh...@gmail.com>.
Hi CVP,

I'm not very familiar with the internals of the checkpointing system,
but maybe Stephan (in CC) has an idea what's going on here.

Best, Fabian

2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <ch...@gmail.com>:

> Hi Aljoscha & Fabian,
>
>     I have a stream application that has 2 stream sources, as below.
>
>      KeyedStream<String, String> ks1 = ds1.keyBy("*");
>      KeyedStream<Tuple2<String, V>, String> ks2 =
>          ds2.flatMap(/* split T into k-v pairs */).keyBy(0);
>
>      ks1.connect(ks2).flatMap(X);
>      // X is a CoFlatMapFunction that inserts and removes elements from
>      // ks2 into a key-value state member. Elements from ks1 are matched
>      // against that state. The CoFlatMapFunction operator maintains a
>      // ValueState<Tuple2<Long, Long>>.
>
>      // ks1 streams about 100K events/sec from a Kafka topic.
>      // ks2 streams about 1 event every 10 minutes. Precisely when the
>      // first event is consumed from this stream, checkpoints take 2
>      // minutes straight away.
>
>     The Flink version is 1.1.2.
>
> I configured checkpointing every 10 seconds using an FsStateBackend. What
> I notice is that the checkpoint duration is almost 2 minutes in many
> cases, while in the other cases it frequently varies from 100 ms to 1.5
> minutes. I'm attaching a snapshot of the dashboard for your reference.
>
>      Is this an issue with Flink checkpointing?
>
>  Best Regards
> CVP
>
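The per-key logic of the CoFlatMapFunction X from the original post can be sketched in plain Java, without the Flink API. In this sketch:

- A HashMap stands in for the keyed ValueState<Tuple2<Long, Long>>, with a long[2] playing the role of the tuple.
- A List stands in for the output Collector.
- A ks2 event with a null value removes its key. This convention is an assumption made for illustration.

The method names mirror CoFlatMapFunction's flatMap1/flatMap2 only for readability. The sketch also shows why the state stays small: only keys that arrived on ks2 are stored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink dependency) of the CoFlatMapFunction X described above.
final class CoFlatMapModel {
    // Stand-in for keyed ValueState<Tuple2<Long, Long>>: key -> {f0, f1}.
    private final Map<String, long[]> state = new HashMap<>();

    // ks2 side (rare control events): insert, or remove when value is null (assumed convention).
    void flatMap2(String key, long[] value) {
        if (value == null) {
            state.remove(key);
        } else {
            state.put(key, value.clone());
        }
    }

    // ks1 side (~100K events/sec): emit a match only when the key has state.
    void flatMap1(String key, List<String> out) {
        long[] v = state.get(key);
        if (v != null) {
            out.add(key + " -> (" + v[0] + ", " + v[1] + ")");
        }
    }

    int stateSize() {
        return state.size();
    }

    public static void main(String[] args) {
        CoFlatMapModel x = new CoFlatMapModel();
        List<String> out = new ArrayList<>();
        x.flatMap1("a", out);                 // no state yet: nothing emitted
        x.flatMap2("a", new long[] {1L, 2L}); // control event inserts key "a"
        x.flatMap1("a", out);                 // now matches
        x.flatMap2("a", null);                // control event removes key "a"
        x.flatMap1("a", out);                 // no match again
        System.out.println(out);              // prints [a -> (1, 2)]
        System.out.println(x.stateSize());    // prints 0
    }
}
```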