Posted to user@flink.apache.org by KristoffSC <kr...@gmail.com> on 2020/03/27 21:13:29 UTC

Testing RichAsyncFunction with TestHarness

Hi,
I'm trying to test my RichAsyncFunction implementation with
OneInputStreamOperatorTestHarness, based on [1]. I'm using Flink 1.9.2.

My test setup is:
    this.processFunction = new MyRichAsyncFunction();
    this.testHarness = new OneInputStreamOperatorTestHarness<>(
        new AsyncWaitOperator<>(processFunction, 2000, 1, OutputMode.ORDERED));

    this.testHarness.open();

I get the following exception when calling this.testHarness.open():

java.lang.NullPointerException
	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:142)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:287)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:275)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:393)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:300)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:308)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:483)


I would appreciate any help with this one.

Additionally, even though I added all the necessary dependencies defined in [1], I
cannot see the ProcessFunctionTestHarnesses class.

Thanks.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Testing RichAsyncFunction with TestHarness

Posted by KristoffSC <kr...@gmail.com>.
Hi :) I have finally figured it out :)

On top of the changes from my last email, I had to wrap
"testHarness.processElement" in my test method in a synchronized block,
like this:

  @Test
  public void foo() throws Exception {
    synchronized (this.testHarness.getCheckpointLock()) {
      testHarness.processElement(MyMessage.builder().build(), 1L);
    }
  }

That worked. 
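Why the synchronized block helps: the AssertionError thrown from AsyncWaitOperator.addAsyncBufferEntry appears to come from a check that the calling thread holds the operator's checkpoint lock, which the real task runtime holds for you but a bare test harness does not. A minimal stdlib-only sketch of that pattern follows; LockCheckingOperator is a hypothetical stand-in, not Flink's API, and it uses an explicit check instead of `assert` so it fires even without -ea.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointLockDemo {

    /** Hypothetical stand-in for an operator that requires callers to hold its lock. */
    public static final class LockCheckingOperator {
        private final Object checkpointLock = new Object();
        private final List<String> buffer = new ArrayList<>();

        public Object getCheckpointLock() {
            return checkpointLock;
        }

        public void processElement(String element) {
            // Explicit lock check, analogous in spirit to the assertion in the thread.
            if (!Thread.holdsLock(checkpointLock)) {
                throw new IllegalStateException("caller must hold the checkpoint lock");
            }
            buffer.add(element);
        }

        public int bufferedCount() {
            synchronized (checkpointLock) {
                return buffer.size();
            }
        }
    }

    public static void main(String[] args) {
        LockCheckingOperator operator = new LockCheckingOperator();

        // Without the lock: rejected, like the AssertionError from the harness.
        try {
            operator.processElement("a");
        } catch (IllegalStateException e) {
            System.out.println("without lock: " + e.getMessage());
        }

        // With the lock: accepted, like the synchronized workaround above.
        synchronized (operator.getCheckpointLock()) {
            operator.processElement("b");
        }
        System.out.println("buffered: " + operator.bufferedCount()); // prints "buffered: 1"
    }
}
```

Holding the lock only around the call keeps the test close to how the runtime would drive the operator, without blocking the async callbacks for longer than necessary.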

I think this could be added to the official documentation [1].


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html




Re: Testing RichAsyncFunction with TestHarness

Posted by KristoffSC <kr...@gmail.com>.
Hi, 
another update on this one. 
I managed to make the workaround a little bit cleaner. 

The test setup I have now is like this:

    // Create the environment first: its ExecutionConfig is needed for the serializer below.
    MockEnvironment environment = MockEnvironment.builder().build();
    ExecutionConfig executionConfig = environment.getExecutionConfig();

    // Serialized (empty) list of stream edges, stored under "edgesInOrder"
    ByteArrayOutputStream streamEdgesBytes = new ByteArrayOutputStream();
    ObjectOutputStream oosStreamEdges = new ObjectOutputStream(streamEdgesBytes);
    oosStreamEdges.writeObject(Collections.<StreamEdge>emptyList());

    // Serialized input type serializer, stored under "typeSerializer_in_1"
    KryoSerializer<MyMessage> kryoSerializer =
        new KryoSerializer<>(MyMessage.class, executionConfig);
    ByteArrayOutputStream kryoSerializerBytes = new ByteArrayOutputStream();
    ObjectOutputStream oosKryoSerializer = new ObjectOutputStream(kryoSerializerBytes);
    oosKryoSerializer.writeObject(kryoSerializer);

    Configuration configuration = new Configuration();
    configuration.setBytes("edgesInOrder", streamEdgesBytes.toByteArray());
    configuration.setBytes("typeSerializer_in_1", kryoSerializerBytes.toByteArray());
    environment.getTaskConfiguration().addAll(configuration);

    this.testHarness = new OneInputStreamOperatorTestHarness<>(
        new AsyncWaitOperator<>(processFunction, 2000, 1, OutputMode.UNORDERED),
        environment);

With this setup, this.testHarness.open(); works.
However, there is another problem: calling
testHarness.processElement(myMessage, 1L);
throws another exception:

java.lang.AssertionError
	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.addAsyncBufferEntry(AsyncWaitOperator.java:400)
	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:228)
	at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:112)
	at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:107)





Re: Testing RichAsyncFunction with TestHarness

Posted by KristoffSC <kr...@gmail.com>.
I think I got this to work, although with a "nasty" workaround.

While debugging, I found that the configuration for this test harness
operator was missing two entries:
"edgesInOrder"
"typeSerializer_in_1"

I added conditional breakpoints to the InstantiationUtil.readObjectFromConfig
method for those two keys and ran my "real" Flink job from IntelliJ.

I saw that for "edgesInOrder" an empty list of StreamEdge objects was stored,
and for "typeSerializer_in_1" an instance of the PojoSerializer class.

I took the byte[] for those two and simply added these two arrays to my
test harness setup under the appropriate keys, like this:

    Configuration configuration = new Configuration();
    configuration.setBytes("edgesInOrder", emptyEdgesListBytes);
    configuration.setBytes("typeSerializer_in_1", pojoSerializerBytes);
    
    MockEnvironment environment = MockEnvironment.builder().build();
    environment.getTaskConfiguration().addAll(configuration);

Then I used this mock environment to initialize
OneInputStreamOperatorTestHarness for AsyncWaitOperator.

That seems to work, but it's a workaround.
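The workaround works because these entries are read back by Java-deserializing byte arrays stored in the task configuration (via InstantiationUtil.readObjectFromConfig, as found while debugging). A stdlib-only sketch of that round trip, assuming a plain Map<String, byte[]> as a hypothetical stand-in for Flink's Configuration and an ArrayList<String> in place of the real List<StreamEdge>; toBytes and readFromConfig are hypothetical helpers, not Flink methods:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class ConfigBytesRoundTrip {

    /** Java-serializes a value into a byte array (hypothetical helper). */
    public static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Closing the ObjectOutputStream above flushed all bytes into `bytes`.
        return bytes.toByteArray();
    }

    /** Deserializes the value stored under `key`, or returns null if the key is absent. */
    public static Object readFromConfig(Map<String, byte[]> config, String key) {
        byte[] bytes = config.get(key);
        if (bytes == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not deserialize entry " + key, e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the task configuration: key -> serialized bytes.
        Map<String, byte[]> config = new HashMap<>();
        // An empty ArrayList<String> stands in for the empty List<StreamEdge>.
        config.put("edgesInOrder", toBytes(new ArrayList<String>()));

        System.out.println("edgesInOrder: " + readFromConfig(config, "edgesInOrder")); // prints "edgesInOrder: []"
    }
}
```

Because the stored bytes are plain Java serialization, bytes captured from one run can be replayed in another, as long as the same classes are on the classpath when they are read back.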







Re: Testing RichAsyncFunction with TestHarness

Posted by KristoffSC <kr...@gmail.com>.
I've debugged it a little bit and found that it fails in the
InstantiationUtil.readObjectFromConfig method when it executes
byte[] bytes = config.getBytes(key, (byte[]) null); which returns null.

The key it is looking for is "edgesInOrder". However, the config map contains
only two entries: "checkpointing -> {Boolean@6347} true" and "operatorID ->
{byte[16]@6351} "
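The observation above (a lookup that returns null because the key was never written) can be reproduced with plain Java. This is a sketch under stated assumptions: a HashMap stands in for Flink's Configuration, the entry values are placeholders for what the debugger showed, and getBytes is a hypothetical helper that only mimics the getBytes(key, null) call quoted above. It shows how a missing entry (such as "edgesInOrder" or "typeSerializer_in_1") surfaces as null, and then as a NullPointerException once something passes it to Objects.requireNonNull, as in the top frames of the first stack trace.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class MissingConfigKeyDemo {

    /** Hypothetical helper mimicking getBytes(key, default): returns the default if absent. */
    public static byte[] getBytes(Map<String, Object> config, String key, byte[] defaultValue) {
        Object value = config.get(key);
        return value instanceof byte[] ? (byte[]) value : defaultValue;
    }

    public static void main(String[] args) {
        // Only the two entries observed in the debugger; values are placeholders.
        Map<String, Object> config = new HashMap<>();
        config.put("checkpointing", Boolean.TRUE);
        config.put("operatorID", new byte[16]);

        byte[] edges = getBytes(config, "edgesInOrder", null);
        System.out.println("edgesInOrder bytes: " + edges); // prints "edgesInOrder bytes: null"

        try {
            // Code that validates its input this way fails on the missing entry.
            Objects.requireNonNull(edges);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException from requireNonNull");
        }
    }
}
```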





Re: Testing RichAsyncFunction with TestHarness

Posted by KristoffSC <kr...@gmail.com>.
Thanks,
I would suggest adding my "tutorial" about using the test harness for
async operators to the documentation, or perhaps building something based on
this use case that could help others in the future :)

Thanks, 
Krzysztof




Re: Testing RichAsyncFunction with TestHarness

Posted by Gary Yao <ga...@apache.org>.
>
> Additionally, even though I added all the necessary dependencies defined in [1], I
> cannot see the ProcessFunctionTestHarnesses class.
>

That class was added in Flink 1.10 [1].

[1]
https://github.com/apache/flink/blame/f765ad09ae2b2aa478c887b988e11e92a8b730bd/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ProcessFunctionTestHarnesses.java
