Posted to user@flink.apache.org by Klemens Muthmann <kl...@cyface.de> on 2020/11/24 08:59:53 UTC

Question: How to avoid local execution being terminated before session window closes

Hi,

I have written an Apache Flink Pipeline containing the following piece 
of code (Java):

stream
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(50)))
    .aggregate(new CustomAggregator())
    .print();

If I run the pipeline using local execution, I see the following 
behavior: the `CustomAggregator` calls the `createAccumulator` and 
`add` methods correctly with the correct data. However, it never calls 
`getResult`, and my pipeline simply finishes.
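
For reference, here is a minimal sketch of an `AggregateFunction` with 
the methods in question (the concrete types and the counting logic are 
simplified placeholders, not my actual implementation):

import org.apache.flink.api.common.functions.AggregateFunction;

// Placeholder aggregator: counts the records in each session window.
public class CustomAggregator implements AggregateFunction<String, Long, Long> {

    @Override
    public Long createAccumulator() {
        return 0L; // called once when a new session window is opened
    }

    @Override
    public Long add(String value, Long accumulator) {
        return accumulator + 1; // called for every incoming record
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator; // only called when the window fires
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b; // called when two session windows merge
    }
}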

So I did a little research and found out that it works if I change the 
code to:

stream
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1)))
    .aggregate(new CustomAggregator())
    .print();

Notice the reduced gap time for the processing-time session window. So 
it seems that execution only continues if the window has been closed, 
and if that takes too long, the execution simply aborts. I guess 
another factor playing a part in the problem is that my initial data 
is read in much faster than 50 seconds. This leaves the pipeline in a 
state where it is only waiting for the window to close; having nothing 
else to do, it decides that there is no work left and simply shuts down.

My question now is whether it is possible to tell the local execution 
environment to wait for that window to be closed, instead of just 
shutting down.

Thanks and Regards

     Klemens Muthmann


Re: Question: How to avoid local execution being terminated before session window closes

Posted by Klemens Muthmann <kl...@cyface.de>.
Hi,

Thanks for the hint. The infinite loop was the solution, and my 
pipeline works now.

Regards

     Klemens

-- 
Kind regards
           Dr.-Ing. Klemens Muthmann

-----------------------------------
Cyface GmbH
Hertha-Lindner-Straße 10
01067 Dresden
web: www.cyface.de
email: klemens.muthmann@cyface.de


Re: Question: How to avoid local execution being terminated before session window closes

Posted by Timo Walther <tw...@apache.org>.
For debugging, you can also implement a simple non-parallel source 
using `org.apache.flink.streaming.api.functions.source.SourceFunction`. 
You would need to implement the `run()` method with an endless loop 
after emitting all your records.
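
A minimal sketch of such a source could look like this (the element 
type and the emitted values are placeholders):

import java.util.Arrays;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Debugging source: emits a fixed set of records and then stays alive,
// so that processing-time timers still get a chance to fire.
public class KeepAliveSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        for (String element : Arrays.asList("a", "b", "c")) {
            ctx.collect(element);
        }
        // Endless loop: keeps the job from finishing after all records
        // have been emitted.
        while (running) {
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}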

Regards,
Timo


Re: Question: How to avoid local execution being terminated before session window closes

Posted by Klemens Muthmann <kl...@cyface.de>.
Hi,

Thanks for your reply. I am using processing time instead of event time, 
since we do get the events in batches and some might arrive days later.

But for my current dev setup I just use a CSV dump of finite size as 
input. I will hand the pipeline over to some colleagues, who will need 
to integrate it with an Apache Kafka service. Output is written to a 
Postgres database.

I'll have a look at your proposal and let you know whether it worked 
once I have finished a few prerequisite parts.

Regards

     Klemens



Re: Question: How to avoid local execution being terminated before session window closes

Posted by Timo Walther <tw...@apache.org>.
Hi Klemens,

What you are observing is one of the reasons why event time should be 
preferred over processing time. Event time uses the timestamps of your 
data, while processing time is too basic for many use cases. 
Especially when you want to reprocess historic data, you want to do 
that at full speed instead of waiting one hour for one-hour windows.
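
For illustration, the event-time variant of your pipeline could look 
roughly like this (a sketch only: the `MyEvent` type, its accessors, 
the key selector, and the out-of-orderness bound are assumptions, 
since I don't know your record type; this uses the `WatermarkStrategy` 
API of Flink 1.11):

// Assign event-time timestamps and watermarks first, then key and
// window the stream by event time instead of processing time.
stream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<MyEvent>forBoundedOutOfOrderness(Duration.ofMinutes(1))
            .withTimestampAssigner((event, ts) -> event.getEventTime()))
    .keyBy(event -> event.getKey())
    .window(EventTimeSessionWindows.withGap(Time.seconds(50)))
    .aggregate(new CustomAggregator())
    .print();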

If you want to use processing time nevertheless, you need to use a 
source that produces unbounded streams instead of bounded streams, so 
that the pipeline execution is theoretically infinite. Some 
documentation can be found here [1]; you need to use 
`FileProcessingMode.PROCESS_CONTINUOUSLY`. But what kind of connector 
are you currently using?
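
If it is a bounded file, a continuously monitored source could be set 
up roughly like this (the path and the scan interval are placeholders):

// Re-scans the path once per second, so the stream stays unbounded
// and processing-time windows still get a chance to fire.
TextInputFormat format = new TextInputFormat(new Path("/path/to/input"));
DataStream<String> lines = env.readFile(
        format,
        "/path/to/input",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        1000L); // monitoring interval in milliseconds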

Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources
