You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2019/05/17 14:55:30 UTC

[flink] 02/02: [FLINK-12508] [docs] add documentation about MiniClusterWithClientResource

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5330196dc04e83771d7a81b45dd4392be1d72ba9
Author: Konstantin Knauf <kn...@gmail.com>
AuthorDate: Tue May 14 11:21:06 2019 +0200

    [FLINK-12508] [docs] add documentation about MiniClusterWithClientResource
---
 docs/dev/stream/testing.md | 140 +++++++++++++++++++++++----------------------
 1 file changed, 72 insertions(+), 68 deletions(-)

diff --git a/docs/dev/stream/testing.md b/docs/dev/stream/testing.md
index 4944cd9..8992d50 100644
--- a/docs/dev/stream/testing.md
+++ b/docs/dev/stream/testing.md
@@ -332,9 +332,14 @@ Many more examples for the usage of these test harnesses can be found in the Fli
 
 <span class="label label-info">Note</span> Be aware that `AbstractStreamOperatorTestHarness` and its derived classes are currently not part of the public API and can be subject to change.
 
-In order to end-to-end test Flink streaming pipelines, you can also write integration tests that are executed against a local Flink mini cluster.
+## Testing Flink Jobs
 
-In order to do so add the test dependency `flink-test-utils`:
+### JUnit Rule `MiniClusterWithClientResource`
+
+Apache Flink provides a JUnit rule called `MiniClusterWithClientResource` for testing complete jobs against a local, embedded mini cluster.
+called `MiniClusterWithClientResource`.
+
+To use `MiniClusterWithClientResource` one additional dependency (test scoped) is needed.
 
 {% highlight xml %}
 <dependency>
@@ -344,16 +349,16 @@ In order to do so add the test dependency `flink-test-utils`:
 </dependency>
 {% endhighlight %}
 
-For example, if you want to test the following `MapFunction`:
+Let us take the same simple `MapFunction` as in the previous sections.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class MultiplyByTwo implements MapFunction<Long, Long> {
+public class IncrementMapFunction implements MapFunction<Long, Long> {
 
     @Override
-    public Long map(Long value) throws Exception {
-        return value * 2;
+    public Long map(Long record) throws Exception {
+        return record +1 ;
     }
 }
 {% endhighlight %}
@@ -361,36 +366,44 @@ public class MultiplyByTwo implements MapFunction<Long, Long> {
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-class MultiplyByTwo extends MapFunction[Long, Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
 
-    override def map(value: Long): Long = {
-        value * 2
+    override def map(record: Long): Long = {
+        record + 1
     }
 }
 {% endhighlight %}
 </div>
 </div>
 
-You could write the following integration test:
+A simple pipeline using this `MapFunction` can now be tested in a local Flink cluster as follows.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class ExampleIntegrationTest extends AbstractTestBase {
+public class ExampleIntegrationTest {
+
+     @ClassRule
+     public static MiniClusterWithClientResource flinkCluster =
+         new MiniClusterWithClientResource(
+             new MiniClusterResourceConfiguration.Builder()
+                 .setNumberSlotsPerTaskManager(2)
+                 .setNumberTaskManagers(1)
+                 .build());
 
     @Test
-    public void testMultiply() throws Exception {
+    public void testIncrementPipeline() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         // configure your test environment
-        env.setParallelism(1);
+        env.setParallelism(2);
 
         // values are collected in a static variable
         CollectSink.values.clear();
 
         // create a stream of custom elements and apply transformations
         env.fromElements(1L, 21L, 22L)
-                .map(new MultiplyByTwo())
+                .map(new IncrementMapFunction())
                 .addSink(new CollectSink());
 
         // execute
@@ -417,85 +430,76 @@ public class ExampleIntegrationTest extends AbstractTestBase {
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-class ExampleIntegrationTest extends AbstractTestBase {
+class StreamingJobIntegrationTest extends FlatSpec with Matchers with BeforeAndAfter {
 
-    @Test
-    def testMultiply(): Unit = {
-        val env = StreamExecutionEnvironment.getExecutionEnvironment
+  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
+    .setNumberSlotsPerTaskManager(1)
+    .setNumberTaskManagers(1)
+    .build)
 
-        // configure your test environment
-        env.setParallelism(1)
+  before {
+    flinkCluster.before()
+  }
 
-        // values are collected in a static variable
-        CollectSink.values.clear()
+  after {
+    flinkCluster.after()
+  }
 
-        // create a stream of custom elements and apply transformations
-        env
-            .fromElements(1L, 21L, 22L)
-            .map(new MultiplyByTwo())
-            .addSink(new CollectSink())
 
-        // execute
-        env.execute()
+  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
 
-        // verify your results
-        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
-    }
-}    
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // configure your test environment
+    env.setParallelism(2)
+
+    // values are collected in a static variable
+    CollectSink.values.clear()
+
+    // create a stream of custom elements and apply transformations
+    env.fromElements(1, 21, 22)
+       .map(new IncrementMapFunction())
+       .addSink(new CollectSink())
 
+    // execute
+    env.execute()
+
+    // verify your results
+    CollectSink.values should contain allOf (1,22,23)
+    }
+}
 // create a testing sink
 class CollectSink extends SinkFunction[Long] {
 
-    override def invoke(value: java.lang.Long): Unit = {
-        synchronized {
-            values.add(value)
-        }
+  override def invoke(value: Long): Unit = {
+    synchronized {
+      CollectSink.values.add(value)
     }
+  }
 }
 
 object CollectSink {
-
     // must be static
-    val values: List[Long] = new ArrayList()
+    val values: util.List[Long] = new util.ArrayList()
 }
 {% endhighlight %}
 </div>
 </div>
 
-The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
-Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
-Alternatively, you could for example write the data to files in a temporary directory with your test sink.
-You can also implement your own custom sources for emitting watermarks.
-
-## Testing checkpointing and state handling
-
-One way to test state handling is to enable checkpointing in integration tests. 
+A few remarks on integration testing with `MiniClusterWithClientResource`:
 
-You can do that by configuring your `StreamExecutionEnvironment` in the test:
+* In order not to copy your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-env.enableCheckpointing(500);
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-env.enableCheckpointing(500)
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
-{% endhighlight %}
-</div>
-</div>
+* The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
+Alternatively, you could write the data to files in a temporary directory with your test sink.
 
-And for example adding to your Flink application an identity mapper operator that will throw an exception
-once every `1000ms`. However writing such test could be tricky because of time dependencies between the actions.
+* You can implement a custom *parallel* source function for emitting watermarks if your job uses event timer timers.
 
-Another approach is to write a unit test using the Flink internal testing utility `AbstractStreamOperatorTestHarness` from the `flink-streaming-java` module.
+* It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs which only surface for the pipelines executed in parallel.
 
-For an example of how to do that please have a look at the `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest` also in the `flink-streaming-java` module.
+* Prefer `@ClassRule` over `@Rule` so that multiple tests can share the same Flink cluster. Doing so saves a significant amount of time since the startup and shutdown of Flink clusters usually dominate the execution time of the actual tests.
 
-Be aware that `AbstractStreamOperatorTestHarness` is currently not a part of public API and can be subject to change.
+* If your pipeline contains custom state handling, you can test its correctness by enabling checkpointing and restarting the job within the mini cluster. For this, you need to trigger a failure by throwing an exception from (a test-only) user-defined function in your pipeline.
 
 {% top %}