Posted to user@flink.apache.org by Yuta Morisawa <yu...@kddi-research.jp> on 2017/09/12 09:02:15 UTC

Streaming API has a long delay at the beginning of the process.

Hi,

I am worried about the latency of the Streaming API.
My application reads data through the Kafka connector, processes it,
and then writes the results through a Kafka producer.
The problem is that the app suffers a long delay when the first data
arrives in the cluster.
It takes about 1000 ms to process the data (I measure the time with
Kafka timestamps). However, 2-3 seconds after the first data arrives,
it works well (the delay is about 200 ms).

The application is very latency-sensitive, so I want to solve this problem.
I suspect this is a JVM matter, but I have no idea how to investigate it.
Is there any way to avoid this delay?



Thank you for your attention
Yuta
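
One way to make the comparison above concrete is to compute the latency per record and report the first records separately from the rest. The following is only a minimal sketch in plain Java, not Flink or Kafka code: the class name, the method names, and the sample values are made up for illustration (the values merely mimic the 1000 ms and 200 ms figures in the post), and the size of the warm-up window is an assumption.

```java
import java.util.Arrays;

// Toy latency report: splits per-record latencies into a warm-up prefix and the rest.
class LatencySplit {

    // Latency of one record: sink-side timestamp minus source-side timestamp (both in ms).
    static long latencyMs(long producedAtMs, long emittedAtMs) {
        return emittedAtMs - producedAtMs;
    }

    // Mean of latencies in the index range [from, to).
    static double mean(long[] latencies, int from, int to) {
        return Arrays.stream(latencies, from, to).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        // Made-up sample shaped like the report: slow first records, then about 200 ms.
        long[] latencies = {1000, 950, 900, 210, 200, 190};
        int warmUp = 3; // assumed number of records affected by start-up cost
        System.out.println("warm-up mean (ms): " + mean(latencies, 0, warmUp));
        System.out.println("steady mean (ms):  " + mean(latencies, warmUp, latencies.length));
    }
}
```

Reporting the two ranges separately keeps a one-time start-up cost from being averaged into the steady-state number.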

Re: Streaming API has a long delay at the beginning of the process.

Posted by Yuta Morisawa <yu...@kddi-research.jp>.
Hi Fabian,

Thanks a lot.
I got a better understanding.

 > Operators are never GC'd (unless a job was cancelled)
That's great information.
Maybe this is related to the so-called Managed Memory.
The documentation would be better if it described memory management in
more detail.

Thank you,
Yuta

On 2017/09/18 18:03, Fabian Hueske wrote:
> Hi Yuta,
> 
> you got most things right :-)
> 
> 3) sources (such as Kafka connectors) are also considered operators and 
> start immediately because they are sources.
> 4) All other operators start when they need to process their first 
> record. Operators are never GC'd (unless a job was cancelled), so the 
> setup cost is a one time thing that only happens when the job is started.
> 
> Best, Fabian

Re: Streaming API has a long delay at the beginning of the process.

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Yuta,

you got most things right :-)

3) sources (such as Kafka connectors) are also considered operators and
start immediately because they are sources.
4) All other operators start when they need to process their first record.
Operators are never GC'd (unless a job was cancelled), so the setup cost is
a one time thing that only happens when the job is started.

Best, Fabian

2017-09-15 12:43 GMT+02:00 Yuta Morisawa <yu...@kddi-research.jp>:

> Hi Fabian,
>
> Thank you for your description.
>
> This is my understanding.
> 1. At the moment the execute() method is called, Flink creates the JobGraph,
> submits it to the JobManager, and deploys tasks to the TaskManagers, but it
> DOES NOT execute the operators yet.
> 2. Operators are executed when they are needed.
> 3. Sources (Kafka connectors) start before the other operators.
> 4. The first time an operator is called, or after GC removes an operator
> instance, some initialization occurs, such as class loading,
> instantiation, and memory allocation. This may take a long time.
>
> If I have misunderstood anything, please correct me.
> If not, my question is solved.
>
> Regards.
> Yuta

Re: Streaming API has a long delay at the beginning of the process.

Posted by Yuta Morisawa <yu...@kddi-research.jp>.
Hi Fabian,

Thank you for your description.

This is my understanding.
1. At the moment the execute() method is called, Flink creates the JobGraph,
submits it to the JobManager, and deploys tasks to the TaskManagers, but it
DOES NOT execute the operators yet.
2. Operators are executed when they are needed.
3. Sources (Kafka connectors) start before the other operators.
4. The first time an operator is called, or after GC removes an operator
instance, some initialization occurs, such as class loading,
instantiation, and memory allocation. This may take a long time.

If I have misunderstood anything, please correct me.
If not, my question is solved.

Regards.
Yuta

On 2017/09/15 17:05, Fabian Hueske wrote:
> Hi Yuta,
> 
> when the execute() method is called, a so-called JobGraph is
> constructed from all operators that have been added before by calling 
> map(), keyBy() and so on.
> The JobGraph is then submitted to the JobManager which is the master 
> process in Flink. Based on the JobGraph, the master deploys tasks to the 
> worker processes (TaskManagers).
> These are the tasks that do the actual processing and they are 
> subsequently started as I explained before, i.e., the source task starts 
> consuming from Kafka before subsequent tasks have been started.
> 
> So, there is quite a lot happening when you call execute() including 
> network communication and task deployment.
> 
> Hope this helps,
> Fabian

Re: Streaming API has a long delay at the beginning of the process.

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Yuta,

when the execute() method is called, a so-called JobGraph is
constructed from all operators that have been added before by calling
map(), keyBy() and so on.
The JobGraph is then submitted to the JobManager which is the master
process in Flink. Based on the JobGraph, the master deploys tasks to the
worker processes (TaskManagers).
These are the tasks that do the actual processing and they are subsequently
started as I explained before, i.e., the source task starts consuming from
Kafka before subsequent tasks have been started.

So, there is quite a lot happening when you call execute() including
network communication and task deployment.

Hope this helps,
Fabian
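
The deferred behaviour described above (map() and similar calls only build up a plan, and the work starts once execute() is called) can be sketched with a toy model. This is plain Java and not Flink's API: ToyEnvironment, its map() and execute() methods, and the recordsProcessed counter are hypothetical names, and the sketch leaves out everything about JobGraph submission, the JobManager, and task deployment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Toy model of deferred execution; names are made up and are not Flink's API.
class ToyEnvironment {
    private final List<Function<String, String>> plan = new ArrayList<>();
    int recordsProcessed = 0;

    // Only records the transformation in the plan; nothing is processed here.
    ToyEnvironment map(Function<String, String> fn) {
        plan.add(fn);
        return this;
    }

    // Runs the recorded plan over the input, one record at a time.
    List<String> execute(List<String> input) {
        List<String> out = new ArrayList<>();
        for (String record : input) {
            String value = record;
            for (Function<String, String> fn : plan) {
                value = fn.apply(value);
            }
            recordsProcessed++;
            out.add(value);
        }
        return out;
    }

    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment()
                .map(String::trim)
                .map(String::toUpperCase);
        // No record has been touched yet: the calls above only built the plan.
        System.out.println("processed before execute(): " + env.recordsProcessed);
        System.out.println(env.execute(Arrays.asList(" a ", " b ")));
    }
}
```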

2017-09-15 4:25 GMT+02:00 Yuta Morisawa <yu...@kddi-research.jp>:

> Hi, Fabian
>
> > If I understand you correctly, the problem is only for the first events
> > that are processed.
> Yes. More precisely, the first 300 Kafka messages.
>
> > AFAIK, Flink lazily instantiates its operators which means that a source
> > task starts to consume records from Kafka before the subsequent tasks
> > have been started.
> That's a great hint. It explains the behavior well.
> But the documentation says, "The operations are actually
> executed when the execution is explicitly triggered by an execute() call on
> the execution environment."
> What does that mean?
> AFAIK, typical Flink programs invoke execute() in main().
> Do all operators start at this time? I think maybe not.
>
> - Flink documentation
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#lazy-evaluation
>
>
> > Not sure if or what can be done about this behavior.
> > I'll loop in Till who knows more about the lifecycle of tasks.
> Thank you very much for your kindness.
>
> Regards, Yuta

Re: Streaming API has a long delay at the beginning of the process.

Posted by Yuta Morisawa <yu...@kddi-research.jp>.
Hi, Fabian

 > If I understand you correctly, the problem is only for the first events
 > that are processed.
Yes. More precisely, the first 300 Kafka messages.

 > AFAIK, Flink lazily instantiates its operators which means that a source
 > task starts to consume records from Kafka before the subsequent tasks
 > have been started.
That's a great hint. It explains the behavior well.
But the documentation says, "The operations are actually
executed when the execution is explicitly triggered by an execute() call
on the execution environment."
What does that mean?
AFAIK, typical Flink programs invoke execute() in main().
Do all operators start at this time? I think maybe not.

- Flink documentation
 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#lazy-evaluation


 > Not sure if or what can be done about this behavior.
 > I'll loop in Till who knows more about the lifecycle of tasks.
Thank you very much for your kindness.

Regards, Yuta

On 2017/09/14 19:32, Fabian Hueske wrote:
> Hi,
> 
> If I understand you correctly, the problem is only for the first events 
> that are processed.
> 
> AFAIK, Flink lazily instantiates its operators which means that a source 
> task starts to consume records from Kafka before the subsequent tasks 
> have been started.
> That's why the latency of the first records is higher.
> 
> Not sure if or what can be done about this behavior.
> I'll loop in Till who knows more about the lifecycle of tasks.
> 
> Best, Fabian

Re: Streaming API has a long delay at the beginning of the process.

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

If I understand you correctly, the problem is only for the first events
that are processed.

AFAIK, Flink lazily instantiates its operators which means that a source
task starts to consume records from Kafka before the subsequent tasks have
been started.
That's why the latency of the first records is higher.

Not sure if or what can be done about this behavior.
I'll loop in Till who knows more about the lifecycle of tasks.

Best, Fabian
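
The effect described above (the first records pay for operator start-up, later records do not) can be illustrated with a toy operator that runs its setup only when the first record arrives. This is a plain Java sketch, not Flink code: LazyOperator, process(), and setupCalls are hypothetical names, and the 200 ms sleep is an arbitrary stand-in for whatever start-up work a real task performs.

```java
import java.util.function.Supplier;

// Toy model of a lazily started operator; not Flink code.
class LazyOperatorDemo {

    // Wraps an expensive setup step and runs it only when the first record arrives.
    static final class LazyOperator {
        private final Supplier<Object> setup;
        private Object state;   // stays null until the first record is processed
        int setupCalls = 0;

        LazyOperator(Supplier<Object> setup) {
            this.setup = setup;
        }

        String process(String record) {
            if (state == null) {   // only the first record pays the setup cost
                state = setup.get();
                setupCalls++;
            }
            return record.toUpperCase();
        }
    }

    public static void main(String[] args) {
        LazyOperator op = new LazyOperator(() -> {
            // Arbitrary stand-in for start-up work; the duration is made up.
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return new Object();
        });
        for (String record : new String[] {"a", "b", "c"}) {
            long start = System.nanoTime();
            op.process(record);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println(record + " took " + elapsedMs + " ms");
        }
    }
}
```

Because the operator keeps a reference to its state, the setup runs once; every later record skips it.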


2017-09-12 11:02 GMT+02:00 Yuta Morisawa <yu...@kddi-research.jp>:

> Hi,
>
> I am worried about the latency of the Streaming API.
> My application reads data through the Kafka connector, processes it,
> and then writes the results through a Kafka producer.
> The problem is that the app suffers a long delay when the first data
> arrives in the cluster.
> It takes about 1000 ms to process the data (I measure the time with
> Kafka timestamps). However, 2-3 seconds after the first data arrives,
> it works well (the delay is about 200 ms).
>
> The application is very latency-sensitive, so I want to solve this problem.
> I suspect this is a JVM matter, but I have no idea how to investigate it.
> Is there any way to avoid this delay?
>
>
>
> Thank you for your attention
> Yuta
>