Posted to user@flink.apache.org by Lars Skjærven <la...@gmail.com> on 2021/12/10 14:46:17 UTC

WindowOperator TestHarness

Hello,

We're trying to write a test for an implementation of *AggregateFunction*
applied after an *EventTimeSessionWindows.withGap* window. We tried
constructing a *WindowOperator* directly, hoping to pass it as an argument to
*KeyedOneInputStreamOperatorTestHarness*, but we're a bit stuck and are
hoping someone has a tip or two. Specifically, we can't find the right
*InternalWindowFunction* to pass to the *WindowOperator* constructor. Below,
*MyAggregator* is our implementation of *AggregateFunction*.

Does anyone have a template, or guide, to test a windowed aggregate
function?

Kind regards,
Lars


    val myWindowOperator = new WindowOperator(
      EventTimeSessionWindows.withGap(Time.seconds(10)),
      new TimeWindow.Serializer(),
      new KeySelector[MyInputType, (String, String)] {
        override def getKey(value: MyInputType): (String, String) = {
          (value.a, value.b)
        }
      },
      Types.TUPLE(Types.STRING).createSerializer(
        new ExecutionConfig
      ),
      new AggregatingStateDescriptor[MyInputType, MyAggregateState, MyOutputType](
        "test", new MyAggregator, classOf[MyAggregateState],
      ),
      ???,
      EventTimeTrigger.create(),
      0,
      null
    )

    testHarness = new KeyedOneInputStreamOperatorTestHarness[(String, String), MyInputType, MyOutputType](
      myWindowOperator,
      new KeySelector[MyInputType, (String, String)] {
        override def getKey(value: MyInputType): (String, String) = {
          (value.a, value.b)
        }
      },
      createTuple2TypeInformation(Types.STRING, Types.STRING)
    )

Re: Re: WindowOperator TestHarness

Posted by Pierre Bedoucha <Pi...@tv2.no>.
Hi Timo,

That helps a lot, thank you. Following your advice, we ended up implementing the test as follows:
```
// Cast the Java transformation to recover the operator's input and output types.
val transform: OneInputTransformation[MyInputType, MyOutputType] = window1.getTransformation
  .asInstanceOf[OneInputTransformation[MyInputType, MyOutputType]]

val operator: OneInputStreamOperator[MyInputType, MyOutputType] = transform.getOperator

// The operator built by the DataStream API is a WindowOperator keyed by (a, b).
val winOperator: WindowOperator[(String, String), MyInputType, AggregateState, MyOutputType, TimeWindow] =
  operator.asInstanceOf[WindowOperator[(String, String), MyInputType, AggregateState, MyOutputType, TimeWindow]]

// Reuse the operator's own key selector so the harness keys records the same way.
testHarness =
  new KeyedOneInputStreamOperatorTestHarness(winOperator,
    winOperator.getKeySelector, TypeInformation.of(new TypeHint[(String, String)]() {}))
```

Here `window1` is the windowed, aggregated DataStream, accessed through the corresponding Java DataStream API.
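
For anyone landing on this thread later, here is a minimal sketch of how a harness built this way might then be driven. It assumes Flink 1.13.x with the Scala DataStream API and the Flink test-jars (which provide `KeyedOneInputStreamOperatorTestHarness`) on the test classpath. `Event`, `SumAggregator` and `SessionWindowHarnessSketch` are hypothetical stand-ins for our own types, not part of Flink, and the timestamps are arbitrary.

```scala
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness

// Hypothetical input type, standing in for MyInputType.
case class Event(key: String, value: Long)

// Hypothetical aggregate, standing in for MyAggregator: sums the values per session.
class SumAggregator extends AggregateFunction[Event, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(in: Event, acc: Long): Long = acc + in.value
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = a + b
}

object SessionWindowHarnessSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Build the pipeline only to obtain the operator; the job itself is never executed.
    val windowed: DataStream[Long] = env
      .fromElements(Event("a", 0L))
      .keyBy(_.key)
      .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
      .aggregate(new SumAggregator)

    // Go through the wrapped Java stream to reach the transformation and its operator.
    val operator = windowed.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[Event, Long]]
      .getOperator

    val harness = new KeyedOneInputStreamOperatorTestHarness[String, Event, Long](
      operator,
      new KeySelector[Event, String] { override def getKey(e: Event): String = e.key },
      Types.STRING)

    harness.open()
    // Two records for the same key, 4 seconds apart, so they fall into one session.
    harness.processElement(new StreamRecord(Event("a", 1L), 1000L))
    harness.processElement(new StreamRecord(Event("a", 2L), 5000L))
    // Advance event time past the end of the session so the window can fire.
    harness.processWatermark(20000L)
    println(harness.extractOutputValues())
    harness.close()
  }
}
```

In a real test the `println` would be replaced by assertions on the list returned by `extractOutputValues()`.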

Regards,
Pierre


From: Timo Walther <tw...@apache.org>
Date: Friday, 17 December 2021 at 14:20
To: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: Re: WindowOperator TestHarness

Re: Re: WindowOperator TestHarness

Posted by Timo Walther <tw...@apache.org>.
Hi Pierre,

Sorry for the late reply.

The `getTransformation` method might only be available in the Java DataStream
API. The Scala `DataStream` object is only a thin wrapper around the
Java one. You can access the original via:

dataStream.javaStream.getTransformation()

I hope this helps.

Regards,
Timo


On 16.12.21 09:58, Pierre Bedoucha wrote:


Re: WindowOperator TestHarness

Posted by Pierre Bedoucha <Pi...@tv2.no>.
Hi Timo,

And thank you for the detailed answer.
We chose to go for the second alternative using the following:

import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
val env = StreamExecutionEnvironment.getExecutionEnvironment

val source = env.fromElements(
  new MyInputType(..., eventTime = Some(Timestamp(1595447118L))),
  new MyInputType(..., eventTime = Some(Timestamp(1595447119L))))

val window1: DataStream[MyOutputType] = source
  .keyBy[(String, String)]((v: MyInputType) => (v.a, v.b))
  .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
  .aggregate(new MyAggregator())

val transform : OneInputTransformation[MyInputType, MyOutputType] = window1.getTransformation

val operator = transform.getOperator


However, the *.getTransformation* method does not seem to be exposed on the windowed and aggregated DataStream. We're using Flink 1.13.2 so far. Could this be related to what the public test API exposes?

Kind regards,
Pierre and Lars


From: Timo Walther <tw...@apache.org>
Date: Monday, 13 December 2021 at 08:53
To: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: WindowOperator TestHarness

Re: WindowOperator TestHarness

Posted by Timo Walther <tw...@apache.org>.
Hi Lars,

You can take a look at how
org.apache.flink.streaming.api.datastream.WindowedStream#WindowedStream
constructs the graph under the hood. In particular, it uses
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorBuilder,
which constructs the InternalWindowFunction you are looking for.

You could also use the regular DataStream API to construct the
operator and access it for the test harness via something like
dataStream.getTransformation().getOperator(). This avoids calling too
many of the internal classes.

I hope this helps.

Timo

