You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/05/28 07:33:20 UTC
[beam] 05/06: re-enable reduceFnRunner timers for output
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit c9474558092611116f308d7b824ee0bb5c11ecb7
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue May 28 09:31:28 2019 +0200
re-enable reduceFnRunner timers for output
---
.../batch/functions/GroupAlsoByWindowViaOutputBufferFn.java | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
index 2fb08f5..cc65716 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.TriggerTranslation;
@@ -100,21 +101,18 @@ public class GroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWind
// Finish any pending windows by advancing the input watermark to infinity.
timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
- // not supported yet
-/*
// Finally, advance the processing time to infinity to fire any timers.
timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
fireEligibleTimers(timerInternals, reduceFnRunner);
-*/
reduceFnRunner.persist();
return outputter.getOutputs().iterator();
}
-/* private void fireEligibleTimers(
+ private void fireEligibleTimers(
InMemoryTimerInternals timerInternals,
ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner)
throws Exception {
@@ -136,7 +134,7 @@ public class GroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWind
reduceFnRunner.onTimers(timers);
timers.clear();
}
- }*/
+ }
private static class GABWOutputWindowedValue<K, V>
implements OutputWindowedValue<KV<K, Iterable<V>>> {