Posted to user@flink.apache.org by Paul Lam <pa...@gmail.com> on 2018/08/02 08:11:52 UTC

Re: Multiple output operations in a job vs multiple jobs


> On 31 Jul 2018, at 15:47, vino yang <ya...@gmail.com> wrote:
> 
> Hi anna,
> 
> 1. The srcstream is a very high volume stream and the window size is 2 weeks and 4 weeks. Is the window size a problem? In this case, I think it is not a problem because I am using reduce which stores only 1 value per window. Is that right?
> 
> >> The window size should follow from your business requirements. However, if the window is very large, the job's state will also be large, which makes recovery after a failure take longer. You need to be aware of this. One value per window is just the value calculated by the window. It caches all data for the period of time before the window is triggered.
> 
> 2. I have 2 output operations, one with a 2-week window and the other with a 4-week window. Are they executed in parallel or in sequence?
> 
> >> These two windows are calculated in parallel.
> 
> 3. When I have multiple output operations, as in this case, should I break the job into 2 different jobs?
> 
> >> Both approaches are OK. With a single job, the two windows share the source stream, but the job's state is larger and recovery is slower. When split into two jobs, the Kafka topic is consumed twice, but the two windows are independent of each other.
> 
> 4. Can I run multiple jobs on the same cluster?
> 
> >> For standalone cluster mode or a Flink session on YARN, etc., there is no problem. For Flink on YARN in single-job mode, a cluster usually runs only one job; this is the recommended mode.
> 
> Thanks, vino.
> 
> 2018-07-31 15:11 GMT+08:00 anna stax <an...@gmail.com>:
> Hi all,
> 
> I am not sure when I should go for multiple jobs or have 1 job with all the sources and sinks. Following is my code.
> 
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     .......
>     // create a Kafka source
>     val srcstream = env.addSource(consumer)
> 
>     srcstream
>       .keyBy(0)
>       .window(ProcessingTimeSessionWindows.withGap(Time.days(14)))
>       .reduce  ...
>       .map ...
>       .addSink ...  
> 
>     srcstream
>       .keyBy(0)
>       .window(ProcessingTimeSessionWindows.withGap(Time.days(28)))
>       .reduce  ...
>       .map ...
>       .addSink ...
> 
>     env.execute("Job1")
> 
> My questions
> 
> 1. The srcstream is a very high volume stream and the window size is 2 weeks and 4 weeks. Is the window size a problem? In this case, I think it is not a problem because I am using reduce which stores only 1 value per window. Is that right?
> 
> 2. I have 2 output operations, one with a 2-week window and the other with a 4-week window. Are they executed in parallel or in sequence?
> 
> 3. When I have multiple output operations, as in this case, should I break the job into 2 different jobs?
> 
> 4. Can I run multiple jobs on the same cluster?
> 
> Thanks
> 
>  
> 

Hi yang, 

I’m a bit confused about the window data cache that you mentioned.

> It caches all data for the period of time before the window is triggered.

In my understanding, window functions process elements incrementally unless the low-level ProcessWindowFunction API is used, so caching the data should not be required in most scenarios. Would you mind giving more details of the window caching design? And please correct me if I’m wrong. Thanks a lot.

Best regards, 
Paul Lam

Re: Multiple output operations in a job vs multiple jobs

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Paul is right.
Which and how much data is stored in state for a window depends on the type
of the function that is applied on the windows:

- ReduceFunction: Only the reduced value is stored
- AggregateFunction: Only the accumulator value is stored
- WindowFunction or ProcessWindowFunction: All original records are stored.

So in Anna's job, each window only stores a single value.
Hence, the state size is independent of the size of the window (unless the
reduced value collects values of all input records, e.g., in a list or set).

Best, Fabian
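
As a rough illustration of the three cases listed above, the following is a toy model in plain Scala. It does not use Flink, and it is not how Flink implements window state: ordinary collections stand in for the per-key state of one window. The `Event` shape and all function names are hypothetical and chosen only for this sketch.

```scala
// Toy model only: plain Scala collections stand in for Flink's keyed window state.
object WindowStateModel {
  type Event = (String, Long) // (key, value); a hypothetical record shape

  // ReduceFunction-style: fold each arriving record into one stored value per key.
  def reduceState(events: Seq[Event]): Map[String, Long] =
    events.foldLeft(Map.empty[String, Long]) { case (state, (k, v)) =>
      state.updated(k, state.getOrElse(k, 0L) + v)
    }

  // AggregateFunction-style: one accumulator per key, here (sum, count).
  def aggregateState(events: Seq[Event]): Map[String, (Long, Long)] =
    events.foldLeft(Map.empty[String, (Long, Long)]) { case (state, (k, v)) =>
      val (sum, n) = state.getOrElse(k, (0L, 0L))
      state.updated(k, (sum + v, n + 1))
    }

  // ProcessWindowFunction-style: every record is buffered until the window fires.
  def bufferedState(events: Seq[Event]): Map[String, Vector[Long]] =
    events.foldLeft(Map.empty[String, Vector[Long]]) { case (state, (k, v)) =>
      state.updated(k, state.getOrElse(k, Vector.empty[Long]) :+ v)
    }

  def main(args: Array[String]): Unit = {
    // 1000 records spread over two keys.
    val events: Seq[Event] =
      (1L to 1000L).map(i => (if (i % 2 == 0) "even" else "odd", i))
    println(s"reduce:    ${reduceState(events).size} stored values")
    println(s"aggregate: ${aggregateState(events).size} stored accumulators")
    println(s"process:   ${bufferedState(events).values.map(_.size).sum} stored records")
  }
}
```

In this model the first two variants keep one entry per key no matter how many records arrive, while the buffered variant grows with the input. That matches the point that the state size of a reduce is independent of the window length.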

2018-08-02 10:29 GMT+02:00 vino yang <ya...@gmail.com>:

> Hi Paul,
>
> Yes, I was talking about the general case: Flink must store the data in the
> window as state so that it survives failures.
> In some scenarios your understanding is also correct, and Flink uses
> window panes to optimize window calculations.
> So, if your scenario falls under this optimized mode, ignore this.
>
> Thanks, vino.

Re: Multiple output operations in a job vs multiple jobs

Posted by vino yang <ya...@gmail.com>.
Hi Paul,

Yes, I was talking about the general case: Flink must store the data in the
window as state so that it survives failures.
In some scenarios your understanding is also correct, and Flink uses
window panes to optimize window calculations.
So, if your scenario falls under this optimized mode, ignore this.

Thanks, vino.
