You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/05/28 07:33:20 UTC

[beam] 05/06: re-enable reduceFnRunner timers for output

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c9474558092611116f308d7b824ee0bb5c11ecb7
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue May 28 09:31:28 2019 +0200

    re-enable reduceFnRunner timers for output
---
 .../batch/functions/GroupAlsoByWindowViaOutputBufferFn.java       | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
index 2fb08f5..cc65716 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.TriggerTranslation;
@@ -100,21 +101,18 @@ public class GroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWind
     // Finish any pending windows by advancing the input watermark to infinity.
     timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
-    // not supported yet
-/*
     // Finally, advance the processing time to infinity to fire any timers.
     timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
     timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     fireEligibleTimers(timerInternals, reduceFnRunner);
-*/
 
     reduceFnRunner.persist();
 
     return outputter.getOutputs().iterator();
   }
 
-/*  private void fireEligibleTimers(
+  private void fireEligibleTimers(
       InMemoryTimerInternals timerInternals,
       ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner)
       throws Exception {
@@ -136,7 +134,7 @@ public class GroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWind
       reduceFnRunner.onTimers(timers);
       timers.clear();
     }
-  }*/
+  }
 
   private static class GABWOutputWindowedValue<K, V>
       implements OutputWindowedValue<KV<K, Iterable<V>>> {