You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Arpith techy <ar...@gmail.com> on 2021/02/16 10:32:52 UTC
Unit testing Async Operator
Hi,
I tried mocking the Async operator which takes Tuple1, Tuple3 as Input &
Output but while creating a test harness I couldn't find the right
TupleSerializer. Can anyone help me on this?
public class GetMetadataAsyncProcess extends
RichAsyncFunction<Tuple1<Map<String, List<String>>>,
Tuple3<Map<String, List<String>>, Map<String, String>,
List<Map<String, Integer>>>> {
...
}
private OneInputStreamOperatorTestHarness
createTestHarness(GetMetadataAsyncProcess asyncFunction, long timeout,
int capacity, AsyncDataStream.OutputMode mode) throws Exception {
OneInputStreamOperatorTestHarness
tuple1OUTOneInputStreamOperatorTestHarness = new
OneInputStreamOperatorTestHarness<>
(new AsyncWaitOperator<>(asyncFunction, timeout, capacity,
mode), <TYPESERIALISER_TO_FILL?>);
return tuple1OUTOneInputStreamOperatorTestHarness;
}
Creating a harness without passing TypeSerializer results in the following
error.
java.lang.NullPointerException
at java.util.Objects.requireNonNull(Objects.java:203)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:138)
at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:284)
at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:273)
at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:389)
at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:296)
at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:304)
at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:479)
at GetMetadataAsyncProcessTest.testRichAsyncFunction(GetMetadataAsyncProcessTest.java:40)
Re: Unit testing Async Operator
Posted by Arpith Prakash <ar...@gmail.com>.
Thanks Till, your solution worked perfectly.
Arpith
On Wed, Feb 17, 2021 at 12:53 AM Arvid Heise <ar...@apache.org> wrote:
> Hi Arpith,
>
> The operator test harness is more meant for use cases where you implement
> your own operator (quite advanced).
>
> If you just want to test your AsyncFunction, I'd strongly recommend
> building a small ITCase like [1] and then you don't have to fiddle with
> these things anymore. The tests run very fast and test much more than the
> operator harness, which just simulates the execution somewhat.
>
> [1]
> https://github.com/apache/flink/blob/c202f5134a17dd652eb072d00d5fca894936cdaf/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java#L41-L41
>
> On Tue, Feb 16, 2021 at 1:47 PM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Arpith,
>>
>> looking at the definition of the GetMetadataAsyncProcess function you
>> need to specify the TypeSerializer for a Tuple1<Map<String, List<String>>>.
>> What you could try in order to not create the serializer manually is to use:
>>
>> TypeInformation.of(new TypeHint<Tuple1<Map<String,
>> List<String>>>>(){}).createSerializer(new ExecutionConfig())
>>
>> This should hopefully create the correct serializer.
>>
>> Cheers,
>> Till
>>
>>
>>
>> On Tue, Feb 16, 2021 at 11:33 AM Arpith techy <ar...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I tried mocking the Async operator which takes Tuple1, Tuple3 as Input &
>>> Output but while creating a test harness I couldn't find the right
>>> TupleSerializer. Can anyone help me on this?
>>>
>>>
>>> public class GetMetadataAsyncProcess extends RichAsyncFunction<Tuple1<Map<String, List<String>>>, Tuple3<Map<String, List<String>>, Map<String, String>, List<Map<String, Integer>>>> {
>>>
>>> ...
>>>
>>> }
>>>
>>>
>>> private OneInputStreamOperatorTestHarness createTestHarness(GetMetadataAsyncProcess asyncFunction, long timeout, int capacity, AsyncDataStream.OutputMode mode) throws Exception {
>>> OneInputStreamOperatorTestHarness tuple1OUTOneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>
>>> (new AsyncWaitOperator<>(asyncFunction, timeout, capacity, mode), <TYPESERIALISER_TO_FILL?>);
>>> return tuple1OUTOneInputStreamOperatorTestHarness;
>>> }
>>>
>>>
>>> Creating a harness without passing TypeSerializer results in the
>>> following error.
>>>
>>> java.lang.NullPointerException
>>> at java.util.Objects.requireNonNull(Objects.java:203)
>>> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
>>> at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:138)
>>> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:284)
>>> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:273)
>>> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:389)
>>> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:296)
>>> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:304)
>>> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:479)
>>> at GetMetadataAsyncProcessTest.testRichAsyncFunction(GetMetadataAsyncProcessTest.java:40)
>>>
>>>
Re: Unit testing Async Operator
Posted by Arvid Heise <ar...@apache.org>.
Hi Arpith,
The operator test harness is more meant for use cases where you implement
your own operator (quite advanced).
If you just want to test your AsyncFunction, I'd strongly recommend
building a small ITCase like [1] and then you don't have to fiddle with
these things anymore. The tests run very fast and test much more than the
operator harness, which just simulates the execution somewhat.
[1]
https://github.com/apache/flink/blob/c202f5134a17dd652eb072d00d5fca894936cdaf/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java#L41-L41
On Tue, Feb 16, 2021 at 1:47 PM Till Rohrmann <tr...@apache.org> wrote:
> Hi Arpith,
>
> looking at the definition of the GetMetadataAsyncProcess function you need
> to specify the TypeSerializer for a Tuple1<Map<String, List<String>>>. What
> you could try in order to not create the serializer manually is to use:
>
> TypeInformation.of(new TypeHint<Tuple1<Map<String,
> List<String>>>>(){}).createSerializer(new ExecutionConfig())
>
> This should hopefully create the correct serializer.
>
> Cheers,
> Till
>
>
>
> On Tue, Feb 16, 2021 at 11:33 AM Arpith techy <ar...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I tried mocking the Async operator which takes Tuple1, Tuple3 as Input &
>> Output but while creating a test harness I couldn't find the right
>> TupleSerializer. Can anyone help me on this?
>>
>>
>> public class GetMetadataAsyncProcess extends RichAsyncFunction<Tuple1<Map<String, List<String>>>, Tuple3<Map<String, List<String>>, Map<String, String>, List<Map<String, Integer>>>> {
>>
>> ...
>>
>> }
>>
>>
>> private OneInputStreamOperatorTestHarness createTestHarness(GetMetadataAsyncProcess asyncFunction, long timeout, int capacity, AsyncDataStream.OutputMode mode) throws Exception {
>> OneInputStreamOperatorTestHarness tuple1OUTOneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>
>> (new AsyncWaitOperator<>(asyncFunction, timeout, capacity, mode), <TYPESERIALISER_TO_FILL?>);
>> return tuple1OUTOneInputStreamOperatorTestHarness;
>> }
>>
>>
>> Creating a harness without passing TypeSerializer results in the
>> following error.
>>
>> java.lang.NullPointerException
>> at java.util.Objects.requireNonNull(Objects.java:203)
>> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
>> at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:138)
>> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:284)
>> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:273)
>> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:389)
>> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:296)
>> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:304)
>> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:479)
>> at GetMetadataAsyncProcessTest.testRichAsyncFunction(GetMetadataAsyncProcessTest.java:40)
>>
>>
Re: Unit testing Async Operator
Posted by Till Rohrmann <tr...@apache.org>.
Hi Arpith,
looking at the definition of the GetMetadataAsyncProcess function you need
to specify the TypeSerializer for a Tuple1<Map<String, List<String>>>. What
you could try in order to not create the serializer manually is to use:
TypeInformation.of(new TypeHint<Tuple1<Map<String,
List<String>>>>(){}).createSerializer(new ExecutionConfig())
This should hopefully create the correct serializer.
Cheers,
Till
On Tue, Feb 16, 2021 at 11:33 AM Arpith techy <ar...@gmail.com> wrote:
> Hi,
>
> I tried mocking the Async operator which takes Tuple1, Tuple3 as Input &
> Output but while creating a test harness I couldn't find the right
> TupleSerializer. Can anyone help me on this?
>
>
> public class GetMetadataAsyncProcess extends RichAsyncFunction<Tuple1<Map<String, List<String>>>, Tuple3<Map<String, List<String>>, Map<String, String>, List<Map<String, Integer>>>> {
>
> ...
>
> }
>
>
> private OneInputStreamOperatorTestHarness createTestHarness(GetMetadataAsyncProcess asyncFunction, long timeout, int capacity, AsyncDataStream.OutputMode mode) throws Exception {
> OneInputStreamOperatorTestHarness tuple1OUTOneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>
> (new AsyncWaitOperator<>(asyncFunction, timeout, capacity, mode), <TYPESERIALISER_TO_FILL?>);
> return tuple1OUTOneInputStreamOperatorTestHarness;
> }
>
>
> Creating a harness without passing TypeSerializer results in the following
> error.
>
> java.lang.NullPointerException
> at java.util.Objects.requireNonNull(Objects.java:203)
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
> at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:138)
> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:284)
> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:273)
> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:389)
> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:296)
> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:304)
> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:479)
> at GetMetadataAsyncProcessTest.testRichAsyncFunction(GetMetadataAsyncProcessTest.java:40)
>
>