Posted to user@flink.apache.org by "Nguyen, Michael" <Mi...@T-Mobile.com> on 2019/10/28 07:17:52 UTC

Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream

Hello everybody,

Has anyone tried testing AggregateFunction() and ProcessWindowFunction() on a KeyedDataStream? I have reviewed the testing page on Flink’s official website (https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html), but I am not sure how to test these two functions when they are used together in an .aggregate() operator.

Here’s how I am using the AggregateFunction (EventCountAggregate()) and ProcessWindowFunction (CalculateWindowTotal()) in my Flink job:
DataStream<Tuple2<Date, Integer>> ec2EventsAggregate =
        ec2Events
                .keyBy(t -> t.f0)
                .timeWindow(Time.minutes(30))
                .aggregate(new EventCountAggregate(), new CalculateWindowTotal())
                .name("EC2 creation interval count");


EventCountAggregate() counts each element in the ec2Events data stream.
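
Because a counting aggregate holds no Flink-specific state, its logic can usually be unit tested by calling its methods directly, with no cluster or harness. The block below is only a minimal sketch under assumptions: `EventCountSketch`, `CountAggregate`, and `countAll` are hypothetical names, and the local `Aggregate` interface is a stand-in that mirrors the four methods of Flink's `AggregateFunction` (`createAccumulator`, `add`, `getResult`, `merge`) so that the example compiles without Flink on the classpath. The real EventCountAggregate may differ.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; not the EventCountAggregate from the job above.
class EventCountSketch {

    // Stand-in with the same four methods as Flink's AggregateFunction<IN, ACC, OUT>,
    // declared locally so the sketch compiles without Flink on the classpath.
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Counts elements and ignores their content (assumed behavior).
    static final class CountAggregate<T> implements Aggregate<T, Integer, Integer> {
        @Override public Integer createAccumulator() { return 0; }
        @Override public Integer add(T value, Integer accumulator) { return accumulator + 1; }
        @Override public Integer getResult(Integer accumulator) { return accumulator; }
        @Override public Integer merge(Integer a, Integer b) { return a + b; }
    }

    // Feeds the events through the aggregate one at a time and returns the final count.
    static <T> int countAll(List<T> events) {
        CountAggregate<T> aggregate = new CountAggregate<>();
        Integer accumulator = aggregate.createAccumulator();
        for (T event : events) {
            accumulator = aggregate.add(event, accumulator);
        }
        return aggregate.getResult(accumulator);
    }

    public static void main(String[] args) {
        System.out.println(countAll(Arrays.asList("a", "b", "c")));
    }
}
```

With the real class, the same calls (create an accumulator, add a few elements, read the result, merge two accumulators) can be asserted in an ordinary JUnit test.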

CalculateWindowTotal() takes the timestamp of each 30-minute window and pairs it with the number of elements counted so far for that window. It returns a Tuple2 containing the window's end timestamp and the element count.
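
When writing expected values for a test of the window function, it helps to know which end timestamp a given element should map to. The block below is a rough sketch, assuming 30-minute tumbling windows aligned to the epoch with no offset and an exclusive window end (to my understanding, this is how Flink's tumbling time windows behave by default). `WindowEndSketch`, `windowStart`, `windowEnd`, and `windowTotal` are hypothetical names, and a `Map.Entry<Date, Integer>` stands in for the `Tuple2<Date, Integer>` of the real job.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Date;
import java.util.Map;

// Hypothetical sketch; not the CalculateWindowTotal from the job above.
class WindowEndSketch {

    // 30 minutes in milliseconds, matching Time.minutes(30) in the job.
    static final long WINDOW_SIZE_MS = 30L * 60L * 1000L;

    // Start of the epoch-aligned tumbling window that contains the timestamp (offset 0 assumed).
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    // Exclusive end of that window.
    static long windowEnd(long timestampMs) {
        return windowStart(timestampMs) + WINDOW_SIZE_MS;
    }

    // Pairs the window's end timestamp with a count; stands in for Tuple2<Date, Integer>.
    static Map.Entry<Date, Integer> windowTotal(long timestampMs, int count) {
        return new SimpleImmutableEntry<Date, Integer>(new Date(windowEnd(timestampMs)), count);
    }

    public static void main(String[] args) {
        long eventTime = 1_572_247_072_000L; // 2019-10-28 07:17:52 UTC
        Map.Entry<Date, Integer> result = windowTotal(eventTime, 7);
        System.out.println(result.getKey().getTime() + " -> " + result.getValue());
    }
}
```

An element at 07:17:52 UTC falls into the window from 07:00:00 to 07:30:00, so the expected end timestamp in a test would be 07:30:00 UTC.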


Thanks,
Michael

Re: Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream

Posted by vino yang <ya...@gmail.com>.
Hi Michael,

As far as I can see, WindowTranslationTest does not initialize a
mini-cluster anywhere. Since we are testing an operator here, the
operator test harness seems to provide the necessary infrastructure.

You can check whether anything is missing on your side.

Best,
Vino


Re: Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream

Posted by "Nguyen, Michael" <Mi...@T-Mobile.com>.
Hi Vino,

This is a great example – thank you!

It looks like I need to instantiate a StreamExecutionEnvironment in order to get my OneInputStreamOperator. Would I need to set up a local Flink cluster using MiniClusterWithClientResource in order to use StreamExecutionEnvironment?


Best,
Michael



Re: Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream

Posted by vino yang <ya...@gmail.com>.
Hi Michael,

You may want to look at the `KeyedOneInputStreamOperatorTestHarness` test class.

You can use
`WindowTranslationTest#testAggregateWithWindowFunctionEventTime` or
`WindowTranslationTest#testAggregateWithWindowFunctionProcessingTime` [1]
(both of them call `processElementAndEnsureOutput`) as an example.

[1]:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java#L676

Best,
Vino
