Posted to commits@beam.apache.org by ec...@apache.org on 2019/06/26 15:22:40 UTC

[beam] 02/07: Fix comment about schemas

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8d05d46f4b2b116dd3bed21566ede230720ccc0f
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jun 11 14:40:54 2019 +0200

    Fix comment about schemas
---
 .../batch/AggregatorCombinerGlobally.java          |  2 +-
 .../translation/helpers/ReduceFnRunnerHelpers.java | 77 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index a03c17e..2f8293b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -43,7 +43,7 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT>
   @Override
   public AccumT reduce(AccumT accumulator, InputT input) {
     // because of generic type InputT, spark cannot infer an input type.
-    // it would pass Integer as input if we had a Aggregator<Input, ..., ...>
+    // it would pass Integer as input if we had an Aggregator<Integer, ..., ...>
     // without the type inference it stores input in a GenericRowWithSchema
     Row row = (Row) input;
     InputT t = RowHelpers.extractObjectFromRow(row);
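For context on the comment fixed above: because the Aggregator is declared with the generic InputT, Spark cannot infer a concrete input type and hands the aggregator a GenericRowWithSchema, which then has to be unwrapped. Below is a minimal, hypothetical sketch of what such an extraction could look like. It assumes the element is stored as encoded bytes in the first column of the Row; the actual RowHelpers.extractObjectFromRow in this branch may be implemented differently.

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.spark.sql.Row;

/** Hypothetical sketch only, not the RowHelpers code from this branch. */
public class RowExtractionSketch {

  /** Assumes the row stores the element encoded as bytes in its first column. */
  public static <T> T extractFromRow(Row row, Coder<T> coder) throws Exception {
    // Spark wraps the untyped input in a GenericRowWithSchema; pull the raw bytes out
    // and decode them with the element's Beam coder.
    byte[] bytes = (byte[]) row.get(0);
    return CoderUtils.decodeFromByteArray(coder, bytes);
  }
}
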
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java
new file mode 100644
index 0000000..97a225e
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java
@@ -0,0 +1,77 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * Helpers to use {@link ReduceFnRunner}.
+ */
+public class ReduceFnRunnerHelpers<K, InputT, W extends BoundedWindow> {
+  public static <K, InputT, W extends BoundedWindow> void fireEligibleTimers(
+      InMemoryTimerInternals timerInternals,
+      ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner)
+      throws Exception {
+    List<TimerInternals.TimerData> timers = new ArrayList<>();
+    while (true) {
+      TimerInternals.TimerData timer;
+      while ((timer = timerInternals.removeNextEventTimer()) != null) {
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      if (timers.isEmpty()) {
+        break;
+      }
+      reduceFnRunner.onTimers(timers);
+      timers.clear();
+    }
+  }
+
+  /**
+   * {@link OutputWindowedValue} for ReduceFnRunner.
+   *
+   */
+  public static class GABWOutputWindowedValue<K, V>
+      implements OutputWindowedValue<KV<K, Iterable<V>>> {
+    private final List<WindowedValue<KV<K, Iterable<V>>>> outputs = new ArrayList<>();
+
+    @Override
+    public void outputWindowedValue(
+        KV<K, Iterable<V>> output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      outputs.add(WindowedValue.of(output, timestamp, windows, pane));
+    }
+
+    @Override
+    public <AdditionalOutputT> void outputWindowedValue(
+        TupleTag<AdditionalOutputT> tag,
+        AdditionalOutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs.");
+    }
+
+    public Iterable<WindowedValue<KV<K, Iterable<V>>>> getOutputs() {
+      return outputs;
+    }
+  }
+
+}
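
For readers following this branch, here is a hedged sketch of how the two helpers added above might be driven together for a single key. The class and method names (ReduceFnRunnerHelpersUsage, reduceForKey) are illustrative only, and constructing the ReduceFnRunner itself (key, windowing strategy, state and timer internals, the ReduceFn, and the GABWOutputWindowedValue it writes to) is assumed to have happened elsewhere.

import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.ReduceFnRunnerHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.ReduceFnRunnerHelpers.GABWOutputWindowedValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;

/** Illustrative driver only; not part of this commit. */
public class ReduceFnRunnerHelpersUsage {

  // Feeds already-windowed values for one key into an already-constructed ReduceFnRunner,
  // advances time to the end of input, fires the eligible timers and returns the results.
  // The outputter passed here is assumed to be the same GABWOutputWindowedValue the
  // ReduceFnRunner was built with.
  public static <K, V, W extends BoundedWindow>
      Iterable<WindowedValue<KV<K, Iterable<V>>>> reduceForKey(
          ReduceFnRunner<K, V, Iterable<V>, W> reduceFnRunner,
          InMemoryTimerInternals timerInternals,
          GABWOutputWindowedValue<K, V> outputter,
          Iterable<WindowedValue<V>> elementsForKey)
          throws Exception {
    // Process all input elements for this key.
    reduceFnRunner.processElements(elementsForKey);

    // Input is bounded in the batch translation, so advance the watermark and clocks to
    // the end of time to make every timer eligible.
    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
    timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);

    // Drain and fire all eligible timers, which emits the windows through the
    // GABWOutputWindowedValue collector.
    ReduceFnRunnerHelpers.fireEligibleTimers(timerInternals, reduceFnRunner);

    // Persist any remaining state and return the collected windowed outputs.
    reduceFnRunner.persist();
    return outputter.getOutputs();
  }
}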