Posted to commits@beam.apache.org by ec...@apache.org on 2019/06/26 15:22:38 UTC

[beam] branch spark-runner_structured-streaming updated (8cdd143 -> f0522dc)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from 8cdd143  Add TODO in Combine translations
     new 8a4372d  Update KVHelpers.extractKey() to deal with WindowedValue and update GBK and CPK
     new 8d05d46  Fix comment about schemas
     new bba08b4  Implement reduce part of CombineGlobally translation with windowing
     new 4602f83  Output data after combine
     new 8f8bae4  Implement merge accumulators part of CombineGlobally translation with windowing
     new 9a269ef  Fix encoder in combine call
     new f0522dc  [to remove] temporary: revert extractKey while combinePerKey is not done (so that it compiles)

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../batch/AggregatorCombinerGlobally.java          | 205 ++++++++++++++++++---
 .../batch/CombineGloballyTranslatorBatch.java      |  22 +--
 .../batch/CombinePerKeyTranslatorBatch.java        |  10 +-
 .../batch/GroupByKeyTranslatorBatch.java           |   4 +-
 .../translation/helpers/KVHelpers.java             |   5 +-
 .../translation/helpers/ReduceFnRunnerHelpers.java |  77 ++++++++
 6 files changed, 269 insertions(+), 54 deletions(-)
 create mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java


[beam] 03/07: Implement reduce part of CombineGlobally translation with windowing

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit bba08b4201a1dc53ae117d9bd495b117923e189d
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jun 13 11:23:52 2019 +0200

    Implement reduce part of CombineGlobally translation with windowing
---
 .../batch/AggregatorCombinerGlobally.java          | 165 +++++++++++++++++----
 .../batch/CombineGloballyTranslatorBatch.java      |  19 +--
 2 files changed, 144 insertions(+), 40 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index 2f8293b..0d13218 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -18,60 +18,173 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.expressions.Aggregator;
+import org.joda.time.Instant;
+import scala.Tuple2;
 
-/** An {@link Aggregator} for the Spark Batch Runner. */
-class AggregatorCombinerGlobally<InputT, AccumT, OutputT>
-    extends Aggregator<InputT, AccumT, OutputT> {
+/**
+ * An {@link Aggregator} for the Spark Batch Runner. It does not use ReduceFnRunner for window
+ * merging, because ReduceFnRunner is based on state, which requires a keyed collection. The
+ * accumulator is an {@code Iterable<WindowedValue<AccumT>>} because an {@code InputT} can be in
+ * multiple windows. So, when accumulating {@code InputT} values, we create one accumulator per
+ * input window.
+ */
+
+class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindow>
+    extends Aggregator<WindowedValue<InputT>, Iterable<WindowedValue<AccumT>>, WindowedValue<OutputT>> {
 
   private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+  private WindowingStrategy<InputT, W> windowingStrategy;
+  private TimestampCombiner timestampCombiner;
 
-  public AggregatorCombinerGlobally(Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+  public AggregatorCombinerGlobally(Combine.CombineFn<InputT, AccumT, OutputT> combineFn, WindowingStrategy<?, ?> windowingStrategy) {
     this.combineFn = combineFn;
+    this.windowingStrategy = (WindowingStrategy<InputT, W>) windowingStrategy;
+    this.timestampCombiner = windowingStrategy.getTimestampCombiner();
   }
 
-  @Override
-  public AccumT zero() {
-    return combineFn.createAccumulator();
+  @Override public Iterable<WindowedValue<AccumT>> zero() {
+    return new ArrayList<>();
   }
 
-  @Override
-  public AccumT reduce(AccumT accumulator, InputT input) {
-    // because of generic type InputT, spark cannot infer an input type.
-    // it would pass Integer as input if we had a Aggregator<Integer, ..., ...>
-    // without the type inference it stores input in a GenericRowWithSchema
-    Row row = (Row) input;
-    InputT t = RowHelpers.extractObjectFromRow(row);
-    return combineFn.addInput(accumulator, t);
+  @Override public Iterable<WindowedValue<AccumT>> reduce(Iterable<WindowedValue<AccumT>> accumulators,
+      WindowedValue<InputT> input) {
+
+    // concatenate the accumulators' windows with the input windows and merge them
+    Collection<W> inputWindows = (Collection<W>)input.getWindows();
+    Set<W> windows = collectAccumulatorsWindows(accumulators);
+    windows.addAll(inputWindows);
+    Map<W, W> windowToMergeResult = null;
+    try {
+      windowToMergeResult = mergeWindows(windowingStrategy, windows);
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to merge accumulators windows and input windows", e);
+    }
+
+    // iterate through the input windows and, for each, create or update the accumulator
+    // associated with its merged window by calling addInput.
+    // Maintain a map of the accumulators for use as output.
+    Map<W, Tuple2<AccumT, Instant>> mapState = new HashMap<>();
+    for (W inputWindow : inputWindows) {
+      W mergedWindow = windowToMergeResult.get(inputWindow);
+      mergedWindow = mergedWindow == null ? inputWindow : mergedWindow;
+      Tuple2<AccumT, Instant> accumAndInstant = mapState.get(mergedWindow);
+      // if there is no accumulator associated with this window yet, create one
+      if (accumAndInstant == null) {
+        AccumT accum = combineFn.addInput(combineFn.createAccumulator(), input.getValue());
+        Instant windowTimestamp =
+            timestampCombiner.assign(
+                mergedWindow, windowingStrategy.getWindowFn().getOutputTime(input.getTimestamp(), mergedWindow));
+        accumAndInstant = new Tuple2<>(accum, windowTimestamp);
+        mapState.put(mergedWindow, accumAndInstant);
+      } else {
+        AccumT updatedAccum =
+            combineFn.addInput(accumAndInstant._1, input.getValue());
+        Instant updatedTimestamp = timestampCombiner.combine(accumAndInstant._2, timestampCombiner
+            .assign(mergedWindow,
+                windowingStrategy.getWindowFn().getOutputTime(input.getTimestamp(), mergedWindow)));
+        accumAndInstant = new Tuple2<>(updatedAccum, updatedTimestamp);
+        // Tuple2 is immutable: store the updated pair back in the map,
+        // otherwise the update is lost
+        mapState.put(mergedWindow, accumAndInstant);
+      }
+    }
+    // output the accumulators map
+    List<WindowedValue<AccumT>> result = new ArrayList<>();
+    for (Map.Entry<W, Tuple2<AccumT, Instant>> entry : mapState.entrySet()) {
+      AccumT accumulator = entry.getValue()._1;
+      Instant windowTimestamp = entry.getValue()._2;
+      W window = entry.getKey();
+      result.add(WindowedValue.of(accumulator, windowTimestamp, window, PaneInfo.NO_FIRING));
+    }
+    return result;
   }
 
-  @Override
-  public AccumT merge(AccumT accumulator1, AccumT accumulator2) {
+  @Override public Iterable<WindowedValue<AccumT>> merge(
+      Iterable<WindowedValue<AccumT>> accumulators1,
+      Iterable<WindowedValue<AccumT>> accumulators2) {
+    // TODO
+    /*
     ArrayList<AccumT> accumulators = new ArrayList<>();
     accumulators.add(accumulator1);
     accumulators.add(accumulator2);
     return combineFn.mergeAccumulators(accumulators);
+*/
+    return null;
   }
 
-  @Override
-  public OutputT finish(AccumT reduction) {
-    return combineFn.extractOutput(reduction);
+  @Override public WindowedValue<OutputT> finish(Iterable<WindowedValue<AccumT>> reduction) {
+    // TODO
+    //    return combineFn.extractOutput(reduction);
+    return null;
   }
 
-  @Override
-  public Encoder<AccumT> bufferEncoder() {
+  @Override public Encoder<Iterable<WindowedValue<AccumT>>> bufferEncoder() {
     // TODO replace with accumulatorCoder if possible
     return EncoderHelpers.genericEncoder();
   }
 
-  @Override
-  public Encoder<OutputT> outputEncoder() {
+  @Override public Encoder<WindowedValue<OutputT>> outputEncoder() {
     // TODO replace with outputCoder if possible
     return EncoderHelpers.genericEncoder();
   }
+
+  private Set<W> collectAccumulatorsWindows(Iterable<WindowedValue<AccumT>> accumulators) {
+    Set<W> windows = new HashSet<>();
+    for (WindowedValue<?> accumulator : accumulators) {
+      // an accumulator has only one window associated with it.
+      W accumulatorWindow = (W) accumulator.getWindows().iterator().next();
+      windows.add(accumulatorWindow);
+    }
+    return windows;
+  }
+
+  private Map<W, W> mergeWindows(WindowingStrategy<InputT, W> windowingStrategy, Set<W> windows)
+      throws Exception {
+    WindowFn<InputT, W> windowFn = windowingStrategy.getWindowFn();
+
+    if (windowFn.isNonMerging()) {
+      // Return an empty map, indicating that no windows were merged.
+      return Collections.emptyMap();
+    }
+
+    Map<W, W> windowToMergeResult = new HashMap<>();
+    windowFn.mergeWindows(new MergeContextImpl(windowFn, windows, windowToMergeResult));
+    return windowToMergeResult;
+  }
+
+  private class MergeContextImpl extends WindowFn<InputT, W>.MergeContext {
+
+    private Set<W> windows;
+    private Map<W, W> windowToMergeResult;
+
+    MergeContextImpl(WindowFn<InputT, W> windowFn, Set<W> windows, Map<W, W> windowToMergeResult) {
+      windowFn.super();
+      this.windows = windows;
+      this.windowToMergeResult = windowToMergeResult;
+    }
+
+    @Override
+    public Collection<W> windows() {
+      return windows;
+    }
+
+    @Override
+    public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
+      for (W w : toBeMerged) {
+        windowToMergeResult.put(w, mergeResult);
+      }
+    }
+  }
+
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
index 53651cf..f18572b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -50,25 +51,15 @@ class CombineGloballyTranslatorBatch<InputT, AccumT, OutputT>
     @SuppressWarnings("unchecked")
     final Combine.CombineFn<InputT, AccumT, OutputT> combineFn =
         (Combine.CombineFn<InputT, AccumT, OutputT>) combineTransform.getFn();
-
+    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
     Dataset<WindowedValue<InputT>> inputDataset = context.getDataset(input);
 
-    //TODO merge windows instead of doing unwindow/window to comply with beam model
-    Dataset<InputT> unWindowedDataset =
-        inputDataset.map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.genericEncoder());
-
     Dataset<Row> combinedRowDataset =
-        unWindowedDataset.agg(new AggregatorCombinerGlobally<>(combineFn).toColumn());
-
-    Dataset<OutputT> combinedDataset =
-        combinedRowDataset.map(
-            RowHelpers.extractObjectFromRowMapFunction(), EncoderHelpers.genericEncoder());
+        inputDataset.agg(new AggregatorCombinerGlobally<>(combineFn, windowingStrategy).toColumn());
 
-    // Window the result into global window.
     Dataset<WindowedValue<OutputT>> outputDataset =
-        combinedDataset.map(
-            WindowingHelpers.windowMapFunction(), EncoderHelpers.windowedValueEncoder());
-
+        combinedRowDataset.map(
+            RowHelpers.extractObjectFromRowMapFunction(), EncoderHelpers.windowedValueEncoder());
     context.putDataset(output, outputDataset);
   }
 }
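
Note: the mergeWindows()/MergeContextImpl machinery added above can be
exercised standalone. A minimal sketch, assuming Beam's Sessions WindowFn;
the class name and window bounds below are illustrative, not part of this
commit:

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class SessionMergeSketch {
  public static void main(String[] args) throws Exception {
    WindowFn<Object, IntervalWindow> windowFn =
        Sessions.withGapDuration(Duration.standardMinutes(10));

    // two overlapping session windows and one disjoint window
    Set<IntervalWindow> windows =
        new HashSet<>(
            Arrays.asList(
                new IntervalWindow(new Instant(0), new Instant(600_000)),
                new IntervalWindow(new Instant(300_000), new Instant(900_000)),
                new IntervalWindow(new Instant(3_600_000), new Instant(4_200_000))));

    // same pattern as MergeContextImpl: record each original window -> merged window
    Map<IntervalWindow, IntervalWindow> windowToMergeResult = new HashMap<>();
    windowFn.mergeWindows(
        windowFn.new MergeContext() {
          @Override
          public Collection<IntervalWindow> windows() {
            return windows;
          }

          @Override
          public void merge(Collection<IntervalWindow> toBeMerged, IntervalWindow mergeResult) {
            for (IntervalWindow w : toBeMerged) {
              windowToMergeResult.put(w, mergeResult);
            }
          }
        });

    // the two overlapping windows map to the merged window [0, 900000);
    // the disjoint window is absent from the map, meaning it was not merged
    System.out.println(windowToMergeResult);
  }
}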


[beam] 07/07: [to remove] temporary: revert extractKey while combinePerKey is not done (so that it compiles)

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f0522dc67be2ba2a6e2e1a4dcbfbc2343455ac0a
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Jun 17 13:59:13 2019 +0200

    [to remove] temporary: revert extractKey while combinePerKey is not done (so that it compiles)
---
 .../translation/batch/CombinePerKeyTranslatorBatch.java            | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index c55219b..1c35301 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.KeyValueGroupedDataset;
 import scala.Tuple2;
@@ -52,8 +53,10 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
 
     Dataset<WindowedValue<KV<K, InputT>>> inputDataset = context.getDataset(input);
 
-    KeyValueGroupedDataset<K, WindowedValue<KV<K, InputT>>> groupedDataset =
-        inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
+    Dataset<KV<K, InputT>> unwindowedDataset = inputDataset
+        .map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.kvEncoder());
+    KeyValueGroupedDataset<K, KV<K, InputT>> groupedDataset =
+        unwindowedDataset.groupByKey(
+            (MapFunction<KV<K, InputT>, K>) kv -> kv.getKey(), EncoderHelpers.genericEncoder());
 
     Dataset<Tuple2<K, OutputT>> combinedDataset =
         groupedDataset.agg(


[beam] 01/07: Update KVHelpers.extractKey() to deal with WindowedValue and update GBK and CPK

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8a4372de576c7ac122b4383a941d406d3fb3b1ff
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed May 29 11:29:42 2019 +0200

    Update KVHelpers.extractKey() to deal with WindowedValue and update GBK and CPK
---
 .../translation/batch/CombinePerKeyTranslatorBatch.java          | 9 ++-------
 .../translation/batch/GroupByKeyTranslatorBatch.java             | 4 ++--
 .../spark/structuredstreaming/translation/helpers/KVHelpers.java | 5 +++--
 3 files changed, 7 insertions(+), 11 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 3d0ee8b..c55219b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -52,13 +52,8 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
 
     Dataset<WindowedValue<KV<K, InputT>>> inputDataset = context.getDataset(input);
 
-    //TODO merge windows instead of doing unwindow/window to comply with beam model
-    Dataset<KV<K, InputT>> keyedDataset =
-        inputDataset.map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.kvEncoder());
-
-    // TODO change extractKey impl to deal with WindowedVAlue and use it in GBK
-    KeyValueGroupedDataset<K, KV<K, InputT>> groupedDataset =
-        keyedDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
+    KeyValueGroupedDataset<K, WindowedValue<KV<K, InputT>>> groupedDataset =
+        inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
 
     Dataset<Tuple2<K, OutputT>> combinedDataset =
         groupedDataset.agg(
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index b2b4441..148e643 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -28,6 +28,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -56,8 +57,7 @@ class GroupByKeyTranslatorBatch<K, V>
 
     //group by key only
     KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly = input
-        .groupByKey((MapFunction<WindowedValue<KV<K, V>>, K>) wv -> wv.getValue().getKey(),
-            EncoderHelpers.genericEncoder());
+        .groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
 
     // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable
     Dataset<KV<K, Iterable<WindowedValue<V>>>> materialized = groupByKeyOnly.mapGroups(
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
index 3bea466..a53a93f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
 
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.spark.api.java.function.MapFunction;
 import scala.Tuple2;
@@ -25,8 +26,8 @@ import scala.Tuple2;
 public final class KVHelpers {
 
   /** A Spark {@link MapFunction} for extracting the key out of a {@link KV} for GBK for example. */
-  public static <K, V> MapFunction<KV<K, V>, K> extractKey() {
-    return (MapFunction<KV<K, V>, K>) KV::getKey;
+  public static <K, V> MapFunction<WindowedValue<KV<K, V>>, K> extractKey() {
+    return (MapFunction<WindowedValue<KV<K, V>>, K>) wv -> wv.getValue().getKey();
   }
 
   /** A Spark {@link MapFunction} for making a KV out of a {@link scala.Tuple2}. */
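
Note: a minimal usage sketch of the updated extractKey(); the values are
hypothetical, not part of this commit:

import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.api.java.function.MapFunction;

public class ExtractKeySketch {
  public static void main(String[] args) throws Exception {
    // the returned function now takes a WindowedValue and reaches into its KV payload
    MapFunction<WindowedValue<KV<String, Integer>>, String> extractKey = KVHelpers.extractKey();
    WindowedValue<KV<String, Integer>> element =
        WindowedValue.valueInGlobalWindow(KV.of("user-1", 42));
    System.out.println(extractKey.call(element)); // prints "user-1"
  }
}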


[beam] 02/07: Fix comment about schemas

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8d05d46f4b2b116dd3bed21566ede230720ccc0f
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jun 11 14:40:54 2019 +0200

    Fix comment about schemas
---
 .../batch/AggregatorCombinerGlobally.java          |  2 +-
 .../translation/helpers/ReduceFnRunnerHelpers.java | 77 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index a03c17e..2f8293b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -43,7 +43,7 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT>
   @Override
   public AccumT reduce(AccumT accumulator, InputT input) {
     // because of generic type InputT, spark cannot infer an input type.
-    // it would pass Integer as input if we had a Aggregator<Input, ..., ...>
+    // it would pass Integer as input if we had a Aggregator<Integer, ..., ...>
     // without the type inference it stores input in a GenericRowWithSchema
     Row row = (Row) input;
     InputT t = RowHelpers.extractObjectFromRow(row);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java
new file mode 100644
index 0000000..97a225e
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java
@@ -0,0 +1,77 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * Helpers to use {@link ReduceFnRunner}.
+ */
+public class ReduceFnRunnerHelpers<K, InputT, W extends BoundedWindow> {
+  public static <K, InputT, W extends BoundedWindow> void fireEligibleTimers(
+      InMemoryTimerInternals timerInternals,
+      ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner)
+      throws Exception {
+    List<TimerInternals.TimerData> timers = new ArrayList<>();
+    while (true) {
+      TimerInternals.TimerData timer;
+      while ((timer = timerInternals.removeNextEventTimer()) != null) {
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      if (timers.isEmpty()) {
+        break;
+      }
+      reduceFnRunner.onTimers(timers);
+      timers.clear();
+    }
+  }
+
+  /** An {@link OutputWindowedValue} for {@link ReduceFnRunner} that buffers the emitted values. */
+  public static class GABWOutputWindowedValue<K, V>
+      implements OutputWindowedValue<KV<K, Iterable<V>>> {
+    private final List<WindowedValue<KV<K, Iterable<V>>>> outputs = new ArrayList<>();
+
+    @Override
+    public void outputWindowedValue(
+        KV<K, Iterable<V>> output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      outputs.add(WindowedValue.of(output, timestamp, windows, pane));
+    }
+
+    @Override
+    public <AdditionalOutputT> void outputWindowedValue(
+        TupleTag<AdditionalOutputT> tag,
+        AdditionalOutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs.");
+    }
+
+    public Iterable<WindowedValue<KV<K, Iterable<V>>>> getOutputs() {
+      return outputs;
+    }
+  }
+
+}
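
Note: GABWOutputWindowedValue simply buffers whatever ReduceFnRunner emits so
the caller can iterate the windowed outputs afterwards. A small sketch with
hypothetical values:

import java.util.Arrays;
import java.util.Collections;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.ReduceFnRunnerHelpers.GABWOutputWindowedValue;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

public class OutputBufferSketch {
  public static void main(String[] args) {
    GABWOutputWindowedValue<String, Integer> out = new GABWOutputWindowedValue<>();
    // in real use, ReduceFnRunner invokes this callback when a window fires
    out.outputWindowedValue(
        KV.of("key", (Iterable<Integer>) Arrays.asList(1, 2, 3)),
        new Instant(0),
        Collections.singletonList(GlobalWindow.INSTANCE),
        PaneInfo.NO_FIRING);
    for (WindowedValue<KV<String, Iterable<Integer>>> wv : out.getOutputs()) {
      System.out.println(wv.getValue()); // KV{key, [1, 2, 3]}
    }
  }
}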


[beam] 04/07: Output data after combine

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4602f8367d9d6c38574010fdff8b044cedf4d286
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jun 14 14:50:45 2019 +0200

    Output data after combine
---
 .../translation/batch/AggregatorCombinerGlobally.java      | 14 ++++++++------
 .../translation/batch/CombineGloballyTranslatorBatch.java  |  7 +++++--
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index 0d13218..6996165 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -44,7 +44,7 @@ import scala.Tuple2;
  */
 
 class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindow>
-    extends Aggregator<WindowedValue<InputT>, Iterable<WindowedValue<AccumT>>, WindowedValue<OutputT>> {
+    extends Aggregator<WindowedValue<InputT>, Iterable<WindowedValue<AccumT>>, Iterable<WindowedValue<OutputT>>> {
 
   private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
   private WindowingStrategy<InputT, W> windowingStrategy;
@@ -123,10 +123,12 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
     return null;
   }
 
-  @Override public WindowedValue<OutputT> finish(Iterable<WindowedValue<AccumT>> reduction) {
-    // TODO
-    //    return combineFn.extractOutput(reduction);
-    return null;
+  @Override public Iterable<WindowedValue<OutputT>> finish(Iterable<WindowedValue<AccumT>> reduction) {
+    List<WindowedValue<OutputT>> result = new ArrayList<>();
+    for (WindowedValue<AccumT> windowedValue: reduction) {
+      result.add(windowedValue.withValue(combineFn.extractOutput(windowedValue.getValue())));
+    }
+    return result;
   }
 
   @Override public Encoder<Iterable<WindowedValue<AccumT>>> bufferEncoder() {
@@ -134,7 +136,7 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
     return EncoderHelpers.genericEncoder();
   }
 
-  @Override public Encoder<WindowedValue<OutputT>> outputEncoder() {
+  @Override public Encoder<Iterable<WindowedValue<OutputT>>> outputEncoder() {
     // TODO replace with outputCoder if possible
     return EncoderHelpers.genericEncoder();
   }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
index f18572b..fb9e1dd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
@@ -21,12 +21,12 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -57,9 +57,12 @@ class CombineGloballyTranslatorBatch<InputT, AccumT, OutputT>
     Dataset<Row> combinedRowDataset =
         inputDataset.agg(new AggregatorCombinerGlobally<>(combineFn, windowingStrategy).toColumn());
 
-    Dataset<WindowedValue<OutputT>> outputDataset =
+    Dataset<Iterable<WindowedValue<OutputT>>> accumulatedDataset =
         combinedRowDataset.map(
             RowHelpers.extractObjectFromRowMapFunction(), EncoderHelpers.windowedValueEncoder());
+    Dataset<WindowedValue<OutputT>> outputDataset = accumulatedDataset.flatMap(
+        (FlatMapFunction<Iterable<WindowedValue<OutputT>>, WindowedValue<OutputT>>)
+            windowedValues -> windowedValues.iterator(), EncoderHelpers.windowedValueEncoder());
     context.putDataset(output, outputDataset);
   }
 }
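
Note: the new finish() is just extractOutput() mapped over each windowed
accumulator; withValue() preserves the timestamp, window and pane. A tiny
illustration with made-up values, assuming a sum whose accumulator is the
running total:

import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.joda.time.Instant;

public class FinishSketch {
  public static void main(String[] args) {
    // a per-window accumulator: the running sum 6 in window [0, 1000)
    WindowedValue<Integer> accumulator =
        WindowedValue.of(
            6,
            new Instant(500),
            new IntervalWindow(new Instant(0), new Instant(1_000)),
            PaneInfo.NO_FIRING);
    // for a sum CombineFn, extractOutput is the identity on the accumulator;
    // the window metadata travels along unchanged
    WindowedValue<Integer> output = accumulator.withValue(accumulator.getValue());
    System.out.println(output);
  }
}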


[beam] 05/07: Implement merge accumulators part of CombineGlobally translation with windowing

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8f8bae47ff8cb75c6337fe1be325e4ea64518dc8
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Sun Jun 16 12:12:09 2019 +0200

    Implement merge accumulators part of CombineGlobally translation with windowing
---
 .../batch/AggregatorCombinerGlobally.java          | 48 ++++++++++++++++++----
 1 file changed, 39 insertions(+), 9 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index 6996165..d3ad62c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -33,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.expressions.Aggregator;
 import org.joda.time.Instant;
@@ -67,7 +69,7 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
     Collection<W> inputWindows = (Collection<W>)input.getWindows();
     Set<W> windows = collectAccumulatorsWindows(accumulators);
     windows.addAll(inputWindows);
-    Map<W, W> windowToMergeResult = null;
+    Map<W, W> windowToMergeResult;
     try {
       windowToMergeResult = mergeWindows(windowingStrategy, windows);
     } catch (Exception e) {
@@ -113,14 +115,42 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
   @Override public Iterable<WindowedValue<AccumT>> merge(
       Iterable<WindowedValue<AccumT>> accumulators1,
       Iterable<WindowedValue<AccumT>> accumulators2) {
-    // TODO
-    /*
-    ArrayList<AccumT> accumulators = new ArrayList<>();
-    accumulators.add(accumulator1);
-    accumulators.add(accumulator2);
-    return combineFn.mergeAccumulators(accumulators);
-*/
-    return null;
+
+    // merge the windows of all the accumulators
+    Iterable<WindowedValue<AccumT>> accumulators = Iterables.concat(accumulators1, accumulators2);
+    Set<W> accumulatorsWindows = collectAccumulatorsWindows(accumulators);
+    Map<W, W> windowToMergeResult;
+    try {
+      windowToMergeResult = mergeWindows(windowingStrategy, accumulatorsWindows);
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to merge accumulators windows", e);
+    }
+
+    // group accumulators by their merged window
+    Map<W, List<WindowedValue<AccumT>>> mergedWindowToAccumulators = new HashMap<>();
+    for (WindowedValue<AccumT> accumulator : accumulators) {
+      // each accumulator has only one window
+      W accumulatorWindow = (W) accumulator.getWindows().iterator().next();
+      W mergedWindowForAccumulator = windowToMergeResult.get(accumulatorWindow);
+      // a window that was not merged maps to itself, as in reduce()
+      mergedWindowForAccumulator =
+          (mergedWindowForAccumulator == null) ? accumulatorWindow : mergedWindowForAccumulator;
+      // use a mutable list: Collections.singletonList would throw on the later add()
+      mergedWindowToAccumulators
+          .computeIfAbsent(mergedWindowForAccumulator, w -> new ArrayList<>())
+          .add(accumulator);
+    }
+    // merge the accumulators for each mergedWindow
+    List<WindowedValue<AccumT>> result = new ArrayList<>();
+    for (Map.Entry<W, List<WindowedValue<AccumT>>> entry : mergedWindowToAccumulators.entrySet()) {
+      W mergedWindow = entry.getKey();
+      List<WindowedValue<AccumT>> accumulatorsForMergedWindow = entry.getValue();
+      List<AccumT> accumsToMerge =
+          accumulatorsForMergedWindow.stream().map(WindowedValue::getValue).collect(Collectors.toList());
+      List<Instant> timestampsToCombine =
+          accumulatorsForMergedWindow.stream().map(WindowedValue::getTimestamp).collect(Collectors.toList());
+      result.add(
+          WindowedValue.of(
+              combineFn.mergeAccumulators(accumsToMerge),
+              timestampCombiner.combine(timestampsToCombine),
+              mergedWindow,
+              PaneInfo.NO_FIRING));
+    }
+    return result;
   }
 
   @Override public Iterable<WindowedValue<OutputT>> finish(Iterable<WindowedValue<AccumT>> reduction) {
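
Note: the per-merged-window grouping above feeds plain accumulator values into
CombineFn.mergeAccumulators. A minimal sketch with a hypothetical sum
CombineFn (not one used by any pipeline in this branch):

import java.util.Arrays;
import org.apache.beam.sdk.transforms.Combine;

public class MergeAccumulatorsSketch {
  // a minimal sum CombineFn whose accumulator is the running sum itself
  static class SumFn extends Combine.CombineFn<Integer, Integer, Integer> {
    @Override public Integer createAccumulator() { return 0; }
    @Override public Integer addInput(Integer acc, Integer input) { return acc + input; }
    @Override public Integer mergeAccumulators(Iterable<Integer> accs) {
      int sum = 0;
      for (int acc : accs) {
        sum += acc;
      }
      return sum;
    }
    @Override public Integer extractOutput(Integer acc) { return acc; }
  }

  public static void main(String[] args) {
    SumFn fn = new SumFn();
    // two accumulators whose session windows merged into the same window
    // collapse into a single accumulator for that window
    System.out.println(fn.mergeAccumulators(Arrays.asList(3, 4))); // prints 7
  }
}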


[beam] 06/07: Fix encoder in combine call

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 9a269eff5fe67e4542643c64e392fec003fb28f7
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Jun 17 11:53:37 2019 +0200

    Fix encoder in combine call
---
 .../translation/batch/CombineGloballyTranslatorBatch.java               | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
index fb9e1dd..f29b2c5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
@@ -59,7 +59,7 @@ class CombineGloballyTranslatorBatch<InputT, AccumT, OutputT>
 
     Dataset<Iterable<WindowedValue<OutputT>>> accumulatedDataset =
         combinedRowDataset.map(
-            RowHelpers.extractObjectFromRowMapFunction(), EncoderHelpers.windowedValueEncoder());
+            RowHelpers.extractObjectFromRowMapFunction(), EncoderHelpers.genericEncoder());
     Dataset<WindowedValue<OutputT>> outputDataset = accumulatedDataset.flatMap(
         (FlatMapFunction<Iterable<WindowedValue<OutputT>>, WindowedValue<OutputT>>)
             windowedValues -> windowedValues.iterator(), EncoderHelpers.windowedValueEncoder());