Posted to user@flink.apache.org by Arpith techy <ar...@gmail.com> on 2021/02/16 10:32:52 UTC

Unit testing Async Operator

Hi,

I am trying to unit test an async operator that takes a Tuple1 as input and
produces a Tuple3 as output, but while creating a test harness I couldn't
work out which TypeSerializer to pass. Can anyone help me with this?


public class GetMetadataAsyncProcess
        extends RichAsyncFunction<
                Tuple1<Map<String, List<String>>>,
                Tuple3<Map<String, List<String>>, Map<String, String>, List<Map<String, Integer>>>> {

...

}


private OneInputStreamOperatorTestHarness createTestHarness(
        GetMetadataAsyncProcess asyncFunction,
        long timeout,
        int capacity,
        AsyncDataStream.OutputMode mode) throws Exception {
    OneInputStreamOperatorTestHarness tuple1OUTOneInputStreamOperatorTestHarness =
            new OneInputStreamOperatorTestHarness<>(
                    new AsyncWaitOperator<>(asyncFunction, timeout, capacity, mode),
                    <TYPESERIALISER_TO_FILL?>);
    return tuple1OUTOneInputStreamOperatorTestHarness;
}


Creating the harness without passing a TypeSerializer results in the
following error:

java.lang.NullPointerException
	at java.util.Objects.requireNonNull(Objects.java:203)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:138)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:284)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:273)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:389)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:296)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:304)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:479)
	at GetMetadataAsyncProcessTest.testRichAsyncFunction(GetMetadataAsyncProcessTest.java:40)
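
Editor's note: the top two frames of the trace show a constructor rejecting a null argument through Objects.requireNonNull. The plain-Java sketch below reproduces only that fail-fast pattern. The names NullSerializerDemo, Serializer and WrappingSerializer are hypothetical and are not Flink classes; the assumption that the harness ends up handing a null input serializer to the operator is inferred from the trace, not confirmed from the Flink source.

```java
import java.util.Objects;

class NullSerializerDemo {

    // Hypothetical serializer interface, for illustration only.
    interface Serializer<T> { }

    // Hypothetical wrapper that validates its delegate eagerly in the
    // constructor, the same pattern the top frames of the trace point to.
    static final class WrappingSerializer<T> {
        private final Serializer<T> delegate;

        WrappingSerializer(Serializer<T> delegate) {
            // Throws NullPointerException immediately if no serializer was supplied.
            this.delegate = Objects.requireNonNull(delegate);
        }
    }

    // Returns true when constructing the wrapper with null is rejected.
    static boolean failsWithNull() {
        try {
            new WrappingSerializer<String>(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("null delegate rejected: " + failsWithNull());
    }
}
```

Under that assumption, the exception says nothing about the async function itself; it only signals that a serializer for the input type was never supplied.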

Re: Unit testing Async Operator

Posted by Arpith Prakash <ar...@gmail.com>.
Thanks Till, your solution worked perfectly.

Arpith

On Wed, Feb 17, 2021 at 12:53 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Arpith,
>
> The operator test harness is intended mainly for cases where you implement
> your own operator, which is quite advanced.
>
> If you just want to test your AsyncFunction, I'd strongly recommend
> building a small ITCase like [1]; then you no longer have to fiddle with
> these details. Such tests run very fast and cover much more than the
> operator harness, which only simulates the execution to some extent.
>
> [1]
> https://github.com/apache/flink/blob/c202f5134a17dd652eb072d00d5fca894936cdaf/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java#L41-L41

Re: Unit testing Async Operator

Posted by Arvid Heise <ar...@apache.org>.
Hi Arpith,

The operator test harness is intended mainly for cases where you implement
your own operator, which is quite advanced.

If you just want to test your AsyncFunction, I'd strongly recommend
building a small ITCase like [1]; then you no longer have to fiddle with
these details. Such tests run very fast and cover much more than the
operator harness, which only simulates the execution to some extent.

[1]
https://github.com/apache/flink/blob/c202f5134a17dd652eb072d00d5fca894936cdaf/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java#L41-L41

On Tue, Feb 16, 2021 at 1:47 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Arpith,
>
> Looking at the definition of the GetMetadataAsyncProcess function, you need
> to specify the TypeSerializer for a Tuple1<Map<String, List<String>>>. To
> avoid creating the serializer manually, you could try:
>
> TypeInformation.of(new TypeHint<Tuple1<Map<String, List<String>>>>(){})
>     .createSerializer(new ExecutionConfig())
>
> This should hopefully create the correct serializer.
>
> Cheers,
> Till

Re: Unit testing Async Operator

Posted by Till Rohrmann <tr...@apache.org>.
Hi Arpith,

Looking at the definition of the GetMetadataAsyncProcess function, you need
to specify the TypeSerializer for a Tuple1<Map<String, List<String>>>. To
avoid creating the serializer manually, you could try:

TypeInformation.of(new TypeHint<Tuple1<Map<String, List<String>>>>(){})
    .createSerializer(new ExecutionConfig())

This should hopefully create the correct serializer.

Cheers,
Till
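
Editor's note: the suggestion above creates an anonymous subclass, new TypeHint<...>(){}, so that the full generic type survives erasure and can be read back at runtime. The plain-Java sketch below illustrates that general pattern, often called a super type token. TypeCapture and TypeCaptureDemo are hypothetical names used only for illustration; this is not Flink's TypeHint implementation, and it stops at recovering the type rather than building a serializer.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

class TypeCaptureDemo {
    public static void main(String[] args) {
        // The trailing {} creates an anonymous subclass, which is what
        // makes the type argument recoverable at runtime.
        Type captured = new TypeCapture<Map<String, List<String>>>() {}.getType();
        System.out.println(captured.getTypeName());
    }
}

// Hypothetical stand-in for the pattern; NOT Flink's actual TypeHint class.
abstract class TypeCapture<T> {
    private final Type type;

    protected TypeCapture() {
        // A subclass records its generic superclass, including the actual
        // type argument, in its class metadata, so it is not erased.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                    "Create TypeCapture as an anonymous subclass with a type argument");
        }
        this.type = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}
```

A plain Class object such as Map.class cannot carry the nested String and List<String> arguments, which is presumably why a hint object is used here instead of a class literal.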


