Posted to commits@flink.apache.org by rm...@apache.org on 2019/05/17 14:55:29 UTC
[flink] 01/02: [FLINK-12508] [docs] extend section on unit testing
Flink operators
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5f3a76944f35f012244dd45cc425c21d5af390f7
Author: Konstantin Knauf <kn...@gmail.com>
AuthorDate: Mon May 13 20:18:32 2019 +0200
[FLINK-12508] [docs] extend section on unit testing Flink operators
---
docs/dev/stream/testing.md | 276 +++++++++++++++++++++++++++++++++++++++++----
1 file changed, 256 insertions(+), 20 deletions(-)
diff --git a/docs/dev/stream/testing.md b/docs/dev/stream/testing.md
index 7676b19..4944cd9 100644
--- a/docs/dev/stream/testing.md
+++ b/docs/dev/stream/testing.md
@@ -23,25 +23,28 @@ specific language governing permissions and limitations
under the License.
-->
-This page briefly discusses how to test a Flink application in your IDE or a local environment.
+Testing is an integral part of every software development process. As such, Apache Flink comes with tooling to test your application code on multiple levels of the testing pyramid.
* This will be replaced by the TOC
{:toc}
-## Unit testing
+## Testing User-Defined Functions
-Usually, one can assume that Flink produces correct results outside of a user-defined `Function`. Therefore, it is recommended to test `Function` classes that contain the main business logic with unit tests as much as possible.
+Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to test those classes that contain the main business logic with unit tests as much as possible.
-For example if one implements the following `ReduceFunction`:
+### Unit Testing Stateless, Timeless UDFs
+
+
+For example, let's take the following stateless `MapFunction`.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-public class SumReduce implements ReduceFunction<Long> {
+public class IncrementMapFunction implements MapFunction<Long, Long> {
@Override
- public Long reduce(Long value1, Long value2) throws Exception {
- return value1 + value2;
+ public Long map(Long record) throws Exception {
+ return record + 1;
}
}
{% endhighlight %}
@@ -49,30 +52,30 @@ public class SumReduce implements ReduceFunction<Long> {
<div data-lang="scala" markdown="1">
{% highlight scala %}
-class SumReduce extends ReduceFunction[Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
- override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {
- value1 + value2
+ override def map(record: Long): Long = {
+ record + 1
}
}
{% endhighlight %}
</div>
</div>
-It is very easy to unit test it with your favorite framework by passing suitable arguments and verify the output:
+It is very easy to unit test such a function with your favorite testing framework by passing suitable arguments and verifying the output.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-public class SumReduceTest {
+public class IncrementMapFunctionTest {
@Test
- public void testSum() throws Exception {
+ public void testIncrement() throws Exception {
// instantiate your function
- SumReduce sumReduce = new SumReduce();
+ IncrementMapFunction incrementer = new IncrementMapFunction();
// call the methods that you have implemented
- assertEquals(42L, sumReduce.reduce(40L, 2L));
+ assertEquals(3L, incrementer.map(2L));
}
}
{% endhighlight %}
@@ -80,21 +83,254 @@ public class SumReduceTest {
<div data-lang="scala" markdown="1">
{% highlight scala %}
-class SumReduceTest extends FlatSpec with Matchers {
+class IncrementMapFunctionTest extends FlatSpec with Matchers {
- "SumReduce" should "add values" in {
+ "IncrementMapFunction" should "increment values" in {
// instantiate your function
- val sumReduce: SumReduce = new SumReduce()
+ val incrementer: IncrementMapFunction = new IncrementMapFunction()
// call the methods that you have implemented
- sumReduce.reduce(40L, 2L) should be (42L)
+ incrementer.map(2) should be (3)
+ }
+}
+{% endhighlight %}
+</div>
+</div>
+
+Similarly, a user-defined function which uses an `org.apache.flink.util.Collector` (e.g. a `FlatMapFunction` or `ProcessFunction`) can be easily tested by providing a mock object instead of a real collector. A `FlatMapFunction` with the same functionality as the `IncrementMapFunction` could be unit tested as follows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class IncrementFlatMapFunctionTest {
+
+ @Test
+ public void testIncrement() throws Exception {
+ // instantiate your function
+ IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
+
+ Collector<Long> collector = mock(Collector.class);
+
+ // call the methods that you have implemented
+ incrementer.flatMap(2L, collector);
+
+ //verify collector was called with the right output
+ Mockito.verify(collector, times(1)).collect(3L);
+ }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {
+
+ "IncrementFlatMapFunction" should "increment values" in {
+ // instantiate your function
+ val incrementer : IncrementFlatMapFunction = new IncrementFlatMapFunction()
+
+ val collector = mock[Collector[Long]]
+
+ //verify collector was called with the right output
+ (collector.collect _).expects(3L)
+
+ // call the methods that you have implemented
+ incrementer.flatMap(2L, collector)
+ }
+}
+{% endhighlight %}
+</div>
+</div>
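
The `IncrementFlatMapFunction` used in the test above is not shown on this page. The following is a minimal sketch of what it might look like, together with a hand-written, list-backed collector that can stand in for the mock. The `Collector` and `FlatMapFunction` interfaces here are simplified stand-ins (assumptions made so the sketch compiles without Flink on the classpath), and `ListCollector`, `run` and `IncrementFlatMapSketch` are illustrative names, not part of the commit.

```java
import java.util.ArrayList;
import java.util.List;

public class IncrementFlatMapSketch {

    // Simplified stand-in for org.apache.flink.util.Collector (assumption: only these two methods matter here).
    interface Collector<T> {
        void collect(T record);
        void close();
    }

    // Simplified stand-in for Flink's FlatMapFunction interface.
    interface FlatMapFunction<T, O> {
        void flatMap(T value, Collector<O> out) throws Exception;
    }

    // Assumed shape of the function under test: emits record + 1 exactly once.
    static class IncrementFlatMapFunction implements FlatMapFunction<Long, Long> {
        @Override
        public void flatMap(Long record, Collector<Long> out) {
            out.collect(record + 1);
        }
    }

    // Test double that records every emitted element in emission order.
    static class ListCollector<T> implements Collector<T> {
        final List<T> records = new ArrayList<>();

        @Override
        public void collect(T record) {
            records.add(record);
        }

        @Override
        public void close() {
            // nothing to release in this sketch
        }
    }

    // Runs the function on a single input and returns everything it emitted.
    static List<Long> run(long input) {
        ListCollector<Long> collector = new ListCollector<>();
        new IncrementFlatMapFunction().flatMap(input, collector);
        return collector.records;
    }

    public static void main(String[] args) {
        System.out.println(run(2L)); // prints [3]
    }
}
```

Compared to a mock, a list-backed collector lets a test assert on the full sequence of emitted records, which can be convenient when a function emits more than one element per input.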
+
+### Unit Testing Stateful or Timely UDFs & Custom Operators
+
+Testing the functionality of a user-defined function that makes use of managed state or timers is more difficult because it involves testing the interaction between the user code and Flink's runtime.
+For this, Flink comes with a collection of so-called test harnesses, which can be used to test such user-defined functions as well as custom operators:
+
+* `OneInputStreamOperatorTestHarness` (for operators on `DataStream`s)
+* `KeyedOneInputStreamOperatorTestHarness` (for operators on `KeyedStream`s)
+* `TwoInputStreamOperatorTestHarness` (for operators on `ConnectedStreams` of two `DataStream`s)
+* `KeyedTwoInputStreamOperatorTestHarness` (for operators on `ConnectedStreams` of two `KeyedStream`s)
+
+To use the test harnesses, a set of additional (test-scoped) dependencies is needed.
+
+{% highlight xml %}
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils{{ site.scala_version_suffix }}</artifactId>
+ <version>{{site.version }}</version>
+ <scope>test</scope>
+</dependency>
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime{{ site.scala_version_suffix }}</artifactId>
+ <version>{{site.version }}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+</dependency>
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java{{ site.scala_version_suffix }}</artifactId>
+ <version>{{site.version }}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+</dependency>
+{% endhighlight %}
+
+Now, the test harnesses can be used to push records and watermarks into your user-defined functions or custom operators, control processing time and finally assert on the output of the operator (including side outputs).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+public class StatefulFlatMapTest {
+ private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
+ private StatefulFlatMapFunction statefulFlatMapFunction;
+
+ @Before
+ public void setupTestHarness() throws Exception {
+
+ //instantiate user-defined function
+ statefulFlatMapFunction = new StatefulFlatMapFunction();
+
+ // wrap the user-defined function into the corresponding operator
+ testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));
+
+ // optionally configure the execution environment
+ testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
+
+ // open the test harness (will also call open() on RichFunctions)
+ testHarness.open();
+ }
+
+ @Test
+ public void testingStatefulFlatMapFunction() throws Exception {
+
+ //push (timestamped) elements into the operator (and hence user defined function)
+ testHarness.processElement(2L, 100L);
+
+ //trigger event time timers by advancing the event time of the operator with a watermark
+ testHarness.processWatermark(100L);
+
+ //trigger processing time timers by advancing the processing time of the operator directly
+ testHarness.setProcessingTime(100L);
+
+ //retrieve list of emitted records for assertions
+ assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
+
+ //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
+ //assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
+ }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class StatefulFlatMapFunctionTest extends FlatSpec with Matchers with BeforeAndAfter {
+
+ private var testHarness: OneInputStreamOperatorTestHarness[Long, Long] = null
+ private var statefulFlatMap: StatefulFlatMapFunction = null
+
+ before {
+ //instantiate user-defined function
+ statefulFlatMap = new StatefulFlatMapFunction
+
+ // wrap the user-defined function into the corresponding operator
+ testHarness = new OneInputStreamOperatorTestHarness[Long, Long](new StreamFlatMap(statefulFlatMap))
+
+ // optionally configure the execution environment
+ testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
+
+ // open the test harness (will also call open() on RichFunctions)
+ testHarness.open();
+ }
+
+ "StatefulFlatMap" should "do some fancy stuff with timers and state" in {
+
+
+ //push (timestamped) elements into the operator (and hence user defined function)
+ testHarness.processElement(2, 100);
+
+ //trigger event time timers by advancing the event time of the operator with a watermark
+ testHarness.processWatermark(100);
+
+ //trigger processing time timers by advancing the processing time of the operator directly
+ testHarness.setProcessingTime(100);
+
+ //retrieve list of emitted records for assertions
+ testHarness.getOutput should contain (3)
+
+ //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
+ //testHarness.getSideOutput(new OutputTag[Int]("invalidRecords")) should have size 0
+ }
+}
+{% endhighlight %}
+</div>
+</div>
+
+`KeyedOneInputStreamOperatorTestHarness` and `KeyedTwoInputStreamOperatorTestHarness` are instantiated by additionally providing a `KeySelector` including `TypeInformation` for the class of the key.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+public class StatefulFlatMapFunctionTest {
+ private KeyedOneInputStreamOperatorTestHarness<String, Long, Long> testHarness;
+ private StatefulFlatMapFunction statefulFlatMapFunction;
+
+ @Before
+ public void setupTestHarness() throws Exception {
+
+ //instantiate user-defined function
+ statefulFlatMapFunction = new StatefulFlatMapFunction();
+
+ // wrap the user-defined function into the corresponding operator
+ testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING);
+
+ // open the test harness (will also call open() on RichFunctions)
+ testHarness.open();
}
+
+ //tests
+
}
+
{% endhighlight %}
</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class StatefulFlatMapTest extends FlatSpec with Matchers with BeforeAndAfter {
+
+ private var testHarness: KeyedOneInputStreamOperatorTestHarness[String, Long, Long] = null
+ private var statefulFlatMapFunction: StatefulFlatMapFunction = null
+
+ before {
+ //instantiate user-defined function
+ statefulFlatMapFunction = new StatefulFlatMapFunction
+
+ // wrap the user-defined function into the corresponding operator
+ testHarness = new KeyedOneInputStreamOperatorTestHarness(new StreamFlatMap(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING)
+
+ // open the test harness (will also call open() on RichFunctions)
+ testHarness.open();
+ }
+
+ //tests
+
+}
+{% endhighlight %}
</div>
+</div>
+
+Many more examples for the usage of these test harnesses can be found in the Flink code base, e.g.:
+
+* `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest` is a good example for testing operators and user-defined functions that depend on processing or event time.
+* `org.apache.flink.streaming.api.functions.sink.filesystem.LocalStreamingFileSinkTest` shows how to test a custom sink with the `AbstractStreamOperatorTestHarness`. Specifically, it uses `AbstractStreamOperatorTestHarness.snapshot` and `AbstractStreamOperatorTestHarness.initializeState` to test its interaction with Flink's checkpointing mechanism.
-## Integration testing
+<span class="label label-info">Note</span> Be aware that `AbstractStreamOperatorTestHarness` and its derived classes are currently not part of the public API and can be subject to change.
In order to end-to-end test Flink streaming pipelines, you can also write integration tests that are executed against a local Flink mini cluster.