You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/06/26 15:22:43 UTC

[beam] 05/07: Implement merge accumulators part of CombineGlobally translation with windowing

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8f8bae47ff8cb75c6337fe1be325e4ea64518dc8
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Sun Jun 16 12:12:09 2019 +0200

    Implement merge accumulators part of CombineGlobally translation with windowing
---
 .../batch/AggregatorCombinerGlobally.java          | 48 ++++++++++++++++++----
 1 file changed, 39 insertions(+), 9 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index 6996165..d3ad62c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -33,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.expressions.Aggregator;
 import org.joda.time.Instant;
@@ -67,7 +69,7 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
     Collection<W> inputWindows = (Collection<W>)input.getWindows();
     Set<W> windows = collectAccumulatorsWindows(accumulators);
     windows.addAll(inputWindows);
-    Map<W, W> windowToMergeResult = null;
+    Map<W, W> windowToMergeResult;
     try {
       windowToMergeResult = mergeWindows(windowingStrategy, windows);
     } catch (Exception e) {
@@ -113,14 +115,42 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
   @Override public Iterable<WindowedValue<AccumT>> merge(
       Iterable<WindowedValue<AccumT>> accumulators1,
       Iterable<WindowedValue<AccumT>> accumulators2) {
-    // TODO
-    /*
-    ArrayList<AccumT> accumulators = new ArrayList<>();
-    accumulators.add(accumulator1);
-    accumulators.add(accumulator2);
-    return combineFn.mergeAccumulators(accumulators);
-*/
-    return null;
+    // Merge the windows of all the accumulators, then combine, for each merged window, the
+    // accumulators that ended up in it into a single accumulator.
+    Iterable<WindowedValue<AccumT>> accumulators = Iterables.concat(accumulators1, accumulators2);
+    Set<W> accumulatorsWindows = collectAccumulatorsWindows(accumulators);
+    Map<W, W> windowToMergeResult;
+    try {
+      windowToMergeResult = mergeWindows(windowingStrategy, accumulatorsWindows);
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to merge accumulators windows", e);
+    }
+
+    // group accumulators by their merged window
+    Map<W, List<WindowedValue<AccumT>>> mergedWindowToAccumulators = new HashMap<>();
+    for (WindowedValue<AccumT> accumulator : accumulators) {
+      // each accumulator has only one window
+      BoundedWindow accumulatorWindow = accumulator.getWindows().iterator().next();
+      W mergedWindowForAccumulator = windowToMergeResult.get(accumulatorWindow);
+      // The group must be a mutable list: Collections.singletonList() is immutable and would
+      // throw UnsupportedOperationException when a second accumulator joins the same window.
+      mergedWindowToAccumulators
+          .computeIfAbsent(mergedWindowForAccumulator, window -> new ArrayList<>())
+          .add(accumulator);
+    }
+
+    // merge the accumulators for each mergedWindow
+    List<WindowedValue<AccumT>> result = new ArrayList<>();
+    for (Map.Entry<W, List<WindowedValue<AccumT>>> entry : mergedWindowToAccumulators.entrySet()) {
+      List<WindowedValue<AccumT>> accumulatorsForMergedWindow = entry.getValue();
+      List<AccumT> accumulatorValues =
+          accumulatorsForMergedWindow.stream().map(x -> x.getValue()).collect(Collectors.toList());
+      List<Instant> accumulatorTimestamps =
+          accumulatorsForMergedWindow.stream().map(x -> x.getTimestamp()).collect(Collectors.toList());
+      result.add(WindowedValue.of(combineFn.mergeAccumulators(accumulatorValues),
+          timestampCombiner.combine(accumulatorTimestamps), entry.getKey(), PaneInfo.NO_FIRING));
+    }
+    return result;
   }
 
   @Override public Iterable<WindowedValue<OutputT>> finish(Iterable<WindowedValue<AccumT>> reduction) {