You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/01/14 10:49:22 UTC

[GitHub] [flink] kl0u commented on a change in pull request #10833: [FLINK-15535][documentation] Add usage of ProcessFunctionTestHarnesses for testing documentation

kl0u commented on a change in pull request #10833: [FLINK-15535][documentation] Add usage of ProcessFunctionTestHarnesses for testing documentation
URL: https://github.com/apache/flink/pull/10833#discussion_r366269830
 
 

 ##########
 File path: docs/dev/stream/testing.md
 ##########
 @@ -332,6 +332,87 @@ Many more examples for the usage of these test harnesses can be found in the Fli
 
 <span class="label label-info">Note</span> Be aware that `AbstractStreamOperatorTestHarness` and its derived classes are currently not part of the public API and can be subject to change.
 
+### Unit Testing ProcessFunction
+
+The `ProcessFunction` is a widely used low-level and powerful stream processing operation, giving access to the basic building blocks of all (acyclic) streaming applications.
+Flink provides a test harness named `ProcessFunctionTestHarnesses` that can be used to test your `ProcessFunction`. Considering this example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public static class PassThroughProcessFunction extends ProcessFunction<Integer, Integer> {
+
+	@Override
+	public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+        out.collect(value);
+	}
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class PassThroughProcessFunction extends ProcessFunction[Integer, Integer] {
+
+    @throws[Exception]
+    override def processElement(value: Integer, ctx: ProcessFunction[Integer, Integer]#Context, out: Collector[Integer]): Unit = {
+      out.collect(value)
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+It is very easy to unit test such a function with `ProcessFunctionTestHarnesses` by passing suitable arguments and verifying the output.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class PassThroughProcessFunctionTest {
+
+    @Test
+    public void testPassThrough() throws Exception {
+
+        //instantiate user-defined function
+        PassThroughProcessFunction processFunction = new PassThroughProcessFunction();
+
+        // wrap user defined function into a the corresponding operator
+        OneInputStreamOperatorTestHarness<Integer, Integer> harness = ProcessFunctionTestHarnesses
+        	.forProcessFunction(processFunction);
+
+        //push (timestamped) elements into the operator (and hence user defined function)
+        harness.processElement(1, 10);
+
+        //retrieve list of emitted records for assertions
+        assertEquals(harness.extractOutputValues(), Collections.singletonList(1));
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class PassThroughProcessFunctionTest extends FlatSpec with Matchers {
+
+  "PassThroughProcessFunction" should "forward values" in {
+
+    //instantiate user-defined function
+    val processFunction = new PassThroughProcessFunction
+
+    // wrap user defined function into a the corresponding operator
+    val harness = ProcessFunctionTestHarnesses.forProcessFunction(processFunction)
+
+    //push (timestamped) elements into the operator (and hence user defined function)
+    harness.processElement(1, 10)
+
+    //retrieve list of emitted records for assertions
+    harness.extractOutputValues() should contain (1)
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
 
 Review comment:
   Here I would add something like: "For more examples on how to use the `ProcessFunctionTestHarnesses` in order to test the different flavours of the `ProcessFunction`, e.g. `KeyedProcessFunction`, `KeyedCoProcessFunction`, `BroadcastProcessFunction`, etc, the user is encouraged to look at the `ProcessFunctionTestHarnessesTest`." 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services