Posted to commits@flink.apache.org by al...@apache.org on 2017/10/16 11:32:17 UTC

[1/2] flink git commit: [hotfix] Change order of reduce and fold in window documentation

Repository: flink
Updated Branches:
  refs/heads/master 0214e8003 -> 404e37d21


[hotfix] Change order of reduce and fold in window documentation

In the section about incremental aggregation with a window function, the
fold and reduce examples appeared in a different order than in the rest of
the documentation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa2f92c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa2f92c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa2f92c7

Branch: refs/heads/master
Commit: aa2f92c73d2b2bfbd57f341597407ebcb44ca174
Parents: 0214e80
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Oct 16 13:25:15 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 16 13:25:15 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/windows.md | 112 +++++++++++++++---------------
 1 file changed, 56 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aa2f92c7/docs/dev/stream/operators/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index 0ecae0c..79a2dd6 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -506,13 +506,13 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> impl
    	     * Returns the window that is being evaluated.
    	     */
    	    public abstract W window();
-   
+
    	    /** Returns the current processing time. */
    	    public abstract long currentProcessingTime();
-   
+
    	    /** Returns the current event-time watermark. */
    	    public abstract long currentWatermark();
-   
+
    	    /**
    	     * State accessor for per-key and per-window state.
    	     *
@@ -520,7 +520,7 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> impl
    	     * by implementing {@link ProcessWindowFunction#clear(Context)}.
    	     */
    	    public abstract KeyedStateStore windowState();
-   
+
    	    /**
    	     * State accessor for per-key global state.
    	     */
@@ -558,22 +558,22 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
       * Returns the window that is being evaluated.
       */
     def window: W
-  
+
     /**
       * Returns the current processing time.
       */
     def currentProcessingTime: Long
-  
+
     /**
       * Returns the current event-time watermark.
       */
     def currentWatermark: Long
-  
+
     /**
       * State accessor for per-key and per-window state.
       */
     def windowState: KeyedStateStore
-  
+
     /**
       * State accessor for per-key global state.
       */
@@ -658,11 +658,11 @@ additional window meta information of the `ProcessWindowFunction`.
 <span class="label label-info">Note</span> You can also the legacy `WindowFunction` instead of
 `ProcessWindowFunction` for incremental window aggregation.
 
-#### Incremental Window Aggregation with FoldFunction
+#### Incremental Window Aggregation with ReduceFunction
 
-The following example shows how an incremental `FoldFunction` can be combined with
-a `ProcessWindowFunction` to extract the number of events in the window and return also
-the key and end time of the window.
+The following example shows how an incremental `ReduceFunction` can be combined with
+a `WindowFunction` to return the smallest event in a window along
+with the start time of the window.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -672,29 +672,26 @@ DataStream<SensorReading> input = ...;
 input
   .keyBy(<key selector>)
   .timeWindow(<window assigner>)
-  .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction())
+  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
 
 // Function definitions
 
-private static class MyFoldFunction
-    implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
+private static class MyReduceFunction implements ReduceFunction<SensorReading> {
 
-  public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
-      Integer cur = acc.getField(2);
-      acc.setField(2, cur + 1);
-      return acc;
+  public SensorReading reduce(SensorReading r1, SensorReading r2) {
+      return r1.value() > r2.value() ? r2 : r1;
   }
 }
 
 private static class MyProcessWindowFunction
-    implements ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
+    implements ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
 
-  public void process(String key,
+  public void apply(String key,
                     Context context,
-                    Iterable<Tuple3<String, Long, Integer>> counts,
-                    Collector<Tuple3<String, Long, Integer>> out) {
-    Integer count = counts.iterator().next().getField(2);
-    out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
+                    Iterable<SensorReading> minReadings,
+                    Collector<Tuple2<Long, SensorReading>> out) {
+      SensorReading min = minReadings.iterator().next();
+      out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min));
   }
 }
 
@@ -706,18 +703,17 @@ private static class MyProcessWindowFunction
 val input: DataStream[SensorReading] = ...
 
 input
- .keyBy(<key selector>)
- .timeWindow(<window assigner>)
- .fold (
-    ("", 0L, 0),
-    (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
+  .keyBy(<key selector>)
+  .timeWindow(<window assigner>)
+  .reduce(
+    (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
     ( key: String,
       window: TimeWindow,
-      counts: Iterable[(String, Long, Int)],
-      out: Collector[(String, Long, Int)] ) =>
+      minReadings: Iterable[SensorReading],
+      out: Collector[(Long, SensorReading)] ) =>
       {
-        val count = counts.iterator.next()
-        out.collect((key, window.getEnd, count._3))
+        val min = minReadings.iterator.next()
+        out.collect((window.getStart, min))
       }
   )
 
@@ -725,11 +721,11 @@ input
 </div>
 </div>
 
-#### Incremental Window Aggregation with ReduceFunction
+#### Incremental Window Aggregation with FoldFunction
 
-The following example shows how an incremental `ReduceFunction` can be combined with
-a `WindowFunction` to return the smallest event in a window along
-with the start time of the window.
+The following example shows how an incremental `FoldFunction` can be combined with
+a `ProcessWindowFunction` to extract the number of events in the window and return also
+the key and end time of the window.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -739,26 +735,29 @@ DataStream<SensorReading> input = ...;
 input
   .keyBy(<key selector>)
   .timeWindow(<window assigner>)
-  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
+  .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction())
 
 // Function definitions
 
-private static class MyReduceFunction implements ReduceFunction<SensorReading> {
+private static class MyFoldFunction
+    implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
 
-  public SensorReading reduce(SensorReading r1, SensorReading r2) {
-      return r1.value() > r2.value() ? r2 : r1;
+  public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
+      Integer cur = acc.getField(2);
+      acc.setField(2, cur + 1);
+      return acc;
   }
 }
 
 private static class MyProcessWindowFunction
-    implements ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
+    implements ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
 
-  public void apply(String key,
+  public void process(String key,
                     Context context,
-                    Iterable<SensorReading> minReadings,
-                    Collector<Tuple2<Long, SensorReading>> out) {
-      SensorReading min = minReadings.iterator().next();
-      out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min));
+                    Iterable<Tuple3<String, Long, Integer>> counts,
+                    Collector<Tuple3<String, Long, Integer>> out) {
+    Integer count = counts.iterator().next().getField(2);
+    out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
   }
 }
 
@@ -770,17 +769,18 @@ private static class MyProcessWindowFunction
 val input: DataStream[SensorReading] = ...
 
 input
-  .keyBy(<key selector>)
-  .timeWindow(<window assigner>)
-  .reduce(
-    (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
+ .keyBy(<key selector>)
+ .timeWindow(<window assigner>)
+ .fold (
+    ("", 0L, 0),
+    (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
     ( key: String,
       window: TimeWindow,
-      minReadings: Iterable[SensorReading],
-      out: Collector[(Long, SensorReading)] ) =>
+      counts: Iterable[(String, Long, Int)],
+      out: Collector[(String, Long, Int)] ) =>
       {
-        val min = minReadings.iterator.next()
-        out.collect((window.getStart, min))
+        val count = counts.iterator.next()
+        out.collect((key, window.getEnd, count._3))
       }
   )
 

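The reduce example in the diff above can be approximated by the following plain-Java sketch. It is a simulation, not Flink code, and rests on these assumptions: there is no Flink dependency, there is a single key, and windows are tumbling over non-negative timestamps. The `SensorReading` and `Result` records and the `run` helper are hypothetical stand-ins. The point it illustrates is that only one reduced value is kept per window, so the window function's `Iterable` holds exactly one element.

One detail in the Java snippet of this diff is worth checking. `MyProcessWindowFunction` is declared with `implements`, its method is named `apply`, and it calls `window.getStart()`. Since `ProcessWindowFunction` is an abstract class whose callback is `process(key, context, elements, out)`, a compiling version would most likely need `extends`, `process`, and `context.window().getStart()`, in the same way that the fold example reads `context.window().getEnd()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/**
 * Plain-Java simulation (no Flink) of a ReduceFunction combined with a window
 * function. All types and helpers here are hypothetical stand-ins.
 */
public class IncrementalReduceSketch {

    /** Hypothetical stand-in for the SensorReading type used in the docs. */
    record SensorReading(String id, long timestamp, double value) {}

    /** What the window function emits: window start plus the smallest reading. */
    record Result(long windowStart, SensorReading min) {}

    /** Same logic as MyReduceFunction in the docs: keep the smaller reading. */
    static final BinaryOperator<SensorReading> MIN =
        (r1, r2) -> r1.value() > r2.value() ? r2 : r1;

    /** Assumes non-negative timestamps and tumbling windows of windowSize ms. */
    static List<Result> run(List<SensorReading> readings, long windowSize) {
        // Incremental state: one reduced SensorReading per window start,
        // instead of a buffer holding every element of the window.
        Map<Long, SensorReading> state = new HashMap<>();
        for (SensorReading r : readings) {
            long start = r.timestamp() - (r.timestamp() % windowSize);
            state.merge(start, r, MIN);
        }
        List<Long> starts = new ArrayList<>(state.keySet());
        Collections.sort(starts);
        List<Result> out = new ArrayList<>();
        for (long start : starts) {
            // When the window "fires", the window function sees a one-element Iterable.
            Iterable<SensorReading> minReadings = List.of(state.get(start));
            SensorReading min = minReadings.iterator().next();
            out.add(new Result(start, min));
        }
        return out;
    }

    public static void main(String[] args) {
        List<SensorReading> readings = List.of(
            new SensorReading("s1", 1_000L, 5.0),
            new SensorReading("s1", 4_000L, 2.0),
            new SensorReading("s1", 12_000L, 7.0));
        run(readings, 10_000L).forEach(System.out::println);
    }
}
```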

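The fold example in the same diff follows a similar pattern. Its fold keeps a running count per key and window, and the window function then turns that single accumulator into (key, window end, count). The following plain-Java sketch simulates this. It is a simulation only, with no Flink dependency, and it assumes tumbling windows over non-negative timestamps. `Event`, `Emitted`, `COUNT`, and `run` are hypothetical names. The window end is taken as `start + windowSize`, which matches the exclusive end that `TimeWindow.getEnd()` reports for tumbling windows. The accumulator type (`Integer`) differs from the input type (`Event`), as it does with a `FoldFunction`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

/**
 * Plain-Java simulation (no Flink) of a fold combined with a window function.
 * All types and helpers here are hypothetical stand-ins.
 */
public class IncrementalFoldSketch {

    record Event(String key, long timestamp) {}

    /** Emitted record: key, exclusive window end, number of events. */
    record Emitted(String key, long windowEnd, int count) {}

    /** Counterpart of MyFoldFunction: ignore the element's payload, add one. */
    static final BiFunction<Integer, Event, Integer> COUNT = (acc, e) -> acc + 1;

    static List<Emitted> run(List<Event> events, long windowSize) {
        // key -> (window start -> running count); TreeMaps give deterministic output order.
        Map<String, Map<Long, Integer>> state = new TreeMap<>();
        for (Event e : events) {
            long start = e.timestamp() - (e.timestamp() % windowSize);
            Map<Long, Integer> perKey = state.computeIfAbsent(e.key(), k -> new TreeMap<>());
            // Start from the fold's initial value (0) and fold in one element at a time.
            perKey.put(start, COUNT.apply(perKey.getOrDefault(start, 0), e));
        }
        // "Fire" every window: the window function reads the single accumulator.
        List<Emitted> out = new ArrayList<>();
        state.forEach((key, windows) -> windows.forEach((start, count) ->
            out.add(new Emitted(key, start + windowSize, count))));
        return out;
    }
}
```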
[2/2] flink git commit: [hotfix] Change WindowFunction to ProcessWindowFunction in window doc

Posted by al...@apache.org.
[hotfix] Change WindowFunction to ProcessWindowFunction in window doc

Some places in the doc still referred to WindowFunction where
ProcessWindowFunction should now be used instead.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/404e37d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/404e37d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/404e37d2

Branch: refs/heads/master
Commit: 404e37d21c2a0ca0c9225e2be894f97500668fad
Parents: aa2f92c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Oct 16 13:31:26 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 16 13:31:26 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/windows.md | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

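The session-window paragraph touched by this commit says that the operator creates a window per arriving record and merges windows that are closer together than the gap. It also says that a `FoldFunction` cannot merge. The following plain-Java sketch illustrates both points under simplifying assumptions. There is no Flink dependency, all elements are known up front and sorted by timestamp, and touching or overlapping windows are merged. `Element`, `Session`, `SUM`, and `sessions` are hypothetical names.

When two windows merge, their partial aggregates are combined with the same reduce. This works because a reduce takes and returns the same type. A fold only knows how to add one input element to an accumulator of a different type, so it has no way to combine two accumulators. That is roughly why it cannot support merging.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.LongBinaryOperator;

/** Plain-Java simulation (no Flink) of session-window creation and merging. */
public class SessionMergeSketch {

    record Element(long timestamp, long value) {}

    /** A session window [start, end) with its pre-aggregated value. */
    record Session(long start, long end, long aggregate) {}

    /** Example reduce: summing values (same input and output type). */
    static final LongBinaryOperator SUM = Long::sum;

    static List<Session> sessions(List<Element> elements, long gap) {
        List<Element> sorted = new ArrayList<>(elements);
        sorted.sort(Comparator.comparingLong(Element::timestamp));
        List<Session> merged = new ArrayList<>();
        for (Element e : sorted) {
            // Every record first gets its own window [t, t + gap).
            Session fresh = new Session(e.timestamp(), e.timestamp() + gap, e.value());
            int last = merged.size() - 1;
            if (last >= 0 && fresh.start() <= merged.get(last).end()) {
                Session prev = merged.get(last);
                // Merge the windows and combine partial aggregates with the same reduce.
                merged.set(last, new Session(prev.start(), Math.max(prev.end(), fresh.end()),
                        SUM.applyAsLong(prev.aggregate(), fresh.aggregate())));
            } else {
                merged.add(fresh);
            }
        }
        return merged;
    }
}
```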

http://git-wip-us.apache.org/repos/asf/flink/blob/404e37d2/docs/dev/stream/operators/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index 79a2dd6..52c4b47 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -70,7 +70,7 @@ lateness of 1 min, Flink will create a new window for the interval between `12:0
 a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the `12:06`
 timestamp.
 
-In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and a function (`WindowFunction`, `ReduceFunction` or
+In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and a function (`ProcessWindowFunction`, `ReduceFunction` or
 `FoldFunction`) (see [Window Functions](#window-functions)) attached to it. The function will contain the computation to
 be applied to the contents of the window, while the `Trigger` specifies the conditions under which the window is
 considered ready for the function to be applied. A triggering policy might be something like "when the number of elements
@@ -333,7 +333,7 @@ they are  evaluated differently than tumbling and sliding windows. Internally, a
 creates a new window for each arriving record and merges windows together if their are closer to each other
 than the defined gap.
 In order to be mergeable, a session window operator requires a merging [Trigger](#triggers) and a merging
-[Window Function](#window-functions), such as `ReduceFunction` or `WindowFunction`
+[Window Function](#window-functions), such as `ReduceFunction` or `ProcessWindowFunction`
 (`FoldFunction` cannot merge.)
 
 ### Global Windows
@@ -378,16 +378,16 @@ to perform on each of these windows. This is the responsibility of the *window f
 elements of each (possibly keyed) window once the system determines that a window is ready for processing
 (see [triggers](#triggers) for how Flink determines when a window is ready).
 
-The window function can be one of `ReduceFunction`, `FoldFunction` or `WindowFunction`. The first
+The window function can be one of `ReduceFunction`, `FoldFunction` or `ProcessWindowFunction`. The first
 two can be executed more efficiently (see [State Size](#state size) section) because Flink can incrementally aggregate
-the elements for each window as they arrive. A `WindowFunction` gets an `Iterable` for all the elements contained in a
+the elements for each window as they arrive. A `ProcessWindowFunction` gets an `Iterable` for all the elements contained in a
 window and additional meta information about the window to which the elements belong.
 
-A windowed transformation with a `WindowFunction` cannot be executed as efficiently as the other
+A windowed transformation with a `ProcessWindowFunction` cannot be executed as efficiently as the other
 cases because Flink has to buffer *all* elements for a window internally before invoking the function.
-This can be mitigated by combining a `WindowFunction` with a `ReduceFunction` or `FoldFunction` to
+This can be mitigated by combining a `ProcessWindowFunction` with a `ReduceFunction` or `FoldFunction` to
 get both incremental aggregation of window elements and the additional window metadata that the
-`WindowFunction` receives. We will look at examples for each of these variants.
+`ProcessWindowFunction` receives. We will look at examples for each of these variants.
 
 ### ReduceFunction
 
@@ -629,7 +629,7 @@ input
 
 /* ... */
 
-class MyWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
+class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
 
   def apply(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = {
     var count = 0L
@@ -661,7 +661,7 @@ additional window meta information of the `ProcessWindowFunction`.
 #### Incremental Window Aggregation with ReduceFunction
 
 The following example shows how an incremental `ReduceFunction` can be combined with
-a `WindowFunction` to return the smallest event in a window along
+a `ProcessWindowFunction` to return the smallest event in a window along
 with the start time of the window.
 
 <div class="codetabs" markdown="1">
@@ -890,8 +890,8 @@ Two things to notice about the above methods are:
 ### Fire and Purge
 
 Once a trigger determines that a window is ready for processing, it fires, *i.e.*, it returns `FIRE` or `FIRE_AND_PURGE`. This is the signal for the window operator
-to emit the result of the current window. Given a window with a `WindowFunction`
-all elements are passed to the `WindowFunction` (possibly after passing them to an evictor).
+to emit the result of the current window. Given a window with a `ProcessWindowFunction`
+all elements are passed to the `ProcessWindowFunction` (possibly after passing them to an evictor).
 Windows with `ReduceFunction` of `FoldFunction` simply emit their eagerly aggregated result.
 
 When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
@@ -1162,7 +1162,7 @@ Windows can be defined over long periods of time (such as days, weeks, or months
 
 1. Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly window unless it is dropped late). In contrast, sliding windows create several of each element, as explained in the [Window Assigners](#window-assigners) section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.
 
-2. `FoldFunction` and `ReduceFunction` can significantly reduce the storage requirements, as they eagerly aggregate elements and store only one value per window. In contrast, just using a `WindowFunction` requires accumulating all elements.
+2. `FoldFunction` and `ReduceFunction` can significantly reduce the storage requirements, as they eagerly aggregate elements and store only one value per window. In contrast, just using a `ProcessWindowFunction` requires accumulating all elements.
 
 3. Using an `Evictor` prevents any pre-aggregation, as all the elements of a window have to be passed through the evictor before applying the computation (see [Evictors](#evictors)).
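The State Size list says that Flink keeps one copy of an element per window it belongs to, and it warns against a 1-day sliding window with a 1-second slide. The arithmetic can be checked with the small sketch below. It assumes that the window size is a positive multiple of the slide, in which case a sliding assigner places each element in size / slide windows. `WindowCopies` and its methods are hypothetical helpers, not Flink API.

```java
/** Back-of-the-envelope count of stored copies per element (hypothetical helper). */
public class WindowCopies {

    /** Tumbling windows: each (non-late) element belongs to exactly one window. */
    static long tumblingCopies() {
        return 1;
    }

    /** Sliding windows: each element belongs to size / slide windows. */
    static long slidingCopies(long sizeMillis, long slideMillis) {
        if (slideMillis <= 0 || sizeMillis % slideMillis != 0) {
            throw new IllegalArgumentException("sketch assumes size is a positive multiple of slide");
        }
        return sizeMillis / slideMillis;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        long second = 1000L;
        System.out.println(slidingCopies(day, second)); // prints 86400
    }
}
```

With a `ReduceFunction` or `FoldFunction`, each of those 86,400 windows still holds only one aggregated value per key rather than a buffer of every element.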