Posted to user@flink.apache.org by Hironori Ogibayashi <og...@gmail.com> on 2016/05/27 07:33:36 UTC

Writing test for Flink streaming jobs

Hello,

I would like to write test code for my Flink job.
Looking at flink-examples, I think the approach would be:
- Create a test class that extends StreamingMultipleProgramsTestBase
- In each test method, write the streaming job as usual, but use a
collection data source and an iterator sink
- Use a TestBaseUtils.compareResultXX method to check the result.

Here is the actual code I wrote.

---
class SampleTestCase extends StreamingMultipleProgramsTestBase {

  @Test
  def testCase1(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromElements("aaa","bbb","aaa")
      .map{ x => (x,1)}
      .keyBy(0)
      .timeWindow(Time.seconds(1))
      .sum(1)

    env.execute()

    val result = DataStreamUtils.collect(stream.javaStream)
    TestBaseUtils.compareResultAsText(Lists.newArrayList(result),"(aaa,2)\n(bbb,1)")
  }
}
---

But when I ran the test, I got this error:

java.lang.AssertionError: Wrong number of elements result expected:<2>
but was:<0>

It looks like the test finishes before the end of the timeWindow, but I do
not know how to fix it.
Any advice would be appreciated.

Thanks,
Hironori Ogibayashi

Re: Writing test for Flink streaming jobs

Posted by Hironori Ogibayashi <og...@gmail.com>.
Thank you for your response.

flink-spector looks really nice. I tried it but got some errors regarding
types, probably because of the Scala type issue Alex mentioned.
I am looking forward to the new version.

Thanks,
Hironori.

2016-05-30 16:45 GMT+09:00 lofifnc <al...@mni.thm.de>:
> Hi,
>
> Flinkspector is indeed a good choice to circumvent this problem as it
> specifically has several mechanisms to deal with these synchronization
> problems. Unfortunately, I'm still looking for a reasonable solution to
> support checking of scala types.
> Maybe I will provide a version in which you can use all the functionality
> except the custom validation logic. This could be done relatively quickly.
>
> Cheers,
> Alex

Re: Writing test for Flink streaming jobs

Posted by lofifnc <al...@mni.thm.de>.
Hi,
 
Flinkspector is indeed a good choice to circumvent this problem as it
specifically has several mechanisms to deal with these synchronization
problems. Unfortunately, I'm still looking for a reasonable solution to
support checking of scala types.
Maybe I will provide a version in which you can use all the functionality
except the custom validation logic. This could be done relatively quickly.

Cheers,
Alex

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Writing-test-for-Flink-streaming-jobs-tp7213p7247.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Writing test for Flink streaming jobs

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
using DataStreamUtils.collect() in a test is difficult due to
synchronization problems, as you discovered yourself.

What I propose is to write a custom sink that collects the data and verifies
the results. Verification should happen both in the invoke() method and in
close(). For the sink, you should set the parallelism to 1 to ensure that
all data goes to one sink instance.
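
A minimal sketch of such a sink, to make the idea concrete. It assumes the
Flink 1.x Scala API from the time of this thread (RichSinkFunction with a
single-argument invoke()). VerifyingSink and VerifyingSinkExample are
hypothetical names used only for illustration; they are not part of Flink.
An error thrown in the sink fails the job, so env.execute() throws and the
test fails.

```scala
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
import scala.collection.mutable

// Hypothetical helper (not part of Flink): fails the job, and with it the
// test, if the stream emits an unexpected element or misses an expected one.
class VerifyingSink[T](expected: Set[T]) extends RichSinkFunction[T] {

  // Elements received so far. This is only the complete output because the
  // sink is run with parallelism 1 (see the usage below).
  private val seen = mutable.Set.empty[T]

  // Per-element check: reject anything outside the expected set right away.
  override def invoke(value: T): Unit = {
    if (!expected.contains(value)) {
      throw new AssertionError(s"Unexpected element: $value")
    }
    seen += value
  }

  // End-of-stream check: every expected element must have arrived.
  override def close(): Unit = {
    val missing = expected.filterNot(seen.contains)
    if (missing.nonEmpty) {
      throw new AssertionError(s"Missing elements: $missing")
    }
  }
}

object VerifyingSinkExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements("aaa", "bbb", "aaa")
      .map { x => (x, 1) }
      .keyBy(0)
      .sum(1) // rolling sum per key, no window
      .addSink(new VerifyingSink[(String, Int)](
        Set(("aaa", 1), ("aaa", 2), ("bbb", 1))))
      .setParallelism(1) // all results go to a single sink instance

    // Blocks until the job finishes; throws if the sink raised an error.
    env.execute()
  }
}
```

The window from the original job is left out here so that the expected
output does not depend on timing. Because the expectation is a set, the
check ignores ordering and duplicates; a list-based comparison would be
needed if those matter.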

Another option is to use https://github.com/ottogroup/flink-spector, which
provides good ways of specifying expected outputs. Maybe Alex has something
else to say about it; I'm looping him in.

Cheers,
Aljoscha
