Posted to user@flink.apache.org by Manas Kale <ma...@gmail.com> on 2020/05/15 10:49:59 UTC

Testing process functions

Hi,
How do I test process functions? I tried implementing a sink function
that stores myProcessFunction's output in a list, and I run my assertions
after env.execute() returns.
If I set a breakpoint in myTestSink's invoke() method, I can see that the
method is being called correctly. However, after env.execute() returns, all
data in the sink function is wiped clean.

TestSink myTestSink = new TestSink();
testStream.process(new myProcessFunction()).addSink(myTestSink);
env.execute("test");
assertEquals(expectedOutput, myTestSink.actual);

What am I doing wrong?
Also, I see that ProcessFunctionTestHarnesses was added in 1.10. I
wasn't able to download its sources to understand how I could use it.
Have the sources not been published to Maven, or is it a problem at my end?

Regards,
Manas

Re: Testing process functions

Posted by Manas Kale <ma...@gmail.com>.
Thank you for the example, Alexander.


Re: Testing process functions

Posted by Alexander Fedulov <al...@ververica.com>.
Hi Manas,

I would recommend using the test harnesses for testing. You can also use
them with versions prior to 1.10. Here is an example of setting up the
dependencies:
https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/build.gradle#L113

You can see some examples of tests for a demo application here:
https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/src/test/java/com/ververica/field/dynamicrules/RulesEvaluatorTest.java
Hope this helps.

Best regards,

--

Alexander Fedulov | Solutions Architect


Re: Testing process functions

Posted by Manas Kale <ma...@gmail.com>.
I see, I had not considered the serialization; that was the issue.
Thank you.


Re: Testing process functions

Posted by Chesnay Schepler <ch...@apache.org>.
We don't publish sources for test classes.

Have you considered that the sink will be serialized on job submission,
meaning that your myTestSink instance is not the one actually used by
the job? This typically means that you have to store the results in a
static field instead.
Alternatively, depending on the number of elements,
org.apache.flink.streaming.api.datastream.DataStreamUtils#collect might
be worth a try.
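
The effect described above can be reproduced without Flink. The sketch below is only an illustration under stated assumptions: it uses plain java.io serialization as a stand-in for what happens to a sink on job submission, and the names SerializedSinkDemo, InstanceSink, StaticSink and roundTrip are hypothetical and not part of any Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SerializedSinkDemo {

    /** Hypothetical sink that collects into an instance field, like myTestSink.actual. */
    static class InstanceSink implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<String> actual = new ArrayList<>();

        void invoke(String value) {
            actual.add(value);
        }
    }

    /** Hypothetical sink that collects into a static field shared by every copy in this JVM. */
    static class StaticSink implements Serializable {
        private static final long serialVersionUID = 1L;
        static final List<String> VALUES = Collections.synchronizedList(new ArrayList<>());

        void invoke(String value) {
            VALUES.add(value);
        }
    }

    /** Serializes and deserializes an object; assumed stand-in for job submission. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T original)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // The copy receives the record; the instance held by the test does not.
        InstanceSink mine = new InstanceSink();
        InstanceSink usedByJob = roundTrip(mine);
        usedByJob.invoke("a");
        System.out.println("instance field seen by test: " + mine.actual);

        // Static fields are not part of the serialized state, so both copies share them.
        StaticSink.VALUES.clear();
        StaticSink copy = roundTrip(new StaticSink());
        copy.invoke("a");
        System.out.println("static field seen by test: " + StaticSink.VALUES);
    }
}
```

The first line printed shows an empty list and the second shows [a]. Note that the static field only helps because a local test job runs in the same JVM as the test; it is shared across all tests in that JVM, so it should be cleared before each test.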
