Posted to commits@flink.apache.org by al...@apache.org on 2017/03/18 07:13:48 UTC
[09/11] flink git commit: [FLINK-4460] Add documentation for side outputs
[FLINK-4460] Add documentation for side outputs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0a58f78
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0a58f78
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0a58f78
Branch: refs/heads/master
Commit: f0a58f7882ba6ed5081769b5006b3c81b3508ae5
Parents: 639dee3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Mar 7 12:06:01 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 18 07:44:17 2017 +0100
----------------------------------------------------------------------
docs/dev/stream/side_output.md | 138 ++++++++++++++++++++++++++++++++++++
docs/dev/windows.md | 45 ++++++++++++
2 files changed, 183 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f0a58f78/docs/dev/stream/side_output.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/side_output.md b/docs/dev/stream/side_output.md
new file mode 100644
index 0000000..296a4d4
--- /dev/null
+++ b/docs/dev/stream/side_output.md
@@ -0,0 +1,138 @@
+---
+title: "Side Outputs"
+nav-title: "Side Outputs"
+nav-parent_id: streaming
+nav-pos: 36
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+In addition to the main stream that results from `DataStream` operations, you can also produce any
+number of additional side output result streams. The type of data in a side output stream does not
+have to match the type of data in the main stream, and different side outputs can have different
+types as well. Side outputs are useful when you want to split a stream of data: without them, you
+would typically have to replicate the stream and then filter out the unwanted data from each copy.
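To make the splitting idea concrete without depending on Flink, here is a minimal plain-Java sketch. `SplitSketch` and its methods are hypothetical names for illustration only, not part of any Flink API. `filterCopy` models the replicate-and-filter approach (one full scan of the input per output), while `splitOnce` routes each element in a single scan either to a main output of `Integer`s or to a side output of `String`s, mirroring how a side output may carry a different type than the main stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, Flink-free illustration of "replicate and filter" versus a
// single pass that routes each element to one of two differently typed outputs.
class SplitSketch {

    // Replicate-and-filter: each output needs its own full scan of the input.
    static List<Integer> filterCopy(List<Integer> input, Predicate<Integer> keep) {
        List<Integer> out = new ArrayList<>();
        for (Integer v : input) {
            if (keep.test(v)) {
                out.add(v);
            }
        }
        return out;
    }

    // Single pass: even numbers stay on the main output as Integers,
    // all other numbers go to a side output as Strings.
    static void splitOnce(List<Integer> input, List<Integer> mainOut, List<String> sideOut) {
        for (Integer v : input) {
            if (v % 2 == 0) {
                mainOut.add(v);
            } else {
                sideOut.add("odd-" + v);
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);

        // Two scans; the odd copy would still need a separate map to String.
        List<Integer> evens = filterCopy(input, v -> v % 2 == 0);
        List<Integer> odds = filterCopy(input, v -> v % 2 != 0);

        // One scan, two outputs of different types.
        List<Integer> mainOut = new ArrayList<>();
        List<String> sideOut = new ArrayList<>();
        splitOnce(input, mainOut, sideOut);

        System.out.println(evens + " " + odds + " | " + mainOut + " " + sideOut);
    }
}
```

The single-pass version looks at each element once and decides its destination, which is the role a side output plays inside a Flink operator.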
+
+When using side outputs, you first need to define an `OutputTag` that will be used to identify a
+side output stream:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+// this needs to be an anonymous inner class so that Flink can extract the generic type information
+OutputTag<String> outputTag = new OutputTag<String>("string-side-output") {};
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val outputTag = OutputTag[String]("string-side-output")
+{% endhighlight %}
+</div>
+</div>
+
+Notice how the `OutputTag` is typed according to the type of elements that the side output stream
+contains.
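The Java snippet asks for an anonymous inner class because Java erases generic type arguments at runtime: a plain `new SomeClass<String>(...)` does not record `String` anywhere that can be inspected later. An anonymous subclass, however, stores its parameterized superclass in its class file, and that can be read back via reflection. The sketch below shows this general Java technique with a hypothetical `TypedTag` class; it is not Flink's `OutputTag` implementation, only an illustration of why the trailing `{}` matters.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Hypothetical tag class (NOT Flink's OutputTag) that captures its element type
// using the "anonymous subclass" trick referred to in the comment above.
class TypedTag<T> {
    private final String id;
    private final Type elementType;

    TypedTag(String id) {
        this.id = id;
        // For `new TypedTag<String>("x") {}` the runtime class is an anonymous
        // subclass whose generic superclass is TypedTag<String>, so the type
        // argument String is recorded and can be read back here.
        // For plain `new TypedTag<String>("x")` the runtime class is TypedTag
        // itself, whose superclass is Object: the String argument was erased.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                "Element type erased; create the tag as an anonymous subclass: new TypedTag<...>(id) {}");
        }
        Type arg = ((ParameterizedType) superType).getActualTypeArguments()[0];
        if (arg instanceof TypeVariable) {
            // e.g. created inside a generic method as new TypedTag<E>(id) {}
            throw new IllegalStateException("Element type is an unresolved type variable: " + arg);
        }
        this.elementType = arg;
    }

    String id() { return id; }

    Type elementType() { return elementType; }
}

class TypedTagDemo {
    public static void main(String[] args) {
        TypedTag<String> tag = new TypedTag<String>("string-side-output") {};
        System.out.println(tag.id() + " -> " + tag.elementType().getTypeName());

        try {
            new TypedTag<String>("erased");
        } catch (IllegalStateException e) {
            System.out.println("plain instance failed: " + e.getMessage());
        }
    }
}
```

The Scala variant above does not need the `{}` because Scala can pass type information implicitly; this sketch only covers the Java side.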
+
+Emitting data to a side output is only possible when using a
+[ProcessFunction](/dev/stream/process_function.html). In the function, you can use the `Context` parameter
+to emit data to a side output identified by an `OutputTag`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+DataStream<Integer> input = ...;
+
+final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
+
+SingleOutputStreamOperator<Integer> mainDataStream = input
+  .process(new ProcessFunction<Integer, Integer>() {
+
+      @Override
+      public void processElement(
+          Integer value,
+          Context ctx,
+          Collector<Integer> out) throws Exception {
+        // emit data to regular output
+        out.collect(value);
+
+        // emit data to side output
+        ctx.output(outputTag, "sideout-" + String.valueOf(value));
+      }
+    });
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val input: DataStream[Int] = ...
+val outputTag = OutputTag[String]("string-side-output")
+
+val mainDataStream = input
+  .process(new ProcessFunction[Int, Int] {
+    override def processElement(
+        value: Int,
+        ctx: ProcessFunction[Int, Int]#Context,
+        out: Collector[Int]): Unit = {
+      // emit data to regular output
+      out.collect(value)
+
+      // emit data to side output
+      ctx.output(outputTag, "sideout-" + String.valueOf(value))
+    }
+  })
+{% endhighlight %}
+</div>
+</div>
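The following plain-Java sketch models the pattern above so it can run without Flink. `ProcessSketch` and its nested `Tag`, `Collector`, `Context` and `ProcessFn` types are hypothetical stand-ins that only imitate the shape of `ProcessFunction`; they are not Flink classes. What it illustrates is that each element is processed once, and the function decides per element what goes to the main output (through the collector) and what goes to a side output (through the context and a tag).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of emitting to side outputs from a
// ProcessFunction-like callback. None of these types are Flink classes.
class ProcessSketch {

    // Stand-in for OutputTag<T>: identified by its id, typed by T.
    static final class Tag<T> {
        final String id;
        Tag(String id) { this.id = id; }
    }

    // Stand-in for the main-output Collector.
    interface Collector<T> {
        void collect(T value);
    }

    // Stand-in for the Context: output(tag, value) appends to that tag's side output.
    static final class Context {
        final Map<String, List<Object>> sideOutputs = new HashMap<>();

        <X> void output(Tag<X> tag, X value) {
            sideOutputs.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
        }
    }

    // Stand-in for ProcessFunction<I, O>.processElement.
    interface ProcessFn<I, O> {
        void processElement(I value, Context ctx, Collector<O> out);
    }

    // Calls fn once per element; returns the main output, side outputs stay in ctx.
    static <I, O> List<O> run(List<I> input, ProcessFn<I, O> fn, Context ctx) {
        List<O> mainOut = new ArrayList<>();
        Collector<O> out = mainOut::add;
        for (I value : input) {
            fn.processElement(value, ctx, out);
        }
        return mainOut;
    }

    public static void main(String[] args) {
        Tag<String> outputTag = new Tag<>("side-output");
        Context ctx = new Context();

        List<Integer> mainOut = ProcessSketch.<Integer, Integer>run(
            Arrays.asList(1, 2, 3),
            (value, c, out) -> {
                out.collect(value);                      // regular output
                c.output(outputTag, "sideout-" + value); // side output
            },
            ctx);

        System.out.println(mainOut + " " + ctx.sideOutputs.get("side-output"));
    }
}
```

In this model, running the function from the examples above over `1, 2, 3` puts `1, 2, 3` on the main output and `sideout-1`, `sideout-2`, `sideout-3` under the `side-output` id.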
+
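+To retrieve the side output stream, call `getSideOutput(OutputTag)` on the result of the
+`DataStream` operation. This returns a `DataStream` typed according to the `OutputTag`, that is,
+to the type of the side output stream. In Java, `getSideOutput` is defined on
+`SingleOutputStreamOperator`, so keep the result of the operation as that type rather than as a
+plain `DataStream`: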
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
+
+SingleOutputStreamOperator<Integer> mainDataStream = ...;
+
+DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val outputTag = OutputTag[String]("string-side-output")
+
+val mainDataStream = ...
+
+val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)
+{% endhighlight %}
+</div>
+</div>
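The retrieval snippets declare the tag again with the same id as the emitting code. The sketch below assumes, for illustration, that tags are matched by their id string, and shows how a typed lookup keyed on such tags could work. `IdTag` and `SideOutputs` are hypothetical plain-Java classes, not Flink API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical tag (not Flink API) whose equality depends only on its id,
// so a tag declared again with the same id finds the same side output.
final class IdTag<T> {
    final String id;

    IdTag(String id) {
        this.id = Objects.requireNonNull(id);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof IdTag && ((IdTag<?>) o).id.equals(id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

// Hypothetical holder for the side outputs of one operation.
final class SideOutputs {
    private final Map<IdTag<?>, List<Object>> streams = new HashMap<>();

    <T> void emit(IdTag<T> tag, T value) {
        streams.computeIfAbsent(tag, k -> new ArrayList<>()).add(value);
    }

    // Typed lookup: the tag passed in decides the element type of the result.
    // The cast is unchecked; it is safe only if every tag with a given id
    // uses the same element type.
    @SuppressWarnings("unchecked")
    <T> List<T> get(IdTag<T> tag) {
        List<Object> values = streams.getOrDefault(tag, Collections.emptyList());
        return Collections.unmodifiableList((List<T>) (List<?>) values);
    }

    public static void main(String[] args) {
        SideOutputs outputs = new SideOutputs();
        outputs.emit(new IdTag<String>("side-output"), "sideout-1");
        outputs.emit(new IdTag<String>("side-output"), "sideout-2");

        // A fresh tag instance with the same id retrieves the same elements.
        List<String> side = outputs.get(new IdTag<String>("side-output"));
        System.out.println(side);
    }
}
```

Because `get` casts unchecked, two tags that share an id but declare different element types would clash in this model, so each side output should get its own id.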
http://git-wip-us.apache.org/repos/asf/flink/blob/f0a58f78/docs/dev/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/windows.md b/docs/dev/windows.md
index 73f348e..f8643cd 100644
--- a/docs/dev/windows.md
+++ b/docs/dev/windows.md
@@ -973,6 +973,51 @@ input
<span class="label label-info">Note</span> When using the `GlobalWindows` window assigner no
data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
+### Getting late data as a side output
+
+Using Flink's [side output](/dev/stream/side_output.html) feature, you can get a stream of the data
+that would otherwise be discarded as late.
+
+First, call `sideOutputLateData(OutputTag)` on the windowed stream to request that late data be
+emitted to a side output. Then, retrieve the side output stream by calling
+`getSideOutput(OutputTag)` on the result of the windowed operation:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
+
+DataStream<T> input = ...;
+
+SingleOutputStreamOperator<T> result = input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .allowedLateness(<time>)
+    .sideOutputLateData(lateOutputTag)
+    .<windowed transformation>(<window function>);
+
+DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val lateOutputTag = OutputTag[T]("late-data")
+
+val input: DataStream[T] = ...
+
+val result = input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .allowedLateness(<time>)
+    .sideOutputLateData(lateOutputTag)
+    .<windowed transformation>(<window function>)
+
+val lateStream = result.getSideOutput(lateOutputTag)
+{% endhighlight %}
+</div>
+</div>
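The sketch below models, in plain Java, which elements would end up in the late-data side output for tumbling event-time windows. It is not Flink code: `LateDataSketch` and its methods are hypothetical, it assumes a window offset of 0 and non-negative timestamps, and it assumes an element counts as late when its window's max timestamp plus the allowed lateness is at or before the current watermark. Treat that rule as a simplified reading of the behavior described in this section, not a specification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Flink-free model of event-time lateness for tumbling windows
// (offset 0, non-negative timestamps). Assumed rule: an element is late when
// its window's max timestamp + allowed lateness <= current watermark.
final class LateDataSketch {

    // Start of the tumbling window of the given size that contains ts.
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Last timestamp that still belongs to the window [start, start + size).
    static long maxTimestamp(long ts, long size) {
        return windowStart(ts, size) + size - 1;
    }

    // maxTimestamp + allowedLateness, saturating at Long.MAX_VALUE on overflow.
    static long cleanupTime(long maxTimestamp, long allowedLateness) {
        long t = maxTimestamp + allowedLateness;
        return t >= maxTimestamp ? t : Long.MAX_VALUE;
    }

    static boolean isLate(long ts, long size, long allowedLateness, long watermark) {
        return cleanupTime(maxTimestamp(ts, size), allowedLateness) <= watermark;
    }

    // On-time timestamps go to `accepted`; late ones go to `late`, the role
    // the late-data side output plays in the snippets above.
    static void route(List<Long> timestamps, long size, long allowedLateness,
                      long watermark, List<Long> accepted, List<Long> late) {
        for (long ts : timestamps) {
            (isLate(ts, size, allowedLateness, watermark) ? late : accepted).add(ts);
        }
    }

    public static void main(String[] args) {
        List<Long> accepted = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        // windows of size 10, allowed lateness 5, current watermark 20
        route(Arrays.asList(3L, 12L, 25L), 10L, 5L, 20L, accepted, late);
        System.out.println("accepted=" + accepted + " late=" + late);
    }
}
```

For example, with windows of size 10, an allowed lateness of 5 and a watermark of 20, the element at 3 falls in a window whose max timestamp is 9 and is routed to the late output, while the elements at 12 and 25 are still accepted. Because `cleanupTime` saturates at `Long.MAX_VALUE`, a window ending at `Long.MAX_VALUE`, like the global window mentioned above, never becomes late for any watermark below `Long.MAX_VALUE` in this model.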
+
### Late elements considerations
When specifying an allowed lateness greater than 0, the window along with its content is kept after the watermark passes