You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/06/26 15:22:43 UTC
[beam] 05/07: Implement merge accumulators part of CombineGlobally
translation with windowing
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8f8bae47ff8cb75c6337fe1be325e4ea64518dc8
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Sun Jun 16 12:12:09 2019 +0200
Implement merge accumulators part of CombineGlobally translation with windowing
---
.../batch/AggregatorCombinerGlobally.java | 48 ++++++++++++++++++----
1 file changed, 39 insertions(+), 9 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index 6996165..d3ad62c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -33,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.expressions.Aggregator;
import org.joda.time.Instant;
@@ -67,7 +69,7 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
Collection<W> inputWindows = (Collection<W>)input.getWindows();
Set<W> windows = collectAccumulatorsWindows(accumulators);
windows.addAll(inputWindows);
- Map<W, W> windowToMergeResult = null;
+ Map<W, W> windowToMergeResult;
try {
windowToMergeResult = mergeWindows(windowingStrategy, windows);
} catch (Exception e) {
@@ -113,14 +115,42 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
@Override public Iterable<WindowedValue<AccumT>> merge(
Iterable<WindowedValue<AccumT>> accumulators1,
Iterable<WindowedValue<AccumT>> accumulators2) {
- // TODO
- /*
- ArrayList<AccumT> accumulators = new ArrayList<>();
- accumulators.add(accumulator1);
- accumulators.add(accumulator2);
- return combineFn.mergeAccumulators(accumulators);
-*/
- return null;
+
+ // merge the windows of all the accumulators (Iterables.concat is a re-iterable view)
+ Iterable<WindowedValue<AccumT>> accumulators = Iterables.concat(accumulators1, accumulators2);
+ Set<W> accumulatorsWindows = collectAccumulatorsWindows(accumulators);
+ Map<W, W> windowToMergeResult;
+ try {
+ windowToMergeResult = mergeWindows(windowingStrategy, accumulatorsWindows);
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to merge accumulators windows", e);
+ }
+
+ // group accumulators by merged window; lists must be mutable as several accumulators can share one
+ Map<W, List<WindowedValue<AccumT>>> mergedWindowToAccumulators = new HashMap<>();
+ for (WindowedValue<AccumT> accumulator : accumulators) {
+ // each accumulator has only one window
+ @SuppressWarnings("unchecked")
+ W accumulatorWindow = (W) accumulator.getWindows().iterator().next();
+ // NOTE(review): a window missing from the merge result presumably was not merged; keep it as-is
+ W mergedWindowForAccumulator = windowToMergeResult.getOrDefault(accumulatorWindow, accumulatorWindow);
+ mergedWindowToAccumulators
+ .computeIfAbsent(mergedWindowForAccumulator, k -> new ArrayList<>())
+ .add(accumulator);
+ }
+ // merge the accumulator values and combine their timestamps for each merged window
+ List<WindowedValue<AccumT>> result = new ArrayList<>();
+ for (Map.Entry<W, List<WindowedValue<AccumT>>> entry : mergedWindowToAccumulators.entrySet()) {
+ W mergedWindow = entry.getKey();
+ List<WindowedValue<AccumT>> accumulatorsForMergedWindow = entry.getValue();
+ AccumT mergedAccumulator = combineFn.mergeAccumulators(
+ accumulatorsForMergedWindow.stream().map(WindowedValue::getValue).collect(Collectors.toList()));
+ Instant mergedTimestamp = timestampCombiner.combine(
+ accumulatorsForMergedWindow.stream().map(WindowedValue::getTimestamp).collect(Collectors.toList()));
+ result.add(
+ WindowedValue.of(mergedAccumulator, mergedTimestamp, mergedWindow, PaneInfo.NO_FIRING));
+ }
+ return result;
}
@Override public Iterable<WindowedValue<OutputT>> finish(Iterable<WindowedValue<AccumT>> reduction) {