Posted to user@flink.apache.org by Steven Nelson <sn...@sourceallies.com> on 2019/05/07 21:48:49 UTC

Testing with ProcessingTime and FromElementsFunction with TestEnvironment

Hello!

I am trying to write a test that runs in the TestEnvironment. I create a
job that uses ProcessingTime, has a source constructed from a
FromElementsFunction and runs data through a Keyed Stream into
a ProcessingTimeSessionWindows.withGap().

The problem is that env.execute() appears to return immediately after the
session closes, so the job shuts down before the events are released from
the window. This used to work when I used EventTime.

Thoughts?
-Steve

Re: Testing with ProcessingTime and FromElementsFunction with TestEnvironment

Posted by Till Rohrmann <tr...@apache.org>.
Hi Michal,

you need to implement a source which does not terminate. Take a look at the
InfiniteSource [1], which does exactly this. That way, no Long.MAX_VALUE
watermark is sent when the source operator closes.

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java#L240
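
A minimal sketch of such a source, assuming the Flink 1.8-era SourceFunction API and TimeCharacteristic.EventTime (under ProcessingTime the source context ignores explicit timestamps and watermarks). The class NonTerminatingSource and its constructor parameters are hypothetical names for illustration, not part of Flink, and the code follows the idea behind the InfiniteSource in [1] rather than copying it. It emits the given elements with explicit timestamps, emits one watermark, and then idles until cancelled, so event-time timers later than that watermark should stay pending.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical helper, not part of Flink: emits timestamped elements and a single
// watermark, then blocks until cancel() is called. Because run() does not return on
// its own, the source operator never emits the final Long.MAX_VALUE watermark, so
// event-time timers later than `watermark` are not fired.
// Assumes TimeCharacteristic.EventTime; under ProcessingTime the source context
// ignores the explicit timestamps and the watermark.
public class NonTerminatingSource<T> implements SourceFunction<T> {

    private final ArrayList<T> elements;
    private final ArrayList<Long> timestamps;
    private final long watermark;
    private volatile boolean running = true;

    public NonTerminatingSource(List<T> elements, List<Long> timestamps, long watermark) {
        if (elements.size() != timestamps.size()) {
            throw new IllegalArgumentException("elements and timestamps must have the same size");
        }
        this.elements = new ArrayList<>(elements);
        this.timestamps = new ArrayList<>(timestamps);
        this.watermark = watermark;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        // Emit the records and the watermark under the checkpoint lock.
        synchronized (ctx.getCheckpointLock()) {
            for (int i = 0; i < elements.size(); i++) {
                ctx.collectWithTimestamp(elements.get(i), timestamps.get(i));
            }
            ctx.emitWatermark(new Watermark(watermark));
        }
        // Idle instead of returning: a normal return would end the source and
        // make the operator send the final Long.MAX_VALUE watermark.
        while (running) {
            Thread.sleep(20);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

Because such a source never finishes, a job-level test also never finishes on its own; the test has to cancel the job after checking what a sink has collected, and how to do that depends on the Flink version and the test setup.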

Cheers,
Till

On Fri, Aug 9, 2019 at 12:00 PM Michal Klempa <mi...@gmail.com>
wrote:

> Hi guys,
> I have the opposite issue :-) I would like to unit test negative behavior:
> that the event-time timer is not fired when no further event arrives
> (which would advance the watermark).
> But because StreamSource emits a Long.MAX_VALUE watermark once the run
> method of the enclosed finite FromElementsFunction completes, all timers
> in the tested operator (and ProcessFunction) get fired.
> Is anybody aware of a way to test this, or to design a SourceFunction
> that prevents the watermark from advancing, so that the timers are not
> fired (which is the desired behavior, to be guarded by unit tests)?
> Thanks.
> Michal

Re: Testing with ProcessingTime and FromElementsFunction with TestEnvironment

Posted by Michal Klempa <mi...@gmail.com>.
Hi guys,
I have the opposite issue :-) I would like to unit test negative behavior:
that the event-time timer is not fired when no further event arrives
(which would advance the watermark).
But because StreamSource emits a Long.MAX_VALUE watermark once the run
method of the enclosed finite FromElementsFunction completes, all timers
in the tested operator (and ProcessFunction) get fired.
Is anybody aware of a way to test this, or to design a SourceFunction
that prevents the watermark from advancing, so that the timers are not
fired (which is the desired behavior, to be guarded by unit tests)?
Thanks.
Michal

On Thu, May 9, 2019 at 9:02 AM Till Rohrmann <tr...@apache.org> wrote:
>
> Hi Steve,
>
> afaik there is no such thing in Flink. I agree that Flink's testing utilities should be improved. If you implement such a source, then you might be able to contribute it back to the community. That would be super helpful.
>
> Cheers,
> Till

Re: Testing with ProcessingTime and FromElementsFunction with TestEnvironment

Posted by Till Rohrmann <tr...@apache.org>.
Hi Steve,

afaik there is no such thing in Flink. I agree that Flink's testing
utilities should be improved. If you implement such a source, then you
might be able to contribute it back to the community. That would be super
helpful.

Cheers,
Till

On Wed, May 8, 2019 at 6:40 PM Steven Nelson <sn...@sourceallies.com>
wrote:

>
> That’s what I figured was happening :( Your explanation is a lot better
> than what I gave to my team, so that will help a lot, thank you!
>
> Is there a testing source already created that does this sort of thing?
> The Flink-testing library seems a bit sparse.
>
> -Steve

Re: Testing with ProcessingTime and FromElementsFunction with TestEnvironment

Posted by Steven Nelson <sn...@sourceallies.com>.
That’s what I figured was happening :( Your explanation is a lot better than what I gave to my team, so that will help a lot, thank you!

Is there a testing source already created that does this sort of thing? The Flink-testing library seems a bit sparse.

-Steve

> On May 8, 2019, at 9:33 AM, Till Rohrmann <tr...@apache.org> wrote:
> 
> Hi Steve,
> 
> I think the different behaviour is due to the way event time and processing time are implemented.
> 
> When you are using event time, watermarks denoting the current event time need to travel through the topology. When your source terminates, the system will send a watermark with Long.MAX_VALUE through the topology. This will effectively trigger the completion of all pending event time operations.
> 
> In the case of processing time, Flink does not do this. Instead it simply relies on the processing time clocks on each machine. Hence, there is no way for Flink to tell the different machines that their respective processing time clocks should proceed to a certain time in case of a shutdown. Instead you should make sure that you don't terminate the job before a certain time (processing time) has passed. You could do this by adding a sleep to your source function after you've output all records and just before leaving the source loop.
> 
> Cheers,
> Till

Re: Testing with ProcessingTime and FromElementsFunction with TestEnvironment

Posted by Till Rohrmann <tr...@apache.org>.
Hi Steve,

I think the different behaviour is due to the way event time and
processing time are implemented.

When you are using event time, watermarks denoting the current event time
need to travel through the topology. When your source terminates, the
system will send a watermark with Long.MAX_VALUE through the topology. This
will effectively trigger the completion of all pending event time
operations.

In the case of processing time, Flink does not do this. Instead it simply
relies on the processing time clocks on each machine. Hence, there is no
way for Flink to tell the different machines that their respective
processing time clocks should proceed to a certain time in case of a
shutdown. Instead you should make sure that you don't terminate the job
before a certain time (processing time) has passed. You could do this by
adding a sleep to your source function after you've output all records and
just before leaving the source loop.
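
A minimal sketch of that idea, assuming the Flink 1.8-era SourceFunction API. ElementsThenSleepSource and its lingerMillis parameter are hypothetical names for illustration, not Flink classes. The source emits a fixed list of elements and then stays inside run() for a fixed amount of wall-clock time, which gives processing-time timers (for example, those of ProcessingTimeSessionWindows.withGap()) a chance to fire before the source, and with it the job, finishes.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical helper, not part of Flink: emits a fixed list of elements, then keeps
// run() (and therefore the job) alive for lingerMillis of wall-clock time so that
// pending processing-time timers can fire before the source finishes.
public class ElementsThenSleepSource<T> implements SourceFunction<T> {

    private final ArrayList<T> elements;
    private final long lingerMillis;
    private volatile boolean running = true;

    public ElementsThenSleepSource(List<T> elements, long lingerMillis) {
        this.elements = new ArrayList<>(elements);
        this.lingerMillis = lingerMillis;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        for (T element : elements) {
            if (!running) {
                return;
            }
            // Emit each record under the checkpoint lock.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(element);
            }
        }
        // The "sleep before leaving the source loop": wait until the deadline has
        // passed or the source is cancelled, re-checking every 10 ms.
        long deadline = System.currentTimeMillis() + lingerMillis;
        while (running && System.currentTimeMillis() < deadline) {
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

In a test it could stand in for the FromElementsFunction-based source via env.addSource(...). Since processing-time timers follow the wall clock, lingerMillis would need to be comfortably larger than the session gap to absorb scheduling delays, which also makes such tests slower and timing-dependent.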

Cheers,
Till

On Tue, May 7, 2019 at 11:49 PM Steven Nelson <sn...@sourceallies.com>
wrote:

> Hello!
>
> I am trying to write a test that runs in the TestEnvironment. I create a
> job that uses ProcessingTime, has a source constructed from a
> FromElementsFunction and runs data through a Keyed Stream into
> a ProcessingTimeSessionWindows.withGap().
>
> The problem is that env.execute() appears to return immediately after the
> session closes, so the job shuts down before the events are released from
> the window. This used to work when I used EventTime.
>
> Thoughts?
> -Steve
>