Posted to commits@flink.apache.org by al...@apache.org on 2017/09/05 16:07:23 UTC
[3/5] flink git commit: [FLINK-7568] Change role of ProcessWindowFunction and WindowFunction in doc
[FLINK-7568] Change role of ProcessWindowFunction and WindowFunction in doc
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fe8f217
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fe8f217
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fe8f217
Branch: refs/heads/master
Commit: 9fe8f217a69c67abd81f1424cc62cbb3d35c25c7
Parents: 7c11bd7
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Sep 1 12:20:18 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Sep 5 17:33:47 2017 +0200
----------------------------------------------------------------------
docs/dev/stream/operators/windows.md | 257 +++++++++++++++---------------
1 file changed, 129 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe8f217/docs/dev/stream/operators/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index c2d557f..4cfebf2 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -460,118 +460,15 @@ The above example appends all input `Long` values to an initially empty `String`
<span class="label label-danger">Attention</span> `fold()` cannot be used with session windows or other mergeable windows.
-### WindowFunction - The Generic Case
-
-A `WindowFunction` gets an `Iterable` containing all the elements of the window and provides
-the most flexibility of all window functions. This comes
-at the cost of performance and resource consumption, because elements cannot be incrementally
-aggregated but instead need to be buffered internally until the window is considered ready for processing.
-
-The signature of a `WindowFunction` looks as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
-
- /**
- * Evaluates the window and outputs none or several elements.
- *
- * @param key The key for which this window is evaluated.
- * @param window The window that is being evaluated.
- * @param input The elements in the window being evaluated.
- * @param out A collector for emitting elements.
- *
- * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
- */
- void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
-
- /**
- * Evaluates the window and outputs none or several elements.
- *
- * @param key The key for which this window is evaluated.
- * @param window The window that is being evaluated.
- * @param input The elements in the window being evaluated.
- * @param out A collector for emitting elements.
- * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
- */
- def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
-}
-{% endhighlight %}
-</div>
-</div>
-
-A `WindowFunction` can be defined and used like this:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<Tuple2<String, Long>> input = ...;
-
-input
- .keyBy(<key selector>)
- .window(<window assigner>)
- .apply(new MyWindowFunction());
-
-/* ... */
-
-public class MyWindowFunction implements WindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
-
- void apply(String key, TimeWindow window, Iterable<Tuple<String, Long>> input, Collector<String> out) {
- long count = 0;
- for (Tuple<String, Long> in: input) {
- count++;
- }
- out.collect("Window: " + window + "count: " + count);
- }
-}
-
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[(String, Long)] = ...
-
-input
- .keyBy(<key selector>)
- .window(<window assigner>)
- .apply(new MyWindowFunction())
-
-/* ... */
-
-class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {
-
- def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): () = {
- var count = 0L
- for (in <- input) {
- count = count + 1
- }
- out.collect(s"Window $window count: $count")
- }
-}
-{% endhighlight %}
-</div>
-</div>
-
-The example shows a `WindowFunction` to count the elements in a window. In addition, the window function adds information about the window to the output.
-
-<span class="label label-danger">Attention</span> Note that using `WindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `WindowFunction` to get both incremental aggregation and the added information of a `WindowFunction`.
-
### ProcessWindowFunction
-In places where a `WindowFunction` can be used you can also use a `ProcessWindowFunction`. This
-is very similar to `WindowFunction`, except that the interface allows to query more information
-about the context in which the window evaluation happens.
+A `ProcessWindowFunction` gets an `Iterable` containing all the elements of the window, and a `Context`
+object with access to time and state information, which enables it to provide more flexibility than
+other window functions. This comes at the cost of performance and resource consumption, because
+elements cannot be incrementally aggregated but instead need to be buffered internally until the
+window is considered ready for processing.
-This is the `ProcessWindowFunction` interface:
+The signature of `ProcessWindowFunction` looks as follows:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -620,7 +517,6 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
- @throws[Exception]
def process(
key: KEY,
context: Context,
@@ -641,7 +537,7 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
</div>
</div>
-It can be used like this:
+A `ProcessWindowFunction` can be defined and used like this:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -652,6 +548,20 @@ input
.keyBy(<key selector>)
.window(<window assigner>)
.process(new MyProcessWindowFunction());
+
+/* ... */
+
+public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
+
+  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
+    long count = 0;
+    for (Tuple2<String, Long> in: input) {
+      count++;
+    }
+    out.collect("Window: " + context.window() + " count: " + count);
+  }
+}
+
{% endhighlight %}
</div>
@@ -663,25 +573,42 @@ input
.keyBy(<key selector>)
.window(<window assigner>)
.process(new MyProcessWindowFunction())
+
+/* ... */
+
+class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
+
+  override def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
+    var count = 0L
+    for (in <- input) {
+      count = count + 1
+    }
+    out.collect(s"Window ${context.window} count: $count")
+  }
+}
{% endhighlight %}
</div>
</div>
-### WindowFunction with Incremental Aggregation
+The example shows a `ProcessWindowFunction` that counts the elements in a window. In addition, the window function adds information about the window to the output.
+
+<span class="label label-danger">Attention</span> Note that using `ProcessWindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `ProcessWindowFunction` to get both incremental aggregation and the added information of a `ProcessWindowFunction`.
-A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to
+### ProcessWindowFunction with Incremental Aggregation
+
+A `ProcessWindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to
incrementally aggregate elements as they arrive in the window.
-When the window is closed, the `WindowFunction` will be provided with the aggregated result.
+When the window is closed, the `ProcessWindowFunction` will be provided with the aggregated result.
This allows to incrementally compute windows while having access to the
-additional window meta information of the `WindowFunction`.
+additional window meta information of the `ProcessWindowFunction`.
-<span class="label label-info">Note</span> You can also `ProcessWindowFunction` instead of
-`WindowFunction` for incremental window aggregation.
+<span class="label label-info">Note</span> You can also use the legacy `WindowFunction` instead of
+`ProcessWindowFunction` for incremental window aggregation.
#### Incremental Window Aggregation with FoldFunction
The following example shows how an incremental `FoldFunction` can be combined with
-a `WindowFunction` to extract the number of events in the window and return also
+a `ProcessWindowFunction` to extract the number of events in the window and also return
the key and end time of the window.
<div class="codetabs" markdown="1">
@@ -692,7 +619,7 @@ DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
- .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyWindowFunction())
+ .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction())
// Function definitions
@@ -706,15 +633,15 @@ private static class MyFoldFunction
}
}
-private static class MyWindowFunction
- implements WindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
+private static class MyProcessWindowFunction
+ extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
- public void apply(String key,
- TimeWindow window,
+ public void process(String key,
+ Context context,
Iterable<Tuple3<String, Long, Integer>> counts,
Collector<Tuple3<String, Long, Integer>> out) {
Integer count = counts.iterator().next().getField(2);
- out.collect(new Tuple3<String, Long, Integer>(key, window.getEnd(),count));
+ out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
}
}
@@ -759,7 +686,7 @@ DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
- .reduce(new MyReduceFunction(), new MyWindowFunction());
+ .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
@@ -770,11 +697,11 @@ private static class MyReduceFunction implements ReduceFunction<SensorReading> {
}
}
-private static class MyWindowFunction
- implements WindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
+private static class MyProcessWindowFunction
+ extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void apply(String key,
- TimeWindow window,
+ Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
@@ -808,6 +735,80 @@ input
</div>
</div>
+### WindowFunction (Legacy)
+
+In some places where a `ProcessWindowFunction` can be used you can also use a `WindowFunction`. This
+is an older version of `ProcessWindowFunction` that provides less contextual information and does
+not have some advanced features, such as per-window keyed state. This interface will be deprecated
+at some point.
+
+The signature of a `WindowFunction` looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
+
+ /**
+ * Evaluates the window and outputs none or several elements.
+ *
+ * @param key The key for which this window is evaluated.
+ * @param window The window that is being evaluated.
+ * @param input The elements in the window being evaluated.
+ * @param out A collector for emitting elements.
+ *
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
+
+ /**
+ * Evaluates the window and outputs none or several elements.
+ *
+ * @param key The key for which this window is evaluated.
+ * @param window The window that is being evaluated.
+ * @param input The elements in the window being evaluated.
+ * @param out A collector for emitting elements.
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
+}
+{% endhighlight %}
+</div>
+</div>
+
+It can be used like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String, Long>> input = ...;
+
+input
+ .keyBy(<key selector>)
+ .window(<window assigner>)
+ .apply(new MyWindowFunction());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[(String, Long)] = ...
+
+input
+ .keyBy(<key selector>)
+ .window(<window assigner>)
+ .apply(new MyWindowFunction())
+{% endhighlight %}
+</div>
+</div>
+
## Triggers
A `Trigger` determines when a window (as formed by the *window assigner*) is ready to be