You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/06/26 15:22:40 UTC
[beam] 02/07: Fix comment about schemas
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8d05d46f4b2b116dd3bed21566ede230720ccc0f
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jun 11 14:40:54 2019 +0200
Fix comment about schemas
---
.../batch/AggregatorCombinerGlobally.java | 2 +-
.../translation/helpers/ReduceFnRunnerHelpers.java | 77 ++++++++++++++++++++++
2 files changed, 78 insertions(+), 1 deletion(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index a03c17e..2f8293b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -43,7 +43,7 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT>
@Override
public AccumT reduce(AccumT accumulator, InputT input) {
// because of generic type InputT, spark cannot infer an input type.
- // it would pass Integer as input if we had a Aggregator<Input, ..., ...>
+ // it would pass Integer as input if we had an Aggregator<Integer, ..., ...>
// without the type inference it stores input in a GenericRowWithSchema
Row row = (Row) input;
InputT t = RowHelpers.extractObjectFromRow(row);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java
new file mode 100644
index 0000000..97a225e
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java
@@ -0,0 +1,77 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * Helpers to use {@link ReduceFnRunner}.
+ *
+ * <p>Contains a static method, {@code fireEligibleTimers}, which repeatedly removes timers from an
+ * {@link InMemoryTimerInternals} and passes them to a {@link ReduceFnRunner}, and a nested
+ * {@link GABWOutputWindowedValue} class that buffers the main-output values it receives in a list.
+ *
+ * <p>{@code fireEligibleTimers} works in passes: each pass removes timers via
+ * {@code removeNextEventTimer()}, {@code removeNextProcessingTimer()} and
+ * {@code removeNextSynchronizedProcessingTimer()} (in that order) until each returns {@code null},
+ * then hands the collected batch to {@code reduceFnRunner.onTimers(...)}. It returns after the
+ * first pass that removes no timer at all. Which timers those {@code removeNext*} calls hand back
+ * is decided by {@link InMemoryTimerInternals}, not by this class.
+ *
+ * <p>NOTE(review): the class-level type parameters are not referenced by any member visible here;
+ * both members are static and declare their own type parameters.
+ */
+public class ReduceFnRunnerHelpers<K, InputT, W extends BoundedWindow> {
+ public static <K, InputT, W extends BoundedWindow> void fireEligibleTimers(
+ InMemoryTimerInternals timerInternals,
+ ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner)
+ throws Exception {
+ List<TimerInternals.TimerData> timers = new ArrayList<>(); // one list, reused for every pass
+ while (true) { // one iteration = one pass over the three timer queues
+ TimerInternals.TimerData timer;
+ while ((timer = timerInternals.removeNextEventTimer()) != null) {
+ timers.add(timer);
+ }
+ while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+ timers.add(timer);
+ }
+ while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+ timers.add(timer);
+ }
+ if (timers.isEmpty()) {
+ break; // this pass removed nothing, so there is nothing left to deliver
+ }
+ reduceFnRunner.onTimers(timers); // presumably may set new timers, hence the outer loop - TODO confirm
+ timers.clear(); // empty the batch before the next pass
+ }
+ }
+
+ /**
+ * {@link OutputWindowedValue} for ReduceFnRunner.
+ *
+ * <p>Every main-output call is wrapped with {@code WindowedValue.of(...)} and appended to an
+ * internal list, which {@code getOutputs()} returns directly (no copy is made). The tagged-output
+ * overload always throws {@link UnsupportedOperationException}.
+ */
+ public static class GABWOutputWindowedValue<K, V>
+ implements OutputWindowedValue<KV<K, Iterable<V>>> {
+ private final List<WindowedValue<KV<K, Iterable<V>>>> outputs = new ArrayList<>();
+
+ @Override
+ public void outputWindowedValue(
+ KV<K, Iterable<V>> output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputs.add(WindowedValue.of(output, timestamp, windows, pane)); // buffered in call order
+ }
+
+ @Override
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs.");
+ }
+
+ public Iterable<WindowedValue<KV<K, Iterable<V>>>> getOutputs() {
+ return outputs; // the live backing list, not a snapshot
+ }
+ }
+
+}