You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2019/05/17 14:55:29 UTC

[flink] 01/02: [FLINK-12508] [docs] extend section on unit testing Flink operators

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5f3a76944f35f012244dd45cc425c21d5af390f7
Author: Konstantin Knauf <kn...@gmail.com>
AuthorDate: Mon May 13 20:18:32 2019 +0200

    [FLINK-12508] [docs] extend section on unit testing Flink operators
---
 docs/dev/stream/testing.md | 276 +++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 256 insertions(+), 20 deletions(-)

diff --git a/docs/dev/stream/testing.md b/docs/dev/stream/testing.md
index 7676b19..4944cd9 100644
--- a/docs/dev/stream/testing.md
+++ b/docs/dev/stream/testing.md
@@ -23,25 +23,28 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page briefly discusses how to test a Flink application in your IDE or a local environment.
+Testing is an integral part of every software development process as such Apache Flink comes with tooling to test your application code on multiple levels of the testing pyramid.
 
 * This will be replaced by the TOC
 {:toc}
 
-## Unit testing
+## Testing User-Defined Functions
 
-Usually, one can assume that Flink produces correct results outside of a user-defined `Function`. Therefore, it is recommended to test `Function` classes that contain the main business logic with unit tests as much as possible.
+Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to test those classes that contain the main business logic with unit tests as much as possible.
 
-For example if one implements the following `ReduceFunction`:
+### Unit Testing Stateless, Timeless UDFs
+
+
+For example, let's take the following stateless `MapFunction`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class SumReduce implements ReduceFunction<Long> {
+public class IncrementMapFunction implements MapFunction<Long, Long> {
 
     @Override
-    public Long reduce(Long value1, Long value2) throws Exception {
-        return value1 + value2;
+    public Long map(Long record) throws Exception {
+        return record +1 ;
     }
 }
 {% endhighlight %}
@@ -49,30 +52,30 @@ public class SumReduce implements ReduceFunction<Long> {
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-class SumReduce extends ReduceFunction[Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
 
-    override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {
-        value1 + value2
+    override def map(record: Long): Long = {
+        record + 1
     }
 }
 {% endhighlight %}
 </div>
 </div>
 
-It is very easy to unit test it with your favorite framework by passing suitable arguments and verify the output:
+It is very easy to unit test such a function with your favorite testing framework by passing suitable arguments and verifying the output.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class SumReduceTest {
+public class IncrementMapFunctionTest {
 
     @Test
-    public void testSum() throws Exception {
+    public void testIncrement() throws Exception {
         // instantiate your function
-        SumReduce sumReduce = new SumReduce();
+        IncrementMapFunction incrementer = new IncrementMapFunction();
 
         // call the methods that you have implemented
-        assertEquals(42L, sumReduce.reduce(40L, 2L));
+        assertEquals(3L, incrementer.map(2L));
     }
 }
 {% endhighlight %}
@@ -80,21 +83,254 @@ public class SumReduceTest {
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-class SumReduceTest extends FlatSpec with Matchers {
+class IncrementMapFunctionTest extends FlatSpec with Matchers {
 
-    "SumReduce" should "add values" in {
+    "IncrementMapFunction" should "increment values" in {
         // instantiate your function
-        val sumReduce: SumReduce = new SumReduce()
+        val incrementer: IncrementMapFunction = new IncrementMapFunction()
 
         // call the methods that you have implemented
-        sumReduce.reduce(40L, 2L) should be (42L)
+        incremeter.map(2) should be (3)
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+Similarly, a user-defined function which uses an `org.apache.flink.util.Collector` (e.g. a `FlatMapFunction` or `ProcessFunction`) can be easily tested by providing a mock object instead of a real collector.  A `FlatMapFunction` with the same functionality as the `IncrementMapFunction` could be unit tested as follows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class IncrementFlatMapFunctionTest {
+
+    @Test
+    public void testIncrement() throws Exception {
+        // instantiate your function
+        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
+
+        Collector<Integer> collector = mock(Collector.class);
+
+        // call the methods that you have implemented
+        incrementer.flatMap(2L, collector)
+
+        //verify collector was called with the right output
+        Mockito.verify(collector, times(1)).collect(3L);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {
+
+    "IncrementFlatMapFunction" should "increment values" in {
+       // instantiate your function
+      val incrementer : IncrementFlatMapFunction = new IncrementFlatMapFunction()
+
+      val collector = mock[Collector[Integer]]
+
+      //verify collector was called with the right output
+      (collector.collect _).expects(3)
+
+      // call the methods that you have implemented
+      flattenFunction.flatMap(2, collector)
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+### Unit Testing Stateful or Timely UDFs & Custom Operators
+
+Testing the functionality of a user-defined function, which makes use of managed state or timers is more difficult because it involves testing the interaction between the user code and Flink's runtime.
+For this Flink comes with a collection of so called test harnesses, which can be used to test such user-defined functions as well as custom operators:
+
+* `OneInputStreamOperatorTestHarness` (for operators on `DataStreams`s)
+* `KeyedOneInputStreamOperatorTestHarness` (for operators on `KeyedStream`s)
+* `TwoInputStreamOperatorTestHarness` (for operators of `ConnectedStreams` of two `DataStream`s)
+* `KeyedTwoInputStreamOperatorTestHarness` (for operators on `ConnectedStreams` of two `KeyedStream`s)
+
+To use the test harnesses a set of additional dependencies (test scoped) is needed.
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-test-utils{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+  <scope>test</scope>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-runtime{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+  <scope>test</scope>
+  <classifier>tests</classifier>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+  <scope>test</scope>
+  <classifier>tests</classifier>
+</dependency>
+{% endhighlight %}
+
+Now, the test harnesses can be used to push records and watermarks into your user-defined functions or custom operators, control processing time and finally assert on the output of the operator (including side outputs).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+public class StatefulFlatMapTest {
+    private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
+    private StatefulFlatMap statefulFlatMapFunction;
+
+    @Before
+    public void setupTestHarness() throws Exception {
+
+        //instantiate user-defined function
+        statefulFlatMapFunction = new StatefulFlatMapFunction();
+
+        // wrap user defined function into a the corresponding operator
+        testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));
+
+        // optionally configured the execution environment
+        testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
+
+        // open the test harness (will also call open() on RichFunctions)
+        testHarness.open();
+    }
+
+    @Test
+    public void testingStatefulFlatMapFunction() throws Exception {
+
+        //push (timestamped) elements into the operator (and hence user defined function)
+        testHarness.processElement(2L, 100L);
+
+        //trigger event time timers by advancing the event time of the operator with a watermark
+        testHarness.processWatermark(100L);
+
+        //trigger processing time timers by advancing the processing time of the operator directly
+        testHarness.setProcessingTime(100L);
+
+        //retrieve list of emitted records for assertions
+        assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L))
+
+        //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
+        //assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
+    }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class StatefulFlatMapFunctionTest extends FlatSpec with Matchers with BeforeAndAfter {
+
+  private var testHarness: OneInputStreamOperatorTestHarness[Long, Long] = null
+  private var statefulFlatMap: StatefulFlatMapFunction = null
+
+  before {
+    //instantiate user-defined function
+    statefulFlatMap = new StatefulFlatMap
+
+    // wrap user defined function into a the corresponding operator
+    testHarness = new OneInputStreamOperatorTestHarness[Long, Long](new StreamFlatMap(statefulFlatMap))
+
+    // optionally configured the execution environment
+    testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
+
+    // open the test harness (will also call open() on RichFunctions)
+    testHarness.open();
+  }
+
+  "StatefulFlatMap" should "do some fancy stuff with timers and state" in {
+
+
+    //push (timestamped) elements into the operator (and hence user defined function)
+    testHarness.processElement(2, 100);
+
+    //trigger event time timers by advancing the event time of the operator with a watermark
+    testHarness.processWatermark(100);
+
+    //trigger proccesign time timers by advancing the processing time of the operator directly
+    testHarness.setProcessingTime(100);
+
+    //retrieve list of emitted records for assertions
+    testHarness.getOutput should contain (3)
+
+    //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
+    //testHarness.getSideOutput(new OutputTag[Int]("invalidRecords")) should have size 0
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+`KeyedOneInputStreamOperatorTestHarness` and `KeyedTwoInputStreamOperatorTestHarness` are instantiated by additionally providing a `KeySelector` including `TypeInformation` for the class of the key.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+public class StatefulFlatMapFunctionTest {
+    private OneInputStreamOperatorTestHarness<String, Long, Long> testHarness;
+    private StatefulFlatMap statefulFlatMapFunction;
+
+    @Before
+    public void setupTestHarness() throws Exception {
+
+        //instantiate user-defined function
+        statefulFlatMapFunction = new StatefulFlatMapFunction();
+
+        // wrap user defined function into a the corresponding operator
+        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING);
+
+        // open the test harness (will also call open() on RichFunctions)
+        testHarness.open();
     }
+
+    //tests
+
 }
+
 {% endhighlight %}
 </div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class StatefulFlatMapTest extends FlatSpec with Matchers with BeforeAndAfter {
+
+  private var testHarness: OneInputStreamOperatorTestHarness[String, Long, Long] = null
+  private var statefulFlatMapFunction: FlattenFunction = null
+
+  before {
+    //instantiate user-defined function
+    statefulFlatMapFunction = new StateFulFlatMap
+
+    // wrap user defined function into a the corresponding operator
+    testHarness = new KeyedOneInputStreamOperatorTestHarness(new StreamFlatMap(statefulFlatMapFunction),new MyStringKeySelector(), Types.STRING())
+
+    // open the test harness (will also call open() on RichFunctions)
+    testHarness.open();
+  }
+
+  //tests
+
+}
+{% endhighlight %}
 </div>
+</div>
+
+Many more examples for the usage of these test harnesses can be found in the Flink code base, e.g.:
+
+* `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest` is a good example for testing operators and user-defined functions, which depend on processing or event time.
+* `org.apache.flink.streaming.api.functions.sink.filesystem.LocalStreamingFileSinkTest` shows how to test a custom sink with the `AbstractStreamOperatorTestHarness`. Specifically, it uses `AbstractStreamOperatorTestHarness.snapshot` and `AbstractStreamOperatorTestHarness.initializeState` to tests its interaction with Flink's checkpointing mechanism.
 
-## Integration testing
+<span class="label label-info">Note</span> Be aware that `AbstractStreamOperatorTestHarness` and its derived classes are currently not part of the public API and can be subject to change.
 
 In order to end-to-end test Flink streaming pipelines, you can also write integration tests that are executed against a local Flink mini cluster.