Posted to commits@flink.apache.org by rm...@apache.org on 2019/05/17 14:55:28 UTC

[flink] branch master updated (2937b60 -> 5330196)

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 2937b60  [hotfix][docs] Update POJO serialization documentation
     new 5f3a769  [FLINK-12508] [docs] extend section on unit testing Flink operators
     new 5330196  [FLINK-12508] [docs] add documentation about MiniClusterWithClientResource

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/stream/testing.md | 416 +++++++++++++++++++++++++++++++++++----------
 1 file changed, 328 insertions(+), 88 deletions(-)


[flink] 01/02: [FLINK-12508] [docs] extend section on unit testing Flink operators

Posted by rm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5f3a76944f35f012244dd45cc425c21d5af390f7
Author: Konstantin Knauf <kn...@gmail.com>
AuthorDate: Mon May 13 20:18:32 2019 +0200

    [FLINK-12508] [docs] extend section on unit testing Flink operators
---
 docs/dev/stream/testing.md | 276 +++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 256 insertions(+), 20 deletions(-)

diff --git a/docs/dev/stream/testing.md b/docs/dev/stream/testing.md
index 7676b19..4944cd9 100644
--- a/docs/dev/stream/testing.md
+++ b/docs/dev/stream/testing.md
@@ -23,25 +23,28 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page briefly discusses how to test a Flink application in your IDE or a local environment.
+Testing is an integral part of every software development process. As such, Apache Flink comes with tooling to test your application code on multiple levels of the testing pyramid.
 
 * This will be replaced by the TOC
 {:toc}
 
-## Unit testing
+## Testing User-Defined Functions
 
-Usually, one can assume that Flink produces correct results outside of a user-defined `Function`. Therefore, it is recommended to test `Function` classes that contain the main business logic with unit tests as much as possible.
+Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to test those classes that contain the main business logic with unit tests as much as possible.
 
-For example if one implements the following `ReduceFunction`:
+### Unit Testing Stateless, Timeless UDFs
+
+As an example, let us take the following stateless `MapFunction`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class SumReduce implements ReduceFunction<Long> {
+public class IncrementMapFunction implements MapFunction<Long, Long> {
 
     @Override
-    public Long reduce(Long value1, Long value2) throws Exception {
-        return value1 + value2;
+    public Long map(Long record) throws Exception {
+        return record + 1;
     }
 }
 {% endhighlight %}
@@ -49,30 +52,30 @@ public class SumReduce implements ReduceFunction<Long> {
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-class SumReduce extends ReduceFunction[Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
 
-    override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {
-        value1 + value2
+    override def map(record: Long): Long = {
+        record + 1
     }
 }
 {% endhighlight %}
 </div>
 </div>
 
-It is very easy to unit test it with your favorite framework by passing suitable arguments and verify the output:
+It is very easy to unit test such a function with your favorite testing framework by passing suitable arguments and verifying the output.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class SumReduceTest {
+public class IncrementMapFunctionTest {
 
     @Test
-    public void testSum() throws Exception {
+    public void testIncrement() throws Exception {
         // instantiate your function
-        SumReduce sumReduce = new SumReduce();
+        IncrementMapFunction incrementer = new IncrementMapFunction();
 
         // call the methods that you have implemented
-        assertEquals(42L, sumReduce.reduce(40L, 2L));
+        assertEquals(3L, incrementer.map(2L));
     }
 }
 {% endhighlight %}
@@ -80,21 +83,254 @@ public class SumReduceTest {
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-class SumReduceTest extends FlatSpec with Matchers {
+class IncrementMapFunctionTest extends FlatSpec with Matchers {
 
-    "SumReduce" should "add values" in {
+    "IncrementMapFunction" should "increment values" in {
         // instantiate your function
-        val sumReduce: SumReduce = new SumReduce()
+        val incrementer: IncrementMapFunction = new IncrementMapFunction()
 
         // call the methods that you have implemented
-        sumReduce.reduce(40L, 2L) should be (42L)
+        incrementer.map(2) should be (3)
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+Similarly, a user-defined function that uses an `org.apache.flink.util.Collector` (e.g. a `FlatMapFunction` or `ProcessFunction`) can easily be tested by providing a mock object instead of a real collector. A `FlatMapFunction` with the same functionality as the `IncrementMapFunction` could be unit tested as follows.
+
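+The `IncrementFlatMapFunction` under test is not spelled out in this commit; as an illustrative assumption, it might look like this:
+
+{% highlight java %}
+public class IncrementFlatMapFunction implements FlatMapFunction<Long, Long> {
+
+    @Override
+    public void flatMap(Long record, Collector<Long> out) throws Exception {
+        // emit exactly one incremented value per input record
+        out.collect(record + 1);
+    }
+}
+{% endhighlight %}
+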
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class IncrementFlatMapFunctionTest {
+
+    @Test
+    public void testIncrement() throws Exception {
+        // instantiate your function
+        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
+
+        Collector<Long> collector = mock(Collector.class);
+
+        // call the methods that you have implemented
+        incrementer.flatMap(2L, collector);
+
+        // verify collector was called with the right output
+        Mockito.verify(collector, times(1)).collect(3L);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {
+
+    "IncrementFlatMapFunction" should "increment values" in {
+      // instantiate your function
+      val incrementer: IncrementFlatMapFunction = new IncrementFlatMapFunction()
+
+      val collector = mock[Collector[Long]]
+
+      // verify collector was called with the right output
+      (collector.collect _).expects(3)
+
+      // call the methods that you have implemented
+      incrementer.flatMap(2, collector)
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+### Unit Testing Stateful or Timely UDFs & Custom Operators
+
+Testing the functionality of a user-defined function that makes use of managed state or timers is more difficult, because it involves testing the interaction between the user code and Flink's runtime.
+For this purpose, Flink comes with a collection of so-called test harnesses, which can be used to test such user-defined functions as well as custom operators:
+
+* `OneInputStreamOperatorTestHarness` (for operators on `DataStream`s)
+* `KeyedOneInputStreamOperatorTestHarness` (for operators on `KeyedStream`s)
+* `TwoInputStreamOperatorTestHarness` (for operators of `ConnectedStreams` of two `DataStream`s)
+* `KeyedTwoInputStreamOperatorTestHarness` (for operators on `ConnectedStreams` of two `KeyedStream`s)
+
+To use the test harnesses, a set of additional dependencies (test scoped) is needed.
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-test-utils{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+  <scope>test</scope>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-runtime{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+  <scope>test</scope>
+  <classifier>tests</classifier>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+  <scope>test</scope>
+  <classifier>tests</classifier>
+</dependency>
+{% endhighlight %}
+
+Now, the test harnesses can be used to push records and watermarks into your user-defined functions or custom operators, control processing time and finally assert on the output of the operator (including side outputs).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+public class StatefulFlatMapTest {
+    private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
+    private StatefulFlatMapFunction statefulFlatMapFunction;
+
+    @Before
+    public void setupTestHarness() throws Exception {
+
+        //instantiate user-defined function
+        statefulFlatMapFunction = new StatefulFlatMapFunction();
+
+        // wrap user-defined function into the corresponding operator
+        testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));
+
+        // optionally configure the execution environment
+        testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
+
+        // open the test harness (will also call open() on RichFunctions)
+        testHarness.open();
+    }
+
+    @Test
+    public void testingStatefulFlatMapFunction() throws Exception {
+
+        //push (timestamped) elements into the operator (and hence user defined function)
+        testHarness.processElement(2L, 100L);
+
+        //trigger event time timers by advancing the event time of the operator with a watermark
+        testHarness.processWatermark(100L);
+
+        //trigger processing time timers by advancing the processing time of the operator directly
+        testHarness.setProcessingTime(100L);
+
+        //retrieve list of emitted records for assertions
+        assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
+
+        //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
+        //assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0));
+    }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class StatefulFlatMapFunctionTest extends FlatSpec with Matchers with BeforeAndAfter {
+
+  private var testHarness: OneInputStreamOperatorTestHarness[Long, Long] = null
+  private var statefulFlatMap: StatefulFlatMapFunction = null
+
+  before {
+    //instantiate user-defined function
+    statefulFlatMap = new StatefulFlatMapFunction
+
+    // wrap user-defined function into the corresponding operator
+    testHarness = new OneInputStreamOperatorTestHarness[Long, Long](new StreamFlatMap(statefulFlatMap))
+
+    // optionally configure the execution environment
+    testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
+
+    // open the test harness (will also call open() on RichFunctions)
+    testHarness.open();
+  }
+
+  "StatefulFlatMap" should "do some fancy stuff with timers and state" in {
+
+    //push (timestamped) elements into the operator (and hence user defined function)
+    testHarness.processElement(2, 100);
+
+    //trigger event time timers by advancing the event time of the operator with a watermark
+    testHarness.processWatermark(100);
+
+    //trigger processing time timers by advancing the processing time of the operator directly
+    testHarness.setProcessingTime(100);
+
+    //retrieve list of emitted records for assertions
+    testHarness.getOutput should contain (3)
+
+    //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
+    //testHarness.getSideOutput(new OutputTag[Int]("invalidRecords")) should have size 0
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+`KeyedOneInputStreamOperatorTestHarness` and `KeyedTwoInputStreamOperatorTestHarness` are instantiated by additionally providing a `KeySelector` and the `TypeInformation` for the class of the key.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+public class StatefulFlatMapFunctionTest {
+    private KeyedOneInputStreamOperatorTestHarness<String, Long, Long> testHarness;
+    private StatefulFlatMapFunction statefulFlatMapFunction;
+
+    @Before
+    public void setupTestHarness() throws Exception {
+
+        //instantiate user-defined function
+        statefulFlatMapFunction = new StatefulFlatMapFunction();
+
+        // wrap user-defined function into the corresponding operator
+        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING);
+
+        // open the test harness (will also call open() on RichFunctions)
+        testHarness.open();
     }
+
+    //tests
+
 }
+
 {% endhighlight %}
 </div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class StatefulFlatMapTest extends FlatSpec with Matchers with BeforeAndAfter {
+
+  private var testHarness: KeyedOneInputStreamOperatorTestHarness[String, Long, Long] = null
+  private var statefulFlatMapFunction: StatefulFlatMapFunction = null
+
+  before {
+    //instantiate user-defined function
+    statefulFlatMapFunction = new StatefulFlatMapFunction
+
+    // wrap user-defined function into the corresponding operator
+    testHarness = new KeyedOneInputStreamOperatorTestHarness[String, Long, Long](new StreamFlatMap(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING)
+
+    // open the test harness (will also call open() on RichFunctions)
+    testHarness.open();
+  }
+
+  //tests
+
+}
+{% endhighlight %}
 </div>
+</div>
+
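+The `MyStringKeySelector` used above is not part of this commit; a minimal sketch (an assumption for illustration), keying records by their string representation, could be:
+
+{% highlight java %}
+public class MyStringKeySelector implements KeySelector<Long, String> {
+
+    @Override
+    public String getKey(Long record) throws Exception {
+        // derive a deterministic string key from the record
+        return String.valueOf(record);
+    }
+}
+{% endhighlight %}
+
+Any deterministic key extraction works; the harness only needs the selector and the matching `TypeInformation` to set up keyed state.
+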
+Many more examples for the usage of these test harnesses can be found in the Flink code base, e.g.:
+
+* `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest` is a good example for testing operators and user-defined functions that depend on processing or event time.
+* `org.apache.flink.streaming.api.functions.sink.filesystem.LocalStreamingFileSinkTest` shows how to test a custom sink with the `AbstractStreamOperatorTestHarness`. Specifically, it uses `AbstractStreamOperatorTestHarness.snapshot` and `AbstractStreamOperatorTestHarness.initializeState` to test its interaction with Flink's checkpointing mechanism.
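+
+In rough outline, the snapshot/restore cycle with a test harness follows this pattern (a sketch only, reusing the `StatefulFlatMapFunction` assumed above; `snapshot` captures state that a fresh harness can later restore via `initializeState`):
+
+{% highlight java %}
+// take a snapshot of the operator's state (checkpoint id 1, timestamp 100)
+OperatorSubtaskState snapshot = testHarness.snapshot(1L, 100L);
+
+// wrap a fresh function instance in a new harness
+testHarness = new OneInputStreamOperatorTestHarness<>(
+    new StreamFlatMap<>(new StatefulFlatMapFunction()));
+
+// restore the captured state before opening the new harness
+testHarness.initializeState(snapshot);
+testHarness.open();
+{% endhighlight %}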
 
-## Integration testing
+<span class="label label-info">Note</span> Be aware that `AbstractStreamOperatorTestHarness` and its derived classes are currently not part of the public API and can be subject to change.
 
 In order to end-to-end test Flink streaming pipelines, you can also write integration tests that are executed against a local Flink mini cluster.
 


[flink] 02/02: [FLINK-12508] [docs] add documentation about MiniClusterWithClientResource

Posted by rm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5330196dc04e83771d7a81b45dd4392be1d72ba9
Author: Konstantin Knauf <kn...@gmail.com>
AuthorDate: Tue May 14 11:21:06 2019 +0200

    [FLINK-12508] [docs] add documentation about MiniClusterWithClientResource
---
 docs/dev/stream/testing.md | 140 +++++++++++++++++++++++----------------------
 1 file changed, 72 insertions(+), 68 deletions(-)

diff --git a/docs/dev/stream/testing.md b/docs/dev/stream/testing.md
index 4944cd9..8992d50 100644
--- a/docs/dev/stream/testing.md
+++ b/docs/dev/stream/testing.md
@@ -332,9 +332,14 @@ Many more examples for the usage of these test harnesses can be found in the Fli
 
 <span class="label label-info">Note</span> Be aware that `AbstractStreamOperatorTestHarness` and its derived classes are currently not part of the public API and can be subject to change.
 
-In order to end-to-end test Flink streaming pipelines, you can also write integration tests that are executed against a local Flink mini cluster.
+## Testing Flink Jobs
 
-In order to do so add the test dependency `flink-test-utils`:
+### JUnit Rule `MiniClusterWithClientResource`
+
+Apache Flink provides a JUnit rule called `MiniClusterWithClientResource` for testing complete jobs against a local, embedded mini cluster.
+
+To use `MiniClusterWithClientResource`, one additional dependency (test scoped) is needed.
 
 {% highlight xml %}
 <dependency>
@@ -344,16 +349,16 @@ In order to do so add the test dependency `flink-test-utils`:
 </dependency>
 {% endhighlight %}
 
-For example, if you want to test the following `MapFunction`:
+Let us take the same simple `MapFunction` as in the previous sections.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class MultiplyByTwo implements MapFunction<Long, Long> {
+public class IncrementMapFunction implements MapFunction<Long, Long> {
 
     @Override
-    public Long map(Long value) throws Exception {
-        return value * 2;
+    public Long map(Long record) throws Exception {
+        return record + 1;
     }
 }
 {% endhighlight %}
@@ -361,36 +366,44 @@ public class MultiplyByTwo implements MapFunction<Long, Long> {
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-class MultiplyByTwo extends MapFunction[Long, Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
 
-    override def map(value: Long): Long = {
-        value * 2
+    override def map(record: Long): Long = {
+        record + 1
     }
 }
 {% endhighlight %}
 </div>
 </div>
 
-You could write the following integration test:
+A simple pipeline using this `MapFunction` can now be tested in a local Flink cluster as follows.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class ExampleIntegrationTest extends AbstractTestBase {
+public class ExampleIntegrationTest {
+
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+        new MiniClusterWithClientResource(
+            new MiniClusterResourceConfiguration.Builder()
+                .setNumberSlotsPerTaskManager(2)
+                .setNumberTaskManagers(1)
+                .build());
 
     @Test
-    public void testMultiply() throws Exception {
+    public void testIncrementPipeline() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         // configure your test environment
-        env.setParallelism(1);
+        env.setParallelism(2);
 
         // values are collected in a static variable
         CollectSink.values.clear();
 
         // create a stream of custom elements and apply transformations
         env.fromElements(1L, 21L, 22L)
-                .map(new MultiplyByTwo())
+                .map(new IncrementMapFunction())
                 .addSink(new CollectSink());
 
         // execute
@@ -417,85 +430,76 @@ public class ExampleIntegrationTest extends AbstractTestBase {
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-class ExampleIntegrationTest extends AbstractTestBase {
+class StreamingJobIntegrationTest extends FlatSpec with Matchers with BeforeAndAfter {
 
-    @Test
-    def testMultiply(): Unit = {
-        val env = StreamExecutionEnvironment.getExecutionEnvironment
+  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
+    .setNumberSlotsPerTaskManager(1)
+    .setNumberTaskManagers(1)
+    .build)
 
-        // configure your test environment
-        env.setParallelism(1)
+  before {
+    flinkCluster.before()
+  }
 
-        // values are collected in a static variable
-        CollectSink.values.clear()
+  after {
+    flinkCluster.after()
+  }
 
-        // create a stream of custom elements and apply transformations
-        env
-            .fromElements(1L, 21L, 22L)
-            .map(new MultiplyByTwo())
-            .addSink(new CollectSink())
 
-        // execute
-        env.execute()
+  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
 
-        // verify your results
-        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
-    }
-}    
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // configure your test environment
+    env.setParallelism(2)
+
+    // values are collected in a static variable
+    CollectSink.values.clear()
+
+    // create a stream of custom elements and apply transformations
+    env.fromElements(1L, 21L, 22L)
+       .map(new IncrementMapFunction())
+       .addSink(new CollectSink())
 
+    // execute
+    env.execute()
+
+    // verify your results
+    CollectSink.values should contain allOf (2L, 22L, 23L)
+  }
+}
 // create a testing sink
 class CollectSink extends SinkFunction[Long] {
 
-    override def invoke(value: java.lang.Long): Unit = {
-        synchronized {
-            values.add(value)
-        }
+  override def invoke(value: Long): Unit = {
+    synchronized {
+      CollectSink.values.add(value)
     }
+  }
 }
 
 object CollectSink {
-
     // must be static
-    val values: List[Long] = new ArrayList()
+    val values: util.List[Long] = new util.ArrayList()
 }
 {% endhighlight %}
 </div>
 </div>
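+
+For completeness: the Java counterpart of this `CollectSink` sits in an unchanged part of the page and is not visible in this diff; it would look roughly like the following (a sketch, not the exact class from the docs):
+
+{% highlight java %}
+// a testing sink that collects emitted values into a static, shared list
+public class CollectSink implements SinkFunction<Long> {
+
+    // must be static, because Flink serializes and distributes the sink instances;
+    // synchronized, because parallel sink instances write concurrently
+    public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());
+
+    @Override
+    public void invoke(Long value) throws Exception {
+        values.add(value);
+    }
+}
+{% endhighlight %}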
 
-The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
-Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
-Alternatively, you could for example write the data to files in a temporary directory with your test sink.
-You can also implement your own custom sources for emitting watermarks.
-
-## Testing checkpointing and state handling
-
-One way to test state handling is to enable checkpointing in integration tests. 
+A few remarks on integration testing with `MiniClusterWithClientResource`:
 
-You can do that by configuring your `StreamExecutionEnvironment` in the test:
+* In order not to copy your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-env.enableCheckpointing(500);
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-env.enableCheckpointing(500)
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
-{% endhighlight %}
-</div>
-</div>
+* The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
+Alternatively, you could write the data to files in a temporary directory with your test sink.
 
-And for example adding to your Flink application an identity mapper operator that will throw an exception
-once every `1000ms`. However writing such test could be tricky because of time dependencies between the actions.
+* You can implement a custom *parallel* source function for emitting watermarks if your job uses event-time timers.
 
-Another approach is to write a unit test using the Flink internal testing utility `AbstractStreamOperatorTestHarness` from the `flink-streaming-java` module.
+* It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs that only surface when the pipeline is executed in parallel.
 
-For an example of how to do that please have a look at the `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest` also in the `flink-streaming-java` module.
+* Prefer `@ClassRule` over `@Rule` so that multiple tests can share the same Flink cluster. Doing so saves a significant amount of time since the startup and shutdown of Flink clusters usually dominate the execution time of the actual tests.
 
-Be aware that `AbstractStreamOperatorTestHarness` is currently not a part of public API and can be subject to change.
+* If your pipeline contains custom state handling, you can test its correctness by enabling checkpointing and restarting the job within the mini cluster. For this, you need to trigger a failure by throwing an exception from a (test-only) user-defined function in your pipeline, as sketched below.
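+
+A hypothetical sketch of such a failure-injecting function (the class name and the one-shot static flag are assumptions for illustration):
+
+{% highlight java %}
+public class OnceFailingMapFunction implements MapFunction<Long, Long> {
+
+    // static, so the flag survives re-instantiation of the function after the
+    // restart (the mini cluster runs all tasks in a single JVM)
+    private static volatile boolean hasFailed = false;
+
+    @Override
+    public Long map(Long record) throws Exception {
+        if (!hasFailed) {
+            hasFailed = true;
+            throw new RuntimeException("artificial test failure");
+        }
+        return record;
+    }
+}
+{% endhighlight %}
+
+Combined with `env.enableCheckpointing(500)` and a fixed-delay restart strategy (as in the example this section replaces), the job fails once, restarts from the last checkpoint, and then runs to completion.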
 
 {% top %}