Posted to issues@flink.apache.org by danielblazevski <gi...@git.apache.org> on 2016/08/13 23:53:32 UTC

[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

GitHub user danielblazevski opened a pull request:

    https://github.com/apache/flink/pull/2368

    [FLINK-3899] Document window processing with Reduce/FoldFunction + WindowFunction

    Added examples of using Reduce/Fold + WindowFunction to the docs. The examples were tested and run; I used the `SensorReading` class from dataArtisans/blogposts.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/danielblazevski/flink FLINK-3899

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2368.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2368
    
----
commit 3b7466b161eecfdb7b862dafce0ed4fac2248ecd
Author: danielblazevski <da...@gmail.com>
Date:   2016-08-13T22:57:18Z

    added example of fold/window and reduce/window

commit 9dfc44f2867ca1d5f92beb625f264c0d92632784
Author: danielblazevski <da...@gmail.com>
Date:   2016-08-13T22:58:19Z

    added example of fold/window and reduce/window

commit 44553dbfc97e04e3f36eaec9a2e24a08e886e26f
Author: danielblazevski <da...@gmail.com>
Date:   2016-08-13T23:07:29Z

    removed throws exception

commit c121b51a08a0d4c73eb703fa68e17b90b8d5fc17
Author: danielblazevski <da...@gmail.com>
Date:   2016-08-13T23:11:30Z

    changed String var to  in window fns

commit 747e989001651a31194a9d6f3071f57518c0c03c
Author: danielblazevski <da...@gmail.com>
Date:   2016-08-13T23:14:48Z

    changed start time to end time typo

commit 9669866440b55740eecc16250b6cfb48636e7ad7
Author: danielblazevski <da...@gmail.com>
Date:   2016-08-13T23:16:29Z

    changed  to  typo

commit d72eb69806f89b9089731edbb8c7d7d630e4877e
Author: danielblazevski <da...@gmail.com>
Date:   2016-08-13T23:41:09Z

    added scala example

commit a8ccb49928edf9b0b37acb67439519f82e412988
Author: danielblazevski <da...@gmail.com>
Date:   2016-08-13T23:46:19Z

    format: added empty line after each class def for java

commit 071b706f0b9d52d4232c20f68a87aa5514a0a40d
Author: danielblazevski <da...@gmail.com>
Date:   2016-08-13T23:49:02Z

    changed names of iterable inputs

commit a55305f9b57d812b81b6cd6849b658f5c17344f8
Author: danielblazevski <da...@gmail.com>
Date:   2016-08-13T23:50:53Z

    changed MIN_VALUE to MinValue for Scala

commit d404568663d375acfa2e9e48b79f775f4e2bb37c
Author: danielblazevski <da...@gmail.com>
Date:   2016-08-13T23:52:41Z

    removed semicolon in Scala example

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by danielblazevski <gi...@git.apache.org>.
Github user danielblazevski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r76120063
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
    +            out.collect(timestamps.iterator().next());
    --- End diff --
    
    Ah, lol, `Int` vs `Integer`...



[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r76008397
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
    +            out.collect(timestamps.iterator().next());
    --- End diff --
    
    The example looks good, thanks! Two minor suggestions: 1) I think we can omit setting key and time in the `FoldFunction`, 2) the `WindowFunction` could fetch the count in a separate variable. This would make the `out.collect` line a bit shorter.
    
    Regarding the type restriction: You discovered a bug that we would like to fix but can't until Flink 2.0.0 because we promoted the interface to be `@Public` and the API is stable in Flink 1.0 releases. IMO it makes sense to point out this accidental restriction in the documentation.




[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r75894336
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
    +            out.collect(timestamps.iterator().next());
    --- End diff --
    
    Not sure if this is a good example. The same result could be achieved by a single `FoldFunction`. How about the `FoldFunction` counts the number of records and the `WindowFunction` emits a `Tuple3` of key, end time of window, and count?
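    For reference, a minimal plain-Java model of the suggested pattern (no Flink dependency; it only simulates one window firing). It assumes a hypothetical `SensorReading` stand-in with `sensorId`, `timestamp`, and `value` fields, and a hypothetical `KeyEndCount` class in place of `Tuple3`. The fold runs once per record, so a single counter is kept per window; the window step then receives that one value together with the window end, which a `FoldFunction` on its own never sees.

    ```java
    import java.util.Collections;
    import java.util.List;

    // Plain-Java model of "FoldFunction + WindowFunction" (not Flink's API).
    final class FoldThenWindowSketch {

        // Hypothetical stand-in for the SensorReading class used in the PR.
        static final class SensorReading {
            final String sensorId;
            final long timestamp;
            final double value;

            SensorReading(String sensorId, long timestamp, double value) {
                this.sensorId = sensorId;
                this.timestamp = timestamp;
                this.value = value;
            }
        }

        // Hypothetical output record mirroring Tuple3<key, window end, count>.
        static final class KeyEndCount {
            final String key;
            final long windowEnd;
            final int count;

            KeyEndCount(String key, long windowEnd, int count) {
                this.key = key;
                this.windowEnd = windowEnd;
                this.count = count;
            }
        }

        // Incremental step: called once per arriving record; it only counts the record.
        static int fold(int acc, SensorReading r) {
            return acc + 1;
        }

        // Window step: sees the single pre-aggregated count plus window metadata.
        static KeyEndCount windowFunction(String key, long windowEnd, Iterable<Integer> counts) {
            int count = counts.iterator().next();
            return new KeyEndCount(key, windowEnd, count);
        }

        // Simulates one firing of a keyed window whose contents are given.
        static KeyEndCount fire(String key, long windowEnd, List<SensorReading> contents) {
            int acc = 0; // initial value of the fold
            for (SensorReading r : contents) {
                acc = fold(acc, r);
            }
            return windowFunction(key, windowEnd, Collections.singletonList(acc));
        }
    }
    ```

    With three readings for key `s1` and a window end of 60000, `fire` returns key `s1`, end 60000 and count 3.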



[GitHub] flink issue #2368: [FLINK-3899] Document window processing with Reduce/FoldF...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2368
  
    Thanks for the fast update @danielblazevski. 
    I think you accidentally added `tools/FlinkCodyStyle.xml` to your commit.



[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2368



[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by danielblazevski <gi...@git.apache.org>.
Github user danielblazevski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r75973261
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
    +            out.collect(timestamps.iterator().next());
    --- End diff --
    
    @fhueske does this look OK for this case? If so, I'll finish up by adding the Reduce example and both corresponding Scala examples.
    ```java
    // for folding incremental computation
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .apply(new Tuple3<String, Long, Integer>("", 0L, 0), new MyFoldFunction(), new MyWindowFunction());
    
    /* ... */
    
    private static class MyFoldFunction
            implements FoldFunction<SensorReading, Tuple3<String, Long, Integer>> {
    
        @Override
        public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
            // only the count (f2) changes; key and time are left for the WindowFunction to fill in
            return new Tuple3<String, Long, Integer>(acc.f0, acc.f1, acc.f2 + 1);
        }
    }
    
    private static class MyWindowFunction implements WindowFunction<Tuple3<String, Long, Integer>,
            Tuple3<String, Long, Integer>, String, TimeWindow> {
    
        @Override
        public void apply(String key,
                          TimeWindow window,
                          Iterable<Tuple3<String, Long, Integer>> counts,
                          Collector<Tuple3<String, Long, Integer>> out) {
            // incremental folding leaves exactly one accumulator per window
            out.collect(new Tuple3<String, Long, Integer>(key, window.getEnd(),
                    counts.iterator().next().f2));
        }
    }
    ```
    
    I found that I had to have the `FoldFunction` include `Tuple3` in its signature, since the `WindowFunction` must be of the form `WindowFunction<ACC, ACC, K, W>` according to [`FoldApplyWindowFunction`](https://github.com/apache/flink/blob/b8299bf92d8e3dbe140dd89602699394019b783d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java).
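    A minimal plain-Java model of that constraint, assuming hypothetical `Fold` and `WindowFn` interfaces that only mimic the shape described above (they are not Flink's classes). Because `WindowFn` emits values of the accumulator type `ACC`, the accumulator must already carry the key and window-end slots that only the window step can fill in, which is why the `Tuple3` is folded with placeholder values.

    ```java
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Plain-Java model of the ACC == OUT restriction (hypothetical interfaces, not Flink's).
    final class AccEqualsOutSketch {

        interface Fold<T, ACC> {
            ACC fold(ACC acc, T value);
        }

        // Output type is ACC, mirroring WindowFunction<ACC, ACC, K, W>.
        interface WindowFn<ACC, K> {
            void apply(K key, long windowEnd, Iterable<ACC> input, List<ACC> out);
        }

        // Mirrors Tuple3<String, Long, Integer>: key, window end, count.
        static final class Acc {
            final String key;
            final long end;
            final int count;

            Acc(String key, long end, int count) {
                this.key = key;
                this.end = end;
                this.count = count;
            }
        }

        // Folds all elements, then hands the single accumulator to the window step.
        static <T, ACC, K> List<ACC> fire(K key, long windowEnd, List<T> elements,
                                          ACC initial, Fold<T, ACC> fold, WindowFn<ACC, K> windowFn) {
            ACC acc = initial;
            for (T e : elements) {
                acc = fold.fold(acc, e);
            }
            List<ACC> out = new ArrayList<>();
            windowFn.apply(key, windowEnd, Collections.singletonList(acc), out);
            return out;
        }

        // Counts elements; key and end stay placeholders until the window step runs.
        static <T> List<Acc> countPerWindow(String key, long windowEnd, List<T> elements) {
            return fire(key, windowEnd, elements, new Acc("", 0L, 0),
                    (acc, value) -> new Acc(acc.key, acc.end, acc.count + 1),
                    (k, end, input, out) -> out.add(new Acc(k, end, input.iterator().next().count)));
        }
    }
    ```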



[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r75895039
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
    +            out.collect(timestamps.iterator().next());
    +        }
    +}
     
     // for reducing incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
         .apply(new MyReduceFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myReduceFunction implements ReduceFunction<SensorReading> {
    +
    +    public SensorReading reduce(SensorReading s1, SensorReading s2)  {
    +        return s1;
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<SensorReading, SensorReading, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<SensorReading> readings, Collector<SensorReading> out) {
    +        out.collect(readings.iterator().next());
    --- End diff --
    
    Not sure if this is a good example. The same result could be achieved by a single `ReduceFunction`. How about the `ReduceFunction` searches for a `SensorReading` with a minimum value and the `WindowFunction` emits a `Tuple2` of start time of window and minimum `SensorReading`?
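    For illustration, a minimal plain-Java model of this suggestion (no Flink dependency), assuming a hypothetical `SensorReading` stand-in with a numeric `value` field and a hypothetical `StartAndMin` class in place of `Tuple2`. The reduce step keeps only the smaller reading as records arrive, so one element is retained per window, and the window step pairs that reading with the window start.

    ```java
    import java.util.Collections;
    import java.util.List;

    // Plain-Java model of "ReduceFunction + WindowFunction" (not Flink's API).
    final class ReduceThenWindowSketch {

        // Hypothetical stand-in for the SensorReading class used in the PR.
        static final class SensorReading {
            final String sensorId;
            final long timestamp;
            final double value;

            SensorReading(String sensorId, long timestamp, double value) {
                this.sensorId = sensorId;
                this.timestamp = timestamp;
                this.value = value;
            }
        }

        // Hypothetical output record mirroring Tuple2<window start, minimum reading>.
        static final class StartAndMin {
            final long windowStart;
            final SensorReading min;

            StartAndMin(long windowStart, SensorReading min) {
                this.windowStart = windowStart;
                this.min = min;
            }
        }

        // Incremental step: keeps the reading with the smaller value (the first one on ties).
        static SensorReading reduce(SensorReading r1, SensorReading r2) {
            return r1.value <= r2.value ? r1 : r2;
        }

        // Window step: receives the single reduced reading plus the window start.
        static StartAndMin windowFunction(long windowStart, Iterable<SensorReading> minReadings) {
            return new StartAndMin(windowStart, minReadings.iterator().next());
        }

        // Simulates one firing of a non-empty keyed window.
        static StartAndMin fire(long windowStart, List<SensorReading> contents) {
            SensorReading current = contents.get(0);
            for (SensorReading r : contents.subList(1, contents.size())) {
                current = reduce(current, r);
            }
            return windowFunction(windowStart, Collections.singletonList(current));
        }
    }
    ```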



[GitHub] flink issue #2368: [FLINK-3899] Document window processing with Reduce/FoldF...

Posted by danielblazevski <gi...@git.apache.org>.
Github user danielblazevski commented on the issue:

    https://github.com/apache/flink/pull/2368
  
    Sorry, indeed; I added that while playing around with the open PR that adds `FlinkCodyStyle.xml`.



[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r75894020
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
    +            out.collect(timestamps.iterator().next());
    +        }
    +}
     
     // for reducing incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
         .apply(new MyReduceFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myReduceFunction implements ReduceFunction<SensorReading> {
    --- End diff --
    
    Please remove double space & fix lowercase class name.



[GitHub] flink issue #2368: [FLINK-3899] Document window processing with Reduce/FoldF...

Posted by danielblazevski <gi...@git.apache.org>.
Github user danielblazevski commented on the issue:

    https://github.com/apache/flink/pull/2368
  
    Pushed the Scala example



[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by danielblazevski <gi...@git.apache.org>.
Github user danielblazevski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r75958831
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
    +            out.collect(timestamps.iterator().next());
    --- End diff --
    
    Can the latter also be done using a single `FoldFunction`? E.g.:
    
    ```java
    public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc,
                                              SensorReading s) {
        Integer cur = acc.f2;
    
        return new Tuple3<String, Long, Integer>(
                s.sensorId(),
                Math.max(acc.f1, s.timestamp()),
                cur + 1);
    }
    ```
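    One way to see the difference, sketched in plain Java under stated assumptions (a tumbling event-time window with zero offset and non-negative millisecond timestamps; the class and method names are hypothetical): a `FoldFunction` only sees the accumulator and the current record, so the best it can derive is the largest timestamp seen, whereas the window's end boundary, which `TimeWindow.getEnd()` hands to a `WindowFunction`, is exclusive and therefore strictly greater than any timestamp in the window.

    ```java
    // Contrasts a fold-only "max timestamp" with the end of the enclosing tumbling window.
    final class FoldOnlyVsWindowEndSketch {

        // Assumes zero offset and non-negative timestamps: start = ts - (ts % size).
        static long tumblingWindowStart(long timestamp, long size) {
            return timestamp - (timestamp % size);
        }

        // The end boundary is exclusive: the window covers [start, start + size).
        static long tumblingWindowEnd(long timestamp, long size) {
            return tumblingWindowStart(timestamp, size) + size;
        }

        // All a fold can compute without window metadata: the largest timestamp seen.
        static long foldMaxTimestamp(long[] timestamps) {
            long acc = Long.MIN_VALUE; // initial value, as in the Long.MIN_VALUE fold quoted in the diff
            for (long ts : timestamps) {
                acc = Math.max(acc, ts);
            }
            return acc;
        }
    }
    ```

    With readings at 1000, 5000 and 59000 ms in a one-minute window, the fold yields 59000 while the window end is 60000, so the two values coincide only by accident.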



[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r76115561
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
    +            out.collect(timestamps.iterator().next());
    --- End diff --
    
    Thanks for the update. 
    The following Scala code does not show an error in my IntelliJ:
    
    ```scala
    val readings: DataStream[SensorReading] = ???
    
    val result: DataStream[(String, Long, Int)] = readings
          .keyBy(_.sensorId)
          .timeWindow(Time.minutes(1), Time.seconds(10))
          .apply(
            ("", 0L, 0),
            (acc: (String, Long, Int), r: SensorReading) => {
              ("", 0L, acc._3 + 1)
            },
            (k: String, w: TimeWindow, cnts: Iterable[(String, Long, Int)], out: Collector[(String, Long, Int)]) => {
              val cnt = cnts.iterator.next()
              out.collect((k, w.getEnd, cnt._3))
            }
          )
    ```
    
    Thanks, Fabian



[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by danielblazevski <gi...@git.apache.org>.
Github user danielblazevski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r75906795
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
    --- End diff --
    
    Done



[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by danielblazevski <gi...@git.apache.org>.
Github user danielblazevski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r75906787
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
    +            out.collect(timestamps.iterator().next());
    +        }
    +}
     
     // for reducing incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
         .apply(new MyReduceFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myReduceFunction implements ReduceFunction<SensorReading> {
    --- End diff --
    
    Done
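The eager aggregation described in the diff (the `ReduceFunction` keeps only a single element in the window) can be illustrated with a small plain-Java simulation. This is not Flink code. The `Reading` class and every method name are hypothetical, and the reduce step (keeping the hotter reading) is an arbitrary example, not necessarily what the PR's reduce function does. Each incoming element is merged immediately into the one stored value for its key and window, so the per-window state stays constant-size, and the window step then sees exactly one pre-reduced element plus the window bounds.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java simulation of eager (incremental) reduction per key and tumbling window.
// Not Flink code: Reading and every method here are hypothetical illustrations.
class EagerReduceSketch {

    // Hypothetical stand-in for the SensorReading class used in the PR.
    static final class Reading {
        final String sensorId;
        final long timestamp;      // event time in milliseconds, assumed non-negative
        final double temperature;

        Reading(String sensorId, long timestamp, double temperature) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }
    }

    // Reduce step: merges two readings into one of the same type.
    // Keeping the hotter reading is an arbitrary example choice.
    static Reading reduce(Reading a, Reading b) {
        return b.temperature > a.temperature ? b : a;
    }

    // sensorId -> (window start -> the single reduced Reading for that window).
    // Each element is reduced as soon as it arrives, so only one Reading is stored per window.
    static Map<String, Map<Long, Reading>> aggregate(Reading[] input, long windowSize) {
        Map<String, Map<Long, Reading>> state = new LinkedHashMap<>();
        for (Reading r : input) {
            long start = r.timestamp - (r.timestamp % windowSize);
            state.computeIfAbsent(r.sensorId, k -> new LinkedHashMap<>())
                 .merge(start, r, EagerReduceSketch::reduce);
        }
        return state;
    }

    // Window step: gets the one pre-reduced element plus the window bounds
    // (end is exclusive), similar in spirit to what a WindowFunction receives.
    static String windowResult(String sensorId, long start, long windowSize, Reading reduced) {
        return sensorId + " [" + start + ", " + (start + windowSize) + "): max "
                + reduced.temperature + " at t=" + reduced.timestamp;
    }

    public static void main(String[] args) {
        long size = 10_000L;
        Reading[] input = {
            new Reading("s1", 1_000L, 20.0),
            new Reading("s1", 2_000L, 25.0),
            new Reading("s2", 1_500L, 30.0),
            new Reading("s1", 11_000L, 18.0)
        };
        for (Map.Entry<String, Map<Long, Reading>> perKey : aggregate(input, size).entrySet()) {
            for (Map.Entry<Long, Reading> perWindow : perKey.getValue().entrySet()) {
                System.out.println(windowResult(perKey.getKey(), perWindow.getKey(), size, perWindow.getValue()));
            }
        }
    }
}
```

The design point is that the reduce function's input and output types are the same, which is what allows it to run on every arrival instead of buffering all elements until the window fires.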


[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r75893984
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
    --- End diff --
    
    Please remove double space & fix lowercase class name.


[GitHub] flink issue #2368: [FLINK-3899] Document window processing with Reduce/FoldF...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2368
  
    Hi @danielblazevski, thanks for the PR. I added some comments and suggestions.
    
    Thank you, Fabian


[GitHub] flink issue #2368: [FLINK-3899] Document window processing with Reduce/FoldF...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2368
  
    Thanks for the update! The example looks good. I'll do some minor reformatting and merge the PR. 


[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by danielblazevski <gi...@git.apache.org>.
Github user danielblazevski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r76102678
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
    +            out.collect(timestamps.iterator().next());
    --- End diff --
    
    Made the changes in the Java version and added the comments. I had some issues with the Scala version (see screenshots): the only real change is the type of the `Iterable` in the `WindowFunction`, which IntelliJ said has to be `SensorReading`, which is not ideal. I removed the Scala version for now.
    
    Screenshot 2016-08-24 13:28:12: https://cloud.githubusercontent.com/assets/10012612/17940967/4a025738-69ff-11e6-9354-31c2ead563d4.png
    
    Screenshot 2016-08-24 13:27:51: https://cloud.githubusercontent.com/assets/10012612/17940972/4dd5db28-69ff-11e6-8c6a-11b1900796ad.png


[GitHub] flink issue #2368: [FLINK-3899] Document window processing with Reduce/FoldF...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2368
  
    No worries :-)


[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by danielblazevski <gi...@git.apache.org>.
Github user danielblazevski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r75961552
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
    +            out.collect(timestamps.iterator().next());
    --- End diff --
    
    Oh gotcha, different notion of end time.  Makes sense now.


[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r75959873
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
    +            out.collect(timestamps.iterator().next());
    --- End diff --
    
    `Math.max(acc.getField(1), s.timestamp())` will give you the largest timestamp among the elements added to the window, i.e., the timestamp of the last element if elements arrive in event-time order. By end time of the window I meant the timestamp after which an element would be placed in the next window (for an hourly tumbling window, this would be `00:59:59.999` for the window from `00:00:00.000` to `00:59:59.999`). This information is only available in a `WindowFunction`, through the `TimeWindow` object.
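To illustrate the distinction, here is a plain-Java simulation of an hourly tumbling window. It is not Flink code, and all class and method names are hypothetical. The incremental step folds each element's timestamp into a running maximum, like the fold in the diff, while the window bounds only become available in the per-window step, just as the `TimeWindow` is only handed to a `WindowFunction`. The sketch assumes non-negative millisecond timestamps and a zero window offset, and follows the convention of Flink's `TimeWindow`, whose `getEnd()` is exclusive and whose `maxTimestamp()` is `getEnd() - 1` (`00:59:59.999` for the first hourly window).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java simulation (not Flink code) contrasting the largest element timestamp,
// which an incremental fold can compute, with the window's own end, which only the
// per-window step knows. All names are hypothetical.
class WindowEndSketch {

    static final long HOUR_MS = 60L * 60L * 1000L;

    // Hypothetical stand-in for Flink's TimeWindow: end is exclusive.
    static final class SimpleTimeWindow {
        final long start;
        final long end;

        SimpleTimeWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Last timestamp that still belongs to this window.
        long maxTimestamp() {
            return end - 1;
        }
    }

    static final class Result {
        final long maxElementTimestamp; // produced by the incremental fold
        final long windowMaxTimestamp;  // taken from the window metadata

        Result(long maxElementTimestamp, long windowMaxTimestamp) {
            this.maxElementTimestamp = maxElementTimestamp;
            this.windowMaxTimestamp = windowMaxTimestamp;
        }
    }

    // Incremental step, analogous to the fold in the diff: keep a running maximum.
    static long fold(long acc, long timestamp) {
        return Math.max(acc, timestamp);
    }

    // Tumbling window start for a non-negative timestamp with zero offset.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    static Map<Long, Result> process(long[] timestamps, long size) {
        // One Long accumulator per window, seeded with Long.MIN_VALUE as in the diff.
        Map<Long, Long> acc = new LinkedHashMap<>();
        for (long ts : timestamps) {
            long start = windowStart(ts, size);
            acc.put(start, fold(acc.getOrDefault(start, Long.MIN_VALUE), ts));
        }
        // Window step: combine the folded value with the window bounds.
        Map<Long, Result> out = new LinkedHashMap<>();
        for (Map.Entry<Long, Long> e : acc.entrySet()) {
            SimpleTimeWindow window = new SimpleTimeWindow(e.getKey(), e.getKey() + size);
            out.put(e.getKey(), new Result(e.getValue(), window.maxTimestamp()));
        }
        return out;
    }

    public static void main(String[] args) {
        // 00:00, 00:15, 00:42 and 01:01 expressed in milliseconds.
        long[] timestamps = {0L, 900_000L, 2_520_000L, 3_660_000L};
        for (Map.Entry<Long, Result> e : process(timestamps, HOUR_MS).entrySet()) {
            System.out.println("window " + e.getKey()
                    + ": max element ts " + e.getValue().maxElementTimestamp
                    + ", window max ts " + e.getValue().windowMaxTimestamp);
        }
    }
}
```

For the sample input, the first window's folded maximum is the 00:42 reading (2,520,000 ms), while its window maximum timestamp is 3,599,999 ms (`00:59:59.999`); the two only coincide if an element happens to carry exactly that timestamp.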

