Posted to commits@flink.apache.org by al...@apache.org on 2017/03/18 07:13:48 UTC

[09/11] flink git commit: [FLINK-4460] Add documentation for side outputs

[FLINK-4460] Add documentation for side outputs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0a58f78
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0a58f78
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0a58f78

Branch: refs/heads/master
Commit: f0a58f7882ba6ed5081769b5006b3c81b3508ae5
Parents: 639dee3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Mar 7 12:06:01 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 18 07:44:17 2017 +0100

----------------------------------------------------------------------
 docs/dev/stream/side_output.md | 138 ++++++++++++++++++++++++++++++++++++
 docs/dev/windows.md            |  45 ++++++++++++
 2 files changed, 183 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0a58f78/docs/dev/stream/side_output.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/side_output.md b/docs/dev/stream/side_output.md
new file mode 100644
index 0000000..296a4d4
--- /dev/null
+++ b/docs/dev/stream/side_output.md
@@ -0,0 +1,138 @@
+---
+title: "Side Outputs"
+nav-title: "Side Outputs"
+nav-parent_id: streaming
+nav-pos: 36
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+In addition to the main stream that results from `DataStream` operations, you can also produce any
+number of additional side output result streams. The type of data in a side output does not have
+to match the type of data in the main stream, and the types of the different side outputs can
+also differ. Side outputs are useful when you want to split a stream of data: without them, you
+would normally have to replicate the stream and then filter out of each copy the data that you
+don't want.
+
+When using side outputs, you first need to define an `OutputTag` that will be used to identify a
+side output stream:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+// this needs to be an anonymous inner class, so that we can analyze the type
+OutputTag<String> outputTag = new OutputTag<String>("string-side-output") {};
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val outputTag = OutputTag[String]("string-side-output")
+{% endhighlight %}
+</div>
+</div>
+
+Notice how the `OutputTag` is typed according to the type of elements that the side output stream
+contains.
+
+Emitting data to a side output is only possible when using a
+[ProcessFunction](/dev/stream/process_function.html). In the function, you can use the `Context` parameter
+to emit data to a side output identified by an `OutputTag`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+DataStream<Integer> input = ...;
+
+final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
+
+SingleOutputStreamOperator<Integer> mainDataStream = input
+  .process(new ProcessFunction<Integer, Integer>() {
+
+      @Override
+      public void processElement(
+          Integer value,
+          Context ctx,
+          Collector<Integer> out) throws Exception {
+        // emit data to regular output
+        out.collect(value);
+
+        // emit data to side output
+        ctx.output(outputTag, "sideout-" + String.valueOf(value));
+      }
+    });
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val input: DataStream[Int] = ...
+val outputTag = OutputTag[String]("string-side-output")
+
+val mainDataStream = input
+  .process(new ProcessFunction[Int, Int] {
+    override def processElement(
+        value: Int,
+        ctx: ProcessFunction[Int, Int]#Context,
+        out: Collector[Int]): Unit = {
+      // emit data to regular output
+      out.collect(value)
+
+      // emit data to side output
+      ctx.output(outputTag, "sideout-" + String.valueOf(value))
+    }
+  })
+{% endhighlight %}
+</div>
+</div>
+
+To retrieve the side output stream, call `getSideOutput(OutputTag)`
+on the result of the `DataStream` operation. This gives you a `DataStream` that is typed
+to the element type of the side output stream:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
+
+SingleOutputStreamOperator<Integer> mainDataStream = ...;
+
+DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val outputTag = OutputTag[String]("string-side-output")
+
+val mainDataStream = ...
+
+val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)
+{% endhighlight %}
+</div>
+</div>

http://git-wip-us.apache.org/repos/asf/flink/blob/f0a58f78/docs/dev/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/windows.md b/docs/dev/windows.md
index 73f348e..f8643cd 100644
--- a/docs/dev/windows.md
+++ b/docs/dev/windows.md
@@ -973,6 +973,51 @@ input
 <span class="label label-info">Note</span> When using the `GlobalWindows` window assigner no
 data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
 
+### Getting late data as a side output
+
+Using Flink's [side output](/dev/stream/side_output.html) feature you can get a stream of the data
+that was discarded as late.
+
+You first need to specify that you want to get late data using `sideOutputLateData(OutputTag)` on
+the windowed stream. Then, you can get the side-output stream on the result of the windowed
+operation:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
+
+DataStream<T> input = ...;
+
+SingleOutputStreamOperator<T> result = input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .allowedLateness(<time>)
+    .sideOutputLateData(lateOutputTag)
+    .<windowed transformation>(<window function>);
+
+DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val lateOutputTag = OutputTag[T]("late-data")
+
+val input: DataStream[T] = ...
+
+val result = input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .allowedLateness(<time>)
+    .sideOutputLateData(lateOutputTag)
+    .<windowed transformation>(<window function>)
+
+val lateStream = result.getSideOutput(lateOutputTag)
+{% endhighlight %}
+</div>
+</div>
+
 ### Late elements considerations
 
 When specifying an allowed lateness greater than 0, the window along with its content is kept after the watermark passes