Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/10/30 05:46:16 UTC

[GitHub] taegeonum commented on a change in pull request #135: [NEMO-230] Emit collected data when receiving watermark in GroupByKeyAndWindowTransform

URL: https://github.com/apache/incubator-nemo/pull/135#discussion_r229182146
 
 

 ##########
 File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##########
 @@ -76,30 +77,59 @@ public GroupByKeyAndWindowDoFnTransform(final Map<TupleTag<?>, Coder<?>> outputC
    */
   @Override
   protected DoFn wrapDoFn(final DoFn doFn) {
-    timerInternalsFactory = new InMemoryTimerInternalsFactory();
+    this.stateAndTimerInternalsFactory = new StateAndTimerInternalsFactory();
     // This function performs group by key and window operation
     return
       GroupAlsoByWindowViaWindowSetNewDoFn.create(
         getWindowingStrategy(),
-        new InMemoryStateInternalsFactory(),
-        timerInternalsFactory,
+        stateAndTimerInternalsFactory.inMemoryStateInternalsFactory,
+        stateAndTimerInternalsFactory.inMemoryTimerInternalsFactory,
         getSideInputReader(),
         reduceFn,
         getOutputManager(),
         getMainOutputTag());
   }
 
+  /**
+   * It collects data for each key.
+   * The collected data are emitted at {@link GroupByKeyAndWindowDoFnTransform#onWatermark(Watermark)}
+   * @param element data element
+   */
   @Override
   public void onData(final WindowedValue<KV<K, InputT>> element) {
     final KV<K, InputT> kv = element.getValue();
-    keyToValues.putIfAbsent(kv.getKey(), new ArrayList());
+    keyToValues.putIfAbsent(kv.getKey(), new LinkedList<>());
     keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
   }
 
+  /**
+   * Process the collected data and trigger timers.
+   * @param watermark current watermark
+   * @param processingTime processing time
+   * @param synchronizedTime synchronized time
+   */
+  private void processElementsAndTriggerTimers(final Watermark watermark,
+                                               final Instant processingTime,
+                                               final Instant synchronizedTime) {
+    keyToValues.forEach((key, val) -> {
+      // for each key
+      // Process elements
+      if (!val.isEmpty()) {
+        final KeyedWorkItem<K, InputT> keyedWorkItem =
+          KeyedWorkItems.elementsWorkItem(key, val);
+        getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
 
 Review comment:
   The DoFnRunner interface requires a WindowedValue, but the ReduceFnRunner does not actually use this outer windowed value internally, so wrapping the work item with valueInGlobalWindow only satisfies the interface.
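
   To illustrate the point, here is a minimal, self-contained sketch. It does not use Beam: `Windowed`, `WorkItem`, `Runner`, and `GroupingRunner` are hypothetical stand-ins for WindowedValue, KeyedWorkItem, DoFnRunner, and the group-also-by-window logic. It assumes, as the comment describes, that only the windows of the buffered elements matter and that the window of the outer wrapper is never read.

```java
import java.util.ArrayList;
import java.util.List;

public class OuterWindowSketch {
  // Hypothetical stand-in for WindowedValue: a value tagged with a window name.
  static final class Windowed<T> {
    final T value;
    final String window;

    Windowed(final T value, final String window) {
      this.value = value;
      this.window = window;
    }
  }

  // Hypothetical stand-in for KeyedWorkItem: a key plus its buffered elements.
  static final class WorkItem<K, V> {
    final K key;
    final List<Windowed<V>> elements;

    WorkItem(final K key, final List<Windowed<V>> elements) {
      this.key = key;
      this.elements = elements;
    }
  }

  // Hypothetical stand-in for DoFnRunner: the signature forces a windowed wrapper.
  interface Runner<T> {
    void processElement(Windowed<T> element);
  }

  // Groups by the window of each inner element; the outer window is never read.
  static final class GroupingRunner<K, V> implements Runner<WorkItem<K, V>> {
    final List<String> output = new ArrayList<>();

    @Override
    public void processElement(final Windowed<WorkItem<K, V>> element) {
      final WorkItem<K, V> item = element.value;
      for (final Windowed<V> e : item.elements) {
        output.add(item.key + "@" + e.window + "=" + e.value);
      }
    }
  }

  // Runs the same work item wrapped in an arbitrary outer window.
  public static List<String> run(final String outerWindow) {
    final List<Windowed<Integer>> buffered = new ArrayList<>();
    buffered.add(new Windowed<>(1, "w1"));
    buffered.add(new Windowed<>(2, "w2"));
    final GroupingRunner<String, Integer> runner = new GroupingRunner<>();
    runner.processElement(new Windowed<>(new WorkItem<>("k", buffered), outerWindow));
    return runner.output;
  }

  public static void main(final String[] args) {
    System.out.println(run("GLOBAL"));        // prints [k@w1=1, k@w2=2]
    System.out.println(run("anything-else")); // prints [k@w1=1, k@w2=2]
  }
}
```

   Both calls produce the same output, which is why the choice of the global window for the wrapper has no effect in this sketch.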
