Posted to commits@flink.apache.org by al...@apache.org on 2017/09/05 16:07:23 UTC

[3/5] flink git commit: [FLINK-7568] Change role of ProcessWindowFunction and WindowFunction in doc

[FLINK-7568] Change role of ProcessWindowFunction and WindowFunction in doc


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fe8f217
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fe8f217
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fe8f217

Branch: refs/heads/master
Commit: 9fe8f217a69c67abd81f1424cc62cbb3d35c25c7
Parents: 7c11bd7
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Sep 1 12:20:18 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Sep 5 17:33:47 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/windows.md | 257 +++++++++++++++---------------
 1 file changed, 129 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
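
The updated documentation contrasts two ways of evaluating a window. A bare `ProcessWindowFunction` buffers every element until the window fires. Combining it with a `ReduceFunction` pre-aggregates elements as they arrive, so the process function receives an `Iterable` with a single element. The following is a minimal plain-Java sketch of both strategies that runs without Flink. `WindowEvaluationSketch` and its methods are hypothetical names used only for illustration, not Flink API. The sketch also assumes every window contains at least one element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model, no Flink dependency. WindowEvaluationSketch and its methods
// are hypothetical names for illustration only; they are not part of Flink's API.
public class WindowEvaluationSketch {

    // Buffering strategy: every element is copied into window state until the
    // window fires, then the whole buffer is handed to the window function.
    public static String countBuffered(String key, long windowEnd, List<Long> elements) {
        List<Long> buffer = new ArrayList<>();
        for (Long e : elements) {
            buffer.add(e); // state grows by one entry per element
        }
        return processCount(key, windowEnd, buffer);
    }

    // Incremental strategy: a reduce step folds each arriving element into a single
    // accumulator, so the window function later sees an Iterable with one element.
    // Assumes the window is non-empty.
    public static String minIncremental(String key, long windowEnd, List<Long> elements,
                                        BinaryOperator<Long> reducer) {
        Long acc = null;
        for (Long e : elements) {
            acc = (acc == null) ? e : reducer.apply(acc, e); // state stays a single value
        }
        Iterable<Long> preAggregated = Collections.singletonList(acc);
        // Like the reduce example in the docs: read the single pre-aggregated value.
        Long min = preAggregated.iterator().next();
        return "key=" + key + ", windowEnd=" + windowEnd + ", min=" + min;
    }

    // Counterpart of the element-counting window function in the docs.
    public static String processCount(String key, long windowEnd, Iterable<Long> input) {
        long count = 0;
        for (Long ignored : input) {
            count++;
        }
        return "key=" + key + ", windowEnd=" + windowEnd + ", count=" + count;
    }

    public static void main(String[] args) {
        List<Long> window = Arrays.asList(5L, 2L, 9L);
        // prints key=sensor-1, windowEnd=1000, count=3
        System.out.println(countBuffered("sensor-1", 1000L, window));
        // prints key=sensor-1, windowEnd=1000, min=2
        System.out.println(minIncremental("sensor-1", 1000L, window, (a, b) -> a <= b ? a : b));
    }
}
```

The buffered variant holds state proportional to the number of elements in the window. The incremental variant holds a single value. This is why the reduce-based example in the diff can safely read its input with `iterator().next()`.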


http://git-wip-us.apache.org/repos/asf/flink/blob/9fe8f217/docs/dev/stream/operators/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index c2d557f..4cfebf2 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -460,118 +460,15 @@ The above example appends all input `Long` values to an initially empty `String`
 
 <span class="label label-danger">Attention</span> `fold()` cannot be used with session windows or other mergeable windows.
 
-### WindowFunction - The Generic Case
-
-A `WindowFunction` gets an `Iterable` containing all the elements of the window and provides
-the most flexibility of all window functions. This comes
-at the cost of performance and resource consumption, because elements cannot be incrementally
-aggregated but instead need to be buffered internally until the window is considered ready for processing.
-
-The signature of a `WindowFunction` looks as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
-
-  /**
-   * Evaluates the window and outputs none or several elements.
-   *
-   * @param key The key for which this window is evaluated.
-   * @param window The window that is being evaluated.
-   * @param input The elements in the window being evaluated.
-   * @param out A collector for emitting elements.
-   *
-   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
-   */
-  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
-
-  /**
-    * Evaluates the window and outputs none or several elements.
-    *
-    * @param key    The key for which this window is evaluated.
-    * @param window The window that is being evaluated.
-    * @param input  The elements in the window being evaluated.
-    * @param out    A collector for emitting elements.
-    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
-    */
-  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
-}
-{% endhighlight %}
-</div>
-</div>
-
-A `WindowFunction` can be defined and used like this:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<Tuple2<String, Long>> input = ...;
-
-input
-    .keyBy(<key selector>)
-    .window(<window assigner>)
-    .apply(new MyWindowFunction());
-
-/* ... */
-
-public class MyWindowFunction implements WindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
-
-  void apply(String key, TimeWindow window, Iterable<Tuple<String, Long>> input, Collector<String> out) {
-    long count = 0;
-    for (Tuple<String, Long> in: input) {
-      count++;
-    }
-    out.collect("Window: " + window + "count: " + count);
-  }
-}
-
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[(String, Long)] = ...
-
-input
-    .keyBy(<key selector>)
-    .window(<window assigner>)
-    .apply(new MyWindowFunction())
-
-/* ... */
-
-class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {
-
-  def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): () = {
-    var count = 0L
-    for (in <- input) {
-      count = count + 1
-    }
-    out.collect(s"Window $window count: $count")
-  }
-}
-{% endhighlight %}
-</div>
-</div>
-
-The example shows a `WindowFunction` to count the elements in a window. In addition, the window function adds information about the window to the output.
-
-<span class="label label-danger">Attention</span> Note that using `WindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `WindowFunction` to get both incremental aggregation and the added information of a `WindowFunction`.
-
 ### ProcessWindowFunction
 
-In places where a `WindowFunction` can be used you can also use a `ProcessWindowFunction`. This
-is very similar to `WindowFunction`, except that the interface allows to query more information
-about the context in which the window evaluation happens.
+A `ProcessWindowFunction` gets an `Iterable` containing all the elements of the window, and a `Context`
+object with access to time and state information, which gives it more flexibility than other window
+functions. This comes at the cost of performance and resource consumption, because elements cannot
+be incrementally aggregated but instead need to be buffered internally until the window is
+considered ready for processing.
 
-This is the `ProcessWindowFunction` interface:
+The signature of `ProcessWindowFunction` looks as follows:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -620,7 +517,6 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
     * @param out      A collector for emitting elements.
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
-  @throws[Exception]
   def process(
       key: KEY,
       context: Context,
@@ -641,7 +537,7 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
 </div>
 </div>
 
-It can be used like this:
+A `ProcessWindowFunction` can be defined and used like this:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -652,6 +548,20 @@ input
     .keyBy(<key selector>)
     .window(<window assigner>)
     .process(new MyProcessWindowFunction());
+
+/* ... */
+
+public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
+
+  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
+    long count = 0;
+    for (Tuple2<String, Long> in: input) {
+      count++;
+    }
+    out.collect("Window: " + context.window() + ", count: " + count);
+  }
+}
+
 {% endhighlight %}
 </div>
 
@@ -663,25 +573,42 @@ input
     .keyBy(<key selector>)
     .window(<window assigner>)
     .process(new MyProcessWindowFunction())
+
+/* ... */
+
+class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
+
+  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
+    var count = 0L
+    for (in <- input) {
+      count = count + 1
+    }
+    out.collect(s"Window ${context.window} count: $count")
+  }
+}
 {% endhighlight %}
 </div>
 </div>
 
-### WindowFunction with Incremental Aggregation
+The example shows a `ProcessWindowFunction` that counts the elements in a window. In addition, the window function adds information about the window to the output.
+
+<span class="label label-danger">Attention</span> Note that using `ProcessWindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `ProcessWindowFunction` to get both incremental aggregation and the added information of a `ProcessWindowFunction`.
 
-A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to
+### ProcessWindowFunction with Incremental Aggregation
+
+A `ProcessWindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to
 incrementally aggregate elements as they arrive in the window.
-When the window is closed, the `WindowFunction` will be provided with the aggregated result.
+When the window is closed, the `ProcessWindowFunction` will be provided with the aggregated result.
 This allows to incrementally compute windows while having access to the
-additional window meta information of the `WindowFunction`.
+additional window meta information of the `ProcessWindowFunction`.
 
-<span class="label label-info">Note</span> You can also `ProcessWindowFunction` instead of
-`WindowFunction` for incremental window aggregation.
+<span class="label label-info">Note</span> You can also use the legacy `WindowFunction` instead of
+`ProcessWindowFunction` for incremental window aggregation.
 
 #### Incremental Window Aggregation with FoldFunction
 
 The following example shows how an incremental `FoldFunction` can be combined with
-a `WindowFunction` to extract the number of events in the window and return also
+a `ProcessWindowFunction` to extract the number of events in the window and also return
 the key and end time of the window.
 
 <div class="codetabs" markdown="1">
@@ -692,7 +619,7 @@ DataStream<SensorReading> input = ...;
 input
   .keyBy(<key selector>)
   .timeWindow(<window assigner>)
-  .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyWindowFunction())
+  .fold(new Tuple3<String, Long, Integer>("", 0L, 0), new MyFoldFunction(), new MyProcessWindowFunction());
 
 // Function definitions
 
@@ -706,15 +633,15 @@ private static class MyFoldFunction
   }
 }
 
-private static class MyWindowFunction
-    implements WindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
+private static class MyProcessWindowFunction
+    extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
 
-  public void apply(String key,
-                    TimeWindow window,
+  public void process(String key,
+                    Context context,
                     Iterable<Tuple3<String, Long, Integer>> counts,
                     Collector<Tuple3<String, Long, Integer>> out) {
     Integer count = counts.iterator().next().getField(2);
-    out.collect(new Tuple3<String, Long, Integer>(key, window.getEnd(),count));
+    out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
   }
 }
 
@@ -759,7 +686,7 @@ DataStream<SensorReading> input = ...;
 input
   .keyBy(<key selector>)
   .timeWindow(<window assigner>)
-  .reduce(new MyReduceFunction(), new MyWindowFunction());
+  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
 
 // Function definitions
 
@@ -770,11 +697,11 @@ private static class MyReduceFunction implements ReduceFunction<SensorReading> {
   }
 }
 
-private static class MyWindowFunction
-    implements WindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
+private static class MyProcessWindowFunction
+    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
 
   public void apply(String key,
-                    TimeWindow window,
+                    Context context,
                     Iterable<SensorReading> minReadings,
                     Collector<Tuple2<Long, SensorReading>> out) {
       SensorReading min = minReadings.iterator().next();
@@ -808,6 +735,80 @@ input
 </div>
 </div>
 
+### WindowFunction (Legacy)
+
+In some places where a `ProcessWindowFunction` can be used, you can also use a `WindowFunction`. This
+is an older version of `ProcessWindowFunction` that provides less contextual information and lacks
+some advanced features, such as per-window keyed state. This interface will be deprecated at some
+point.
+
+The signature of a `WindowFunction` looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
+
+  /**
+   * Evaluates the window and outputs none or several elements.
+   *
+   * @param key The key for which this window is evaluated.
+   * @param window The window that is being evaluated.
+   * @param input The elements in the window being evaluated.
+   * @param out A collector for emitting elements.
+   *
+   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+   */
+  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
+
+  /**
+    * Evaluates the window and outputs none or several elements.
+    *
+    * @param key    The key for which this window is evaluated.
+    * @param window The window that is being evaluated.
+    * @param input  The elements in the window being evaluated.
+    * @param out    A collector for emitting elements.
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
+}
+{% endhighlight %}
+</div>
+</div>
+
+A `WindowFunction` can be used like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String, Long>> input = ...;
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .apply(new MyWindowFunction());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[(String, Long)] = ...
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .apply(new MyWindowFunction())
+{% endhighlight %}
+</div>
+</div>
+
 ## Triggers
 
 A `Trigger` determines when a window (as formed by the *window assigner*) is ready to be