You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/17 16:24:13 UTC

[3/8] flink git commit: [FLINK-4997] Add doc for ProcessWindowFunction

[FLINK-4997] Add doc for ProcessWindowFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f047e13
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f047e13
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f047e13

Branch: refs/heads/master
Commit: 4f047e13518ad2eb493903179e38eb174a37994c
Parents: 86dff0e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Feb 9 11:56:46 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100

----------------------------------------------------------------------
 docs/dev/windows.md | 105 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 105 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4f047e13/docs/dev/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/windows.md b/docs/dev/windows.md
index f8be08f..73f348e 100644
--- a/docs/dev/windows.md
+++ b/docs/dev/windows.md
@@ -565,6 +565,108 @@ The example shows a `WindowFunction` to count the elements in a window. In addit
 
 <span class="label label-danger">Attention</span> Note that using `WindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `WindowFunction` to get both incremental aggregation and the added information of a `WindowFunction`.
 
+### ProcessWindowFunction
+
+In places where a `WindowFunction` can be used you can also use a `ProcessWindowFunction`. This
+is very similar to `WindowFunction`, except that the interface allows to query more information
+about the context in which the window evaluation happens.
+
+This is the `ProcessWindowFunction` interface:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
+
+    /**
+     * Evaluates the window and outputs none or several elements.
+     *
+     * @param key The key for which this window is evaluated.
+     * @param context The context in which the window is being evaluated.
+     * @param elements The elements in the window being evaluated.
+     * @param out A collector for emitting elements.
+     *
+     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+     */
+    public abstract void process(
+            KEY key,
+            Context context,
+            Iterable<IN> elements,
+            Collector<OUT> out) throws Exception;
+
+    /**
+     * The context holding window metadata
+     */
+    public abstract class Context {
+        /**
+         * @return The window that is being evaluated.
+         */
+        public abstract W window();
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
+
+  /**
+    * Evaluates the window and outputs none or several elements.
+    *
+    * @param key      The key for which this window is evaluated.
+    * @param context  The context in which the window is being evaluated.
+    * @param elements The elements in the window being evaluated.
+    * @param out      A collector for emitting elements.
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  @throws[Exception]
+  def process(
+      key: KEY,
+      context: Context,
+      elements: Iterable[IN],
+      out: Collector[OUT])
+
+  /**
+    * The context holding window metadata
+    */
+  abstract class Context {
+    /**
+      * @return The window that is being evaluated.
+      */
+    def window: W
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+It can be used like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String, Long>> input = ...;
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .process(new MyProcessWindowFunction());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[(String, Long)] = ...
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .process(new MyProcessWindowFunction())
+{% endhighlight %}
+</div>
+</div>
+
 ### WindowFunction with Incremental Aggregation
 
 A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to
@@ -573,6 +675,9 @@ When the window is closed, the `WindowFunction` will be provided with the aggreg
 This allows to incrementally compute windows while having access to the
 additional window meta information of the `WindowFunction`.
 
+<span class="label label-info">Note</span> You can also `ProcessWindowFunction` instead of
+`WindowFunction` for incremental window aggregation.
+
 #### Incremental Window Aggregation with FoldFunction
 
 The following example shows how an incremental `FoldFunction` can be combined with