Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/08 08:24:00 UTC

[jira] [Work logged] (BEAM-4643) Allow to check early panes of a window

     [ https://issues.apache.org/jira/browse/BEAM-4643?focusedWorklogId=152158&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152158 ]

ASF GitHub Bot logged work on BEAM-4643:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Oct/18 08:23
            Start Date: 08/Oct/18 08:23
    Worklog Time Spent: 10m 
      Work Description: lhauspie commented on issue #5811: [BEAM-4643] Allow to check early panes of a window
URL: https://github.com/apache/beam/pull/5811#issuecomment-427754675
 
 
   retest this please

----------------------------------------------------------------
This is an automated message from the Apache Git Service.


Issue Time Tracking
-------------------

            Worklog Id:     (was: 152158)
            Time Spent: 1h 20m  (was: 1h 10m)
    Remaining Estimate: 22h 40m  (was: 22h 50m)

> Allow to check early panes of a window
> --------------------------------------
>
>                 Key: BEAM-4643
>                 URL: https://issues.apache.org/jira/browse/BEAM-4643
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, testing
>    Affects Versions: 2.5.0
>            Reporter: Logan HAUSPIE
>            Assignee: Logan HAUSPIE
>            Priority: Minor
>   Original Estimate: 24h
>          Time Spent: 1h 20m
>  Remaining Estimate: 22h 40m
>
> What I would like to do is:
> {{PAssert.that(teamScores)}}
> {{    .inEarlyPanes(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2)) // Window fired 2 times early: (black, 1) + (black, 1)}}
> {{    .inOnTimePane(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 2)) // Then fired again when the watermark reached the end of the window (no additional data)}}
> {{    .inFinalPane(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 10)); // And then fired again on receiving a late element (black, 8)}}
> NB: intervalWindow(05, 20) returns an IntervalWindow from minute 5 to minute 20.
>  
> The workaround I found is to filter the PCollection so that it keeps only the elements emitted in panes with a given timing (here EARLY), using this method:
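> {{import org.apache.beam.sdk.transforms.DoFn;}}
> {{import org.apache.beam.sdk.transforms.Filter;}}
> {{import org.apache.beam.sdk.transforms.ParDo;}}
> {{import org.apache.beam.sdk.transforms.windowing.BoundedWindow;}}
> {{import org.apache.beam.sdk.transforms.windowing.PaneInfo;}}
> {{import org.apache.beam.sdk.values.PCollection;}}
> {{import org.apache.beam.sdk.values.ValueInSingleWindow;}}
>
> {{// Keeps only the elements of `values` that were emitted in panes with the given timing.}}
> {{public static <T> PCollection<T> filter(PCollection<T> values, PaneInfo.Timing timing) {}}
> {{  return values}}
> {{      // Capture each element's window and pane info so the pane timing can be filtered on.}}
> {{      .apply("Wrap into ValueInSingleWindow for filtering",}}
> {{          ParDo.of(}}
> {{              new DoFn<T, ValueInSingleWindow<T>>() {}}
> {{                @ProcessElement}}
> {{                public void processElement(ProcessContext c, BoundedWindow window) {}}
> {{                  c.output(ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()));}}
> {{                }}}
> {{              }))}}
> {{      // The wrapper's coder cannot be inferred from the generic T, so set it explicitly.}}
> {{      .setCoder(}}
> {{          ValueInSingleWindow.Coder.of(}}
> {{              values.getCoder(), values.getWindowingStrategy().getWindowFn().windowCoder()))}}
> {{      // Typing the lambda parameter explicitly helps Java infer the Filter's element type.}}
> {{      .apply("Keep " + timing + " panes",}}
> {{          Filter.by((ValueInSingleWindow<T> a) -> a.getPane().getTiming() == timing))}}
> {{      // Unwrap; output() keeps the element's timestamp, window and pane.}}
> {{      .apply("Unwrap from ValueInSingleWindow for filtering",}}
> {{          ParDo.of(}}
> {{              new DoFn<ValueInSingleWindow<T>, T>() {}}
> {{                @ProcessElement}}
> {{                public void processElement(ProcessContext c) {}}
> {{                  c.output(c.element().getValue());}}
> {{                }}}
> {{              }))}}
> {{      // T cannot be inferred here either, so reuse the input coder.}}
> {{      .setCoder(values.getCoder());}}
> {{ }}}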
>  
> And then check all panes of the window on the filtered collection:
> {{PAssert.that(filter(teamScores, PaneInfo.Timing.EARLY))}}
> {{    .inWindow(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2));}}
> {{PAssert.that(teamScores)}}
> {{    .inOnTimePane(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 2))}}
> {{    .inFinalPane(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 10));}}
>  
> But it's a bit overkill.
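To make the expected values in the requested assertion concrete, here is a self-contained plain-Java model of the scenario. It is not Beam code: `PaneTimingModel`, its `Timing` enum (named after the values of `PaneInfo.Timing`), `fireAccumulating`, `inPanes` and `containsInAnyOrder` are hypothetical stand-ins. It assumes an accumulating trigger, which is what the sequence 1, 2, 2, 10 implies, and it treats the LATE pane as the final one. That holds in this scenario but not in general, since Beam's `inFinalPane` selects the pane marked as last rather than filtering on timing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model (NOT Beam's API) of the BEAM-4643 scenario: one key in one window,
 * an accumulating trigger, and an assertion restricted to panes of one timing.
 */
final class PaneTimingModel {
  // Hypothetical enum mirroring the names of Beam's PaneInfo.Timing values.
  enum Timing { EARLY, ON_TIME, LATE }

  // One fired pane: its timing and the "key=sum" value it emitted.
  static final class Pane {
    final Timing timing;
    final String value;

    Pane(Timing timing, String value) {
      this.timing = timing;
      this.value = value;
    }
  }

  // Accumulating mode: each firing re-emits the running total, not only the new data.
  static List<Pane> fireAccumulating(String key, Timing[] timings, int[] increments) {
    List<Pane> panes = new ArrayList<>();
    int sum = 0;
    for (int i = 0; i < timings.length; i++) {
      sum += increments[i];
      panes.add(new Pane(timings[i], key + "=" + sum));
    }
    return panes;
  }

  // Single pass that keeps only the values of panes with the requested timing.
  static List<String> inPanes(List<Pane> panes, Timing timing) {
    List<String> selected = new ArrayList<>();
    for (Pane p : panes) {
      if (p.timing == timing) {
        selected.add(p.value);
      }
    }
    return selected;
  }

  // Order-insensitive but duplicate-sensitive comparison, in the spirit of containsInAnyOrder.
  static boolean containsInAnyOrder(List<String> actual, String... expected) {
    return counts(actual).equals(counts(Arrays.asList(expected)));
  }

  private static Map<String, Integer> counts(List<String> items) {
    Map<String, Integer> counts = new HashMap<>();
    for (String item : items) {
      counts.merge(item, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    // Two early firings of (black, 1) each, an on-time firing with no new data,
    // then a late element (black, 8).
    Timing[] timings = {Timing.EARLY, Timing.EARLY, Timing.ON_TIME, Timing.LATE};
    int[] increments = {1, 1, 0, 8};
    List<Pane> panes = fireAccumulating("black", timings, increments);

    System.out.println(containsInAnyOrder(inPanes(panes, Timing.EARLY), "black=1", "black=2"));
    System.out.println(containsInAnyOrder(inPanes(panes, Timing.ON_TIME), "black=2"));
    System.out.println(containsInAnyOrder(inPanes(panes, Timing.LATE), "black=10"));
  }
}
```

Running `main` prints `true` three times. Selecting by timing is a single pass over the pane records here. In Beam itself the same idea should be expressible as one DoFn that checks `c.pane().getTiming()` before calling `c.output(...)`, which avoids the wrap, filter and unwrap steps of the workaround.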



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)