Posted to commits@flink.apache.org by al...@apache.org on 2017/02/17 16:24:13 UTC
[3/8] flink git commit: [FLINK-4997] Add doc for ProcessWindowFunction
[FLINK-4997] Add doc for ProcessWindowFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f047e13
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f047e13
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f047e13
Branch: refs/heads/master
Commit: 4f047e13518ad2eb493903179e38eb174a37994c
Parents: 86dff0e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Feb 9 11:56:46 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100
----------------------------------------------------------------------
docs/dev/windows.md | 105 +++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 105 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4f047e13/docs/dev/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/windows.md b/docs/dev/windows.md
index f8be08f..73f348e 100644
--- a/docs/dev/windows.md
+++ b/docs/dev/windows.md
@@ -565,6 +565,108 @@ The example shows a `WindowFunction` to count the elements in a window. In addit
<span class="label label-danger">Attention</span> Note that using `WindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `WindowFunction` to get both incremental aggregation and the added information of a `WindowFunction`.
+### ProcessWindowFunction
+
+Wherever a `WindowFunction` can be used, you can also use a `ProcessWindowFunction`. It
+is very similar to `WindowFunction`, except that its interface lets you query more information
+about the context in which the window evaluation happens.
+
+This is the `ProcessWindowFunction` interface:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
+
+ /**
+ * Evaluates the window and outputs none or several elements.
+ *
+ * @param key The key for which this window is evaluated.
+ * @param context The context in which the window is being evaluated.
+ * @param elements The elements in the window being evaluated.
+ * @param out A collector for emitting elements.
+ *
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ public abstract void process(
+ KEY key,
+ Context context,
+ Iterable<IN> elements,
+ Collector<OUT> out) throws Exception;
+
+ /**
+ * The context holding window metadata
+ */
+ public abstract class Context {
+ /**
+ * @return The window that is being evaluated.
+ */
+ public abstract W window();
+ }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
+
+ /**
+ * Evaluates the window and outputs none or several elements.
+ *
+ * @param key The key for which this window is evaluated.
+ * @param context The context in which the window is being evaluated.
+ * @param elements The elements in the window being evaluated.
+ * @param out A collector for emitting elements.
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ @throws[Exception]
+ def process(
+ key: KEY,
+ context: Context,
+ elements: Iterable[IN],
+ out: Collector[OUT]): Unit
+
+ /**
+ * The context holding window metadata
+ */
+ abstract class Context {
+ /**
+ * @return The window that is being evaluated.
+ */
+ def window: W
+ }
+}
+{% endhighlight %}
+</div>
+</div>
+
+It can be used like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String, Long>> input = ...;
+
+input
+ .keyBy(<key selector>)
+ .window(<window assigner>)
+ .process(new MyProcessWindowFunction());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[(String, Long)] = ...
+
+input
+ .keyBy(<key selector>)
+ .window(<window assigner>)
+ .process(new MyProcessWindowFunction())
+{% endhighlight %}
+</div>
+</div>
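
The usage snippet refers to `MyProcessWindowFunction` without defining it. The following is a minimal, self-contained sketch of what such a function might look like, not part of the commit itself. To keep it runnable without Flink on the classpath, `Collector`, `TimeWindow`, and `ProcessWindowFunction` here are simplified local stand-ins that only mirror the shape of the interface shown above; they are assumptions for illustration, not the Flink classes. The counting logic, the output format, and the `ProcessWindowFunctionSketch` driver are hypothetical as well.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for Flink's Collector (assumption: only collect() is modeled).
interface Collector<T> {
    void collect(T record);
}

// Stand-in for a time window: start is inclusive, end is exclusive.
final class TimeWindow {
    private final long start;
    private final long end;

    TimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    long getStart() { return start; }
    long getEnd() { return end; }
}

// Stand-in mirroring the abstract class quoted in the documentation above.
abstract class ProcessWindowFunction<IN, OUT, KEY, W> {
    public abstract void process(
            KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;

    // The context holding window metadata.
    public abstract class Context {
        public abstract W window();
    }
}

// Hypothetical user function: counts the elements of a window and
// emits one string that also reports the window bounds from the context.
class MyProcessWindowFunction extends ProcessWindowFunction<Long, String, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<Long> elements, Collector<String> out) {
        long count = 0;
        for (Long ignored : elements) {
            count++;
        }
        TimeWindow window = context.window();
        out.collect(key + " [" + window.getStart() + ", " + window.getEnd() + "): " + count);
    }
}

// Hypothetical driver that plays the role of the window operator for one window.
class ProcessWindowFunctionSketch {
    static List<String> run() {
        MyProcessWindowFunction fn = new MyProcessWindowFunction();
        TimeWindow window = new TimeWindow(0L, 5000L);

        // Context is an inner class, so it is created against the function instance.
        ProcessWindowFunction<Long, String, String, TimeWindow>.Context ctx = fn.new Context() {
            @Override
            public TimeWindow window() {
                return window;
            }
        };

        List<String> results = new ArrayList<>();
        fn.process("a", ctx, Arrays.asList(1L, 2L, 3L), results::add);
        return results;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In a real job the runtime builds the `Context` and calls `process` once per key and window evaluation; the driver above only imitates that call for a single window so the sketch can be run in isolation.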
+
### WindowFunction with Incremental Aggregation
A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to
@@ -573,6 +675,9 @@ When the window is closed, the `WindowFunction` will be provided with the aggreg
This allows windows to be computed incrementally while retaining access to the
additional window meta information of the `WindowFunction`.
+<span class="label label-info">Note</span> You can also use a `ProcessWindowFunction` instead of
+a `WindowFunction` for incremental window aggregation.
+
#### Incremental Window Aggregation with FoldFunction
The following example shows how an incremental `FoldFunction` can be combined with