Posted to commits@flink.apache.org by tw...@apache.org on 2017/08/29 15:23:49 UTC

[1/2] flink git commit: [hotfix] [docs] Fix typos and improve testing docs

Repository: flink
Updated Branches:
  refs/heads/master 1e2a63874 -> 5ce0ded1b


[hotfix] [docs] Fix typos and improve testing docs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ce0ded1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ce0ded1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ce0ded1

Branch: refs/heads/master
Commit: 5ce0ded1b71faf7e5d90865ba9481e254c3fb5be
Parents: 00fc641
Author: twalthr <tw...@apache.org>
Authored: Tue Aug 29 17:19:50 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Aug 29 17:21:31 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/testing.md | 263 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 263 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ce0ded1/docs/dev/stream/testing.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/testing.md b/docs/dev/stream/testing.md
new file mode 100644
index 0000000..44f5cfd
--- /dev/null
+++ b/docs/dev/stream/testing.md
@@ -0,0 +1,263 @@
+---
+title: "Testing"
+nav-parent_id: streaming
+nav-id: testing
+nav-pos: 99
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This page briefly discusses how to test a Flink application in your IDE or a local environment.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Unit testing
+
+Usually, one can assume that Flink produces correct results outside of a user-defined `Function`. Therefore, it is recommended to test `Function` classes that contain the main business logic with unit tests as much as possible.
+
+For example, if one implements the following `ReduceFunction`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class SumReduce implements ReduceFunction<Long> {
+
+    @Override
+    public Long reduce(Long value1, Long value2) throws Exception {
+        return value1 + value2;
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class SumReduce extends ReduceFunction[Long] {
+
+    override def reduce(value1: Long, value2: Long): Long = {
+        value1 + value2
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+It is very easy to unit test it with your favorite framework by passing suitable arguments and verifying the output:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class SumReduceTest {
+
+    @Test
+    public void testSum() throws Exception {
+        // instantiate your function
+        SumReduce sumReduce = new SumReduce();
+
+        // call the methods that you have implemented
+        assertEquals(42L, sumReduce.reduce(40L, 2L));
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class SumReduceTest extends FlatSpec with Matchers {
+
+    "SumReduce" should "add values" in {
+        // instantiate your function
+        val sumReduce: SumReduce = new SumReduce()
+
+        // call the methods that you have implemented
+        sumReduce.reduce(40L, 2L) should be (42L)
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## Integration testing
+
+In order to test Flink streaming pipelines end-to-end, you can also write integration tests that are executed against a local Flink mini cluster.
+
+In order to do so, add the test dependency `flink-test-utils`:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-test-utils{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+For example, if you want to test the following `MapFunction`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class MultiplyByTwo implements MapFunction<Long, Long> {
+
+    @Override
+    public Long map(Long value) throws Exception {
+        return value * 2;
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class MultiplyByTwo extends MapFunction[Long, Long] {
+
+    override def map(value: Long): Long = {
+        value * 2
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+You could write the following integration test:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class ExampleIntegrationTest extends StreamingMultipleProgramsTestBase {
+
+    @Test
+    public void testMultiply() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // configure your test environment
+        env.setParallelism(1);
+
+        // values are collected in a static variable
+        CollectSink.values.clear();
+
+        // create a stream of custom elements and apply transformations
+        env.fromElements(1L, 21L, 22L)
+                .map(new MultiplyByTwo())
+                .addSink(new CollectSink());
+
+        // execute
+        env.execute();
+
+        // verify your results
+        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);
+    }
+
+    // create a testing sink
+    private static class CollectSink implements SinkFunction<Long> {
+
+        // must be static
+        public static final List<Long> values = new ArrayList<>();
+
+        @Override
+        public synchronized void invoke(Long value) throws Exception {
+            values.add(value);
+        }
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class ExampleIntegrationTest extends StreamingMultipleProgramsTestBase {
+
+    @Test
+    def testMultiply(): Unit = {
+        val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+        // configure your test environment
+        env.setParallelism(1)
+
+        // values are collected in a static variable
+        CollectSink.values.clear()
+
+        // create a stream of custom elements and apply transformations
+        env
+            .fromElements(1L, 21L, 22L)
+            .map(new MultiplyByTwo())
+            .addSink(new CollectSink())
+
+        // execute
+        env.execute()
+
+        // verify your results
+        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
+    }
+}
+
+// create a testing sink
+class CollectSink extends SinkFunction[Long] {
+
+    override def invoke(value: Long): Unit = {
+        synchronized {
+            CollectSink.values.add(value)
+        }
+    }
+}
+
+object CollectSink {
+
+    // must be static
+    val values: java.util.List[Long] = new java.util.ArrayList()
+}
+{% endhighlight %}
+</div>
+</div>
+
+The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
+Alternatively, you could, for example, write the data to files in a temporary directory with your test sink.
+You can also implement your own custom sources for emitting watermarks, as sketched below.
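+
+As a rough illustration, such a custom test source could emit a fixed set of records together with explicit timestamps and watermarks. The following is a minimal sketch; the class name and the emitted values are illustrative, not part of Flink's API:
+
+{% highlight java %}
+public class TimestampedTestSource implements SourceFunction<Long> {
+
+    private volatile boolean running = true;
+
+    @Override
+    public void run(SourceContext<Long> ctx) throws Exception {
+        long timestamp = 0L;
+        for (long value = 1L; running && value <= 3L; value++) {
+            // attach an event timestamp to each record
+            ctx.collectWithTimestamp(value, timestamp);
+            // signal that no elements with a lower timestamp will follow
+            ctx.emitWatermark(new Watermark(timestamp));
+            timestamp += 1000L;
+        }
+    }
+
+    @Override
+    public void cancel() {
+        running = false;
+    }
+}
+{% endhighlight %}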
+
+## Testing checkpointing and state handling
+
+One way to test state handling is to enable checkpointing in integration tests. 
+
+You can do that by configuring your `StreamExecutionEnvironment` in the test:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.enableCheckpointing(500);
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.enableCheckpointing(500)
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
+{% endhighlight %}
+</div>
+</div>
+
+In addition, you can, for example, add to your Flink application an identity mapper operator that throws an exception
+once every `1000ms`, as sketched below. However, writing such a test can be tricky because of time dependencies between the actions.
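+
+A minimal sketch of such a failing mapper (the class name and failure logic are illustrative and not part of Flink's API) could look like this:
+
+{% highlight java %}
+public class FailingIdentityMapper<T> extends RichMapFunction<T, T> {
+
+    // static, so the flag survives the operator restart within the same JVM
+    private static volatile boolean hasFailed = false;
+
+    private transient long startTime;
+
+    @Override
+    public void open(Configuration parameters) {
+        startTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public T map(T value) throws Exception {
+        if (!hasFailed && System.currentTimeMillis() - startTime > 1000L) {
+            // fail only once, so the job can finish after the restart
+            hasFailed = true;
+            throw new RuntimeException("Artificial failure for testing recovery");
+        }
+        return value;
+    }
+}
+{% endhighlight %}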
+
+Another approach is to write a unit test using the Flink internal testing utility `AbstractStreamOperatorTestHarness` from the `flink-streaming-java` module.
+
+For an example of how to do that, please have a look at `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest`, also in the `flink-streaming-java` module.
+
+Be aware that `AbstractStreamOperatorTestHarness` is currently not part of the public API and may be subject to change.
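+
+With that caveat in mind, a test for the `MultiplyByTwo` function above, based on the related `OneInputStreamOperatorTestHarness`, might look roughly like the following. This is a minimal sketch; the exact harness signatures vary between Flink versions:
+
+{% highlight java %}
+OneInputStreamOperatorTestHarness<Long, Long> harness =
+    new OneInputStreamOperatorTestHarness<>(new StreamMap<>(new MultiplyByTwo()));
+
+harness.open();
+
+// push an element with a timestamp through the wrapped operator
+harness.processElement(new StreamRecord<>(21L, 0L));
+
+// emitted records and watermarks are collected in the output queue
+StreamRecord<Long> result = (StreamRecord<Long>) harness.getOutput().poll();
+assertEquals(42L, result.getValue().longValue());
+{% endhighlight %}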


[2/2] flink git commit: [hotfix] [docs] Add section in docs about writing unit and integration tests

Posted by tw...@apache.org.
[hotfix] [docs] Add section in docs about writing unit and integration tests

This closes #4454.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00fc6413
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00fc6413
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00fc6413

Branch: refs/heads/master
Commit: 00fc64134ae3b9ad8644b2c232853aeb02917ddf
Parents: 1e2a638
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Tue Aug 1 15:02:56 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Aug 29 17:21:31 2017 +0200

----------------------------------------------------------------------
 docs/dev/testing.md | 189 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 189 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/00fc6413/docs/dev/testing.md
----------------------------------------------------------------------
diff --git a/docs/dev/testing.md b/docs/dev/testing.md
new file mode 100644
index 0000000..df65306
--- /dev/null
+++ b/docs/dev/testing.md
@@ -0,0 +1,189 @@
+---
+title: "Testing"
+nav-parent_id: dev
+nav-id: testing
+nav-pos: 99
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This page briefly discusses how to test a Flink application in a local environment.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Unit testing
+
+It is encouraged to test your classes with unit tests as much as possible. For example, if one implements the following `ReduceFunction`:
+
+~~~java
+public class SumReduce implements ReduceFunction<Long> {
+    @Override
+    public Long reduce(Long value1, Long value2) throws Exception {
+        return value1 + value2;
+    }
+}
+~~~
+
+It is very easy to unit test it with your favorite framework:
+
+~~~java
+public class SumReduceTest {
+    @Test
+    public void testSum() throws Exception {
+        SumReduce sumReduce = new SumReduce();
+
+        assertEquals(42L, sumReduce.reduce(40L, 2L));
+    }
+}
+~~~
+
+Or in Scala:
+
+~~~scala
+class SumReduce extends ReduceFunction[Long] {
+    override def reduce(value1: Long, value2: Long): Long = value1 + value2
+}
+~~~
+
+~~~scala
+class SumReduceTest extends FlatSpec with Matchers {
+    "SumReduce" should "add values" in {
+        val sumReduce: SumReduce = new SumReduce()
+        sumReduce.reduce(40L, 2L) should be (42L)
+    }
+}
+~~~
+
+## Integration testing
+
+You can also write integration tests that are executed against a local Flink mini cluster.
+In order to do so, add the test dependency `flink-test-utils`:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-test-utils{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+For example, if you want to test the following `MapFunction`:
+
+~~~java
+public class MultiplyByTwo implements MapFunction<Long, Long> {
+    @Override
+    public Long map(Long value) throws Exception {
+        return value * 2;
+    }
+}
+~~~
+
+You could write the following integration test:
+
+~~~java
+public class ExampleIntegrationTest extends StreamingMultipleProgramsTestBase {
+    @Test
+    public void testMultiply() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        // values are collected in a static variable
+        CollectSink.values.clear();
+
+        env.fromElements(1L, 21L, 22L)
+                .map(new MultiplyByTwo())
+                .addSink(new CollectSink());
+        env.execute();
+
+        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);
+    }
+
+    private static class CollectSink implements SinkFunction<Long> {
+        // must be static
+        public static final List<Long> values = new ArrayList<>();
+
+        @Override
+        public synchronized void invoke(Long value) throws Exception {
+            values.add(value);
+        }
+    }
+}
+~~~
+
+Or in Scala:
+
+~~~scala
+class MultiplyByTwo extends MapFunction[Long, Long] {
+    override def map(value: Long): Long = value * 2
+}
+~~~
+
+~~~scala
+class ExampleIntegrationTest extends FlatSpec with Matchers {
+    "MultiplyByTwo" should "multiply it input by two" in {
+        val env: StreamExecutionEnvironment =
+            StreamExecutionEnvironment.getExecutionEnvironment
+        env.setParallelism(1)
+        // values are collected in a static variable
+        CollectSink.values.clear()
+        env
+            .fromElements(1L, 21L, 22L)
+            .map(new MultiplyByTwo())
+            .addSink(new CollectSink())
+        env.execute()
+        CollectSink.values should be (Lists.newArrayList(2L, 42L, 44L))
+    }
+}
+
+object CollectSink {
+    // must be static
+    val values: java.util.List[Long] = new java.util.ArrayList()
+}
+
+class CollectSink extends SinkFunction[Long] {
+    override def invoke(value: Long): Unit = {
+        synchronized {
+            CollectSink.values.add(value)
+        }
+    }
+}
+~~~
+
+The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
+Alternatively, in your test sink you could, for example, write the data to files in a temporary directory, as sketched below.
+Of course, you could also use your own custom sources and sinks, which can emit watermarks.
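+
+A rough sketch of such a file-writing test sink (the class name and the file handling are illustrative, not part of Flink's API) could be:
+
+~~~java
+public class FileCollectSink implements SinkFunction<Long> {
+
+    private final String path;
+
+    public FileCollectSink(String path) {
+        this.path = path;
+    }
+
+    @Override
+    public void invoke(Long value) throws Exception {
+        // append each record as one line to the given file
+        Files.write(
+            Paths.get(path),
+            (value + "\n").getBytes(StandardCharsets.UTF_8),
+            StandardOpenOption.CREATE,
+            StandardOpenOption.APPEND);
+    }
+}
+~~~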
+
+## Testing checkpointing and state handling
+
+One way to test state handling is to enable checkpointing in integration tests. You can do that by
+configuring the `StreamExecutionEnvironment` in the test:
+
+~~~java
+env.enableCheckpointing(500);
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
+~~~
+
+In addition, you can, for example, add to your Flink application an identity mapper operator that throws an exception
+once every `1000ms`. However, writing such a test can be tricky because of time dependencies between the actions.
+
+Another approach is to write a unit test using `AbstractStreamOperatorTestHarness` from the `flink-streaming-java` module.
+For an example of how to do that, please look at `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest`,
+also in the `flink-streaming-java` module. Be aware that `AbstractStreamOperatorTestHarness` is not currently part of the public API
+and may be subject to change.