You are viewing a plain-text version of this content; the hyperlink to its canonical version did not survive the conversion to plain text.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/06/26 15:22:38 UTC
[beam] branch spark-runner_structured-streaming updated (8cdd143 -> f0522dc)
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.
from 8cdd143 Add TODO in Combine translations
new 8a4372d Update KVHelpers.extractKey() to deal with WindowedValue and update GBK and CPK
new 8d05d46 Fix comment about schemas
new bba08b4 Implement reduce part of CombineGlobally translation with windowing
new 4602f83 Output data after combine
new 8f8bae4 Implement merge accumulators part of CombineGlobally translation with windowing
new 9a269ef Fix encoder in combine call
new f0522dc [to remove] temporary: revert extractKey while combinePerKey is not done (so that it compiles)
The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../batch/AggregatorCombinerGlobally.java | 205 ++++++++++++++++++---
.../batch/CombineGloballyTranslatorBatch.java | 22 +--
.../batch/CombinePerKeyTranslatorBatch.java | 10 +-
.../batch/GroupByKeyTranslatorBatch.java | 4 +-
.../translation/helpers/KVHelpers.java | 5 +-
.../translation/helpers/ReduceFnRunnerHelpers.java | 77 ++++++++
6 files changed, 269 insertions(+), 54 deletions(-)
create mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java
[beam] 03/07: Implement reduce part of CombineGlobally translation with windowing
Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit bba08b4201a1dc53ae117d9bd495b117923e189d
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jun 13 11:23:52 2019 +0200
Implement reduce part of CombineGlobally translation with windowing
---
.../batch/AggregatorCombinerGlobally.java | 165 +++++++++++++++++----
.../batch/CombineGloballyTranslatorBatch.java | 19 +--
2 files changed, 144 insertions(+), 40 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index 2f8293b..0d13218 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -18,60 +18,173 @@
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Aggregator;
+import org.joda.time.Instant;
+import scala.Tuple2;
-/** An {@link Aggregator} for the Spark Batch Runner. */
-class AggregatorCombinerGlobally<InputT, AccumT, OutputT>
- extends Aggregator<InputT, AccumT, OutputT> {
+/** An {@link Aggregator} for the Spark Batch Runner. It does not use ReduceFnRunner
+ * for windowMerging, because reduceFnRunner is based on state which requires a keyed collection.
+ * The accumulator is a {@code Iterable<WindowedValue<AccumT>> because an {@code InputT} can be in multiple windows. So, when accumulating {@code InputT} values, we create one accumulator per input window.
+ * */
+
+class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindow>
+ extends Aggregator<WindowedValue<InputT>, Iterable<WindowedValue<AccumT>>, WindowedValue<OutputT>> {
private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+ private WindowingStrategy<InputT, W> windowingStrategy;
+ private TimestampCombiner timestampCombiner;
- public AggregatorCombinerGlobally(Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ public AggregatorCombinerGlobally(Combine.CombineFn<InputT, AccumT, OutputT> combineFn, WindowingStrategy<?, ?> windowingStrategy) {
this.combineFn = combineFn;
+ this.windowingStrategy = (WindowingStrategy<InputT, W>) windowingStrategy;
+ this.timestampCombiner = windowingStrategy.getTimestampCombiner();
}
- @Override
- public AccumT zero() {
- return combineFn.createAccumulator();
+ @Override public Iterable<WindowedValue<AccumT>> zero() {
+ return new ArrayList<>();
}
- @Override
- public AccumT reduce(AccumT accumulator, InputT input) {
- // because of generic type InputT, spark cannot infer an input type.
- // it would pass Integer as input if we had a Aggregator<Integer, ..., ...>
- // without the type inference it stores input in a GenericRowWithSchema
- Row row = (Row) input;
- InputT t = RowHelpers.extractObjectFromRow(row);
- return combineFn.addInput(accumulator, t);
+ @Override public Iterable<WindowedValue<AccumT>> reduce(Iterable<WindowedValue<AccumT>> accumulators,
+ WindowedValue<InputT> input) {
+
+ //concatenate accumulators windows and input windows and merge the windows
+ Collection<W> inputWindows = (Collection<W>)input.getWindows();
+ Set<W> windows = collectAccumulatorsWindows(accumulators);
+ windows.addAll(inputWindows);
+ Map<W, W> windowToMergeResult = null;
+ try {
+ windowToMergeResult = mergeWindows(windowingStrategy, windows);
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to merge accumulators windows and input windows", e);
+ }
+
+ // iterate through the input windows and for each, create an accumulator with the merged window
+ // associated to it and call addInput with the accumulator.
+ // Maintain a map of the accumulators for use as output
+ Map<W, Tuple2<AccumT, Instant>> mapState = new HashMap<>();
+ for (W inputWindow:inputWindows) {
+ W mergedWindow = windowToMergeResult.get(inputWindow);
+ mergedWindow = mergedWindow == null ? inputWindow : mergedWindow;
+ Tuple2<AccumT, Instant> accumAndInstant = mapState.get(mergedWindow);
+ // if there is no accumulator associated with this window yet, create one
+ if (accumAndInstant == null) {
+ AccumT accum = combineFn.addInput(combineFn.createAccumulator(), input.getValue());
+ Instant windowTimestamp =
+ timestampCombiner.assign(
+ mergedWindow, windowingStrategy.getWindowFn().getOutputTime(input.getTimestamp(), mergedWindow));
+ accumAndInstant = new Tuple2<>(accum, windowTimestamp);
+ mapState.put(mergedWindow, accumAndInstant);
+ } else {
+ AccumT updatedAccum =
+ combineFn.addInput(accumAndInstant._1, input.getValue());
+ Instant updatedTimestamp = timestampCombiner.combine(accumAndInstant._2, timestampCombiner
+ .assign(mergedWindow,
+ windowingStrategy.getWindowFn().getOutputTime(input.getTimestamp(), mergedWindow)));
+ accumAndInstant = new Tuple2<>(updatedAccum, updatedTimestamp);
+ }
+ }
+ // output the accumulators map
+ List<WindowedValue<AccumT>> result = new ArrayList<>();
+ for (Map.Entry<W, Tuple2<AccumT, Instant>> entry : mapState.entrySet()) {
+ AccumT accumulator = entry.getValue()._1;
+ Instant windowTimestamp = entry.getValue()._2;
+ W window = entry.getKey();
+ result.add(WindowedValue.of(accumulator, windowTimestamp, window, PaneInfo.NO_FIRING));
+ }
+ return result;
}
- @Override
- public AccumT merge(AccumT accumulator1, AccumT accumulator2) {
+ @Override public Iterable<WindowedValue<AccumT>> merge(
+ Iterable<WindowedValue<AccumT>> accumulators1,
+ Iterable<WindowedValue<AccumT>> accumulators2) {
+ // TODO
+ /*
ArrayList<AccumT> accumulators = new ArrayList<>();
accumulators.add(accumulator1);
accumulators.add(accumulator2);
return combineFn.mergeAccumulators(accumulators);
+*/
+ return null;
}
- @Override
- public OutputT finish(AccumT reduction) {
- return combineFn.extractOutput(reduction);
+ @Override public WindowedValue<OutputT> finish(Iterable<WindowedValue<AccumT>> reduction) {
+ // TODO
+ // return combineFn.extractOutput(reduction);
+ return null;
}
- @Override
- public Encoder<AccumT> bufferEncoder() {
+ @Override public Encoder<Iterable<WindowedValue<AccumT>>> bufferEncoder() {
// TODO replace with accumulatorCoder if possible
return EncoderHelpers.genericEncoder();
}
- @Override
- public Encoder<OutputT> outputEncoder() {
+ @Override public Encoder<WindowedValue<OutputT>> outputEncoder() {
// TODO replace with outputCoder if possible
return EncoderHelpers.genericEncoder();
}
+
+ private Set<W> collectAccumulatorsWindows(Iterable<WindowedValue<AccumT>> accumulators) {
+ Set<W> windows = new HashSet<>();
+ for (WindowedValue<?> accumulator : accumulators) {
+ // an accumulator has only one window associated to it.
+ W accumulatorWindow = (W) accumulator.getWindows().iterator().next();
+ windows.add(accumulatorWindow);
+ } return windows;
+ }
+
+ private Map<W, W> mergeWindows(WindowingStrategy<InputT, W> windowingStrategy, Set<W> windows)
+ throws Exception {
+ WindowFn<InputT, W> windowFn = windowingStrategy.getWindowFn();
+
+ if (windowingStrategy.getWindowFn().isNonMerging()) {
+ // Return an empty map, indicating that every window is not merged.
+ return Collections.emptyMap();
+ }
+
+ Map<W, W> windowToMergeResult = new HashMap<>();
+ windowFn.mergeWindows(new MergeContextImpl(windowFn, windows, windowToMergeResult));
+ return windowToMergeResult;
+ }
+
+
+ private class MergeContextImpl extends WindowFn<InputT, W>.MergeContext {
+
+ private Set<W> windows;
+ private Map<W, W> windowToMergeResult;
+
+ MergeContextImpl(WindowFn<InputT, W> windowFn, Set<W> windows, Map<W, W> windowToMergeResult) {
+ windowFn.super();
+ this.windows = windows;
+ this.windowToMergeResult = windowToMergeResult;
+ }
+
+ @Override
+ public Collection<W> windows() {
+ return windows;
+ }
+
+ @Override
+ public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
+ for (W w : toBeMerged) {
+ windowToMergeResult.put(w, mergeResult);
+ }
+ }
+ }
+
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
index 53651cf..f18572b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -50,25 +51,15 @@ class CombineGloballyTranslatorBatch<InputT, AccumT, OutputT>
@SuppressWarnings("unchecked")
final Combine.CombineFn<InputT, AccumT, OutputT> combineFn =
(Combine.CombineFn<InputT, AccumT, OutputT>) combineTransform.getFn();
-
+ WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
Dataset<WindowedValue<InputT>> inputDataset = context.getDataset(input);
- //TODO merge windows instead of doing unwindow/window to comply with beam model
- Dataset<InputT> unWindowedDataset =
- inputDataset.map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.genericEncoder());
-
Dataset<Row> combinedRowDataset =
- unWindowedDataset.agg(new AggregatorCombinerGlobally<>(combineFn).toColumn());
-
- Dataset<OutputT> combinedDataset =
- combinedRowDataset.map(
- RowHelpers.extractObjectFromRowMapFunction(), EncoderHelpers.genericEncoder());
+ inputDataset.agg(new AggregatorCombinerGlobally<>(combineFn, windowingStrategy).toColumn());
- // Window the result into global window.
Dataset<WindowedValue<OutputT>> outputDataset =
- combinedDataset.map(
- WindowingHelpers.windowMapFunction(), EncoderHelpers.windowedValueEncoder());
-
+ combinedRowDataset.map(
+ RowHelpers.extractObjectFromRowMapFunction(), EncoderHelpers.windowedValueEncoder());
context.putDataset(output, outputDataset);
}
}
[beam] 07/07: [to remove] temporary: revert extractKey while combinePerKey is not done (so that it compiles)
Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit f0522dc67be2ba2a6e2e1a4dcbfbc2343455ac0a
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Jun 17 13:59:13 2019 +0200
[to remove] temporary: revert extractKey while combinePerKey is not done (so that it compiles)
---
.../translation/batch/CombinePerKeyTranslatorBatch.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index c55219b..1c35301 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.KeyValueGroupedDataset;
import scala.Tuple2;
@@ -52,8 +53,10 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
Dataset<WindowedValue<KV<K, InputT>>> inputDataset = context.getDataset(input);
- KeyValueGroupedDataset<K, WindowedValue<KV<K, InputT>>> groupedDataset =
- inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
+ Dataset<KV<K, InputT>> unwindowedDataset = inputDataset
+ .map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.kvEncoder());
+ KeyValueGroupedDataset<K, KV<K, InputT>> groupedDataset =
+ unwindowedDataset.groupByKey((MapFunction<KV<K, InputT>, K>) kv -> kv.getKey(), EncoderHelpers.genericEncoder());
Dataset<Tuple2<K, OutputT>> combinedDataset =
groupedDataset.agg(
[beam] 01/07: Update KVHelpers.extractKey() to deal with WindowedValue and update GBK and CPK
Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8a4372de576c7ac122b4383a941d406d3fb3b1ff
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed May 29 11:29:42 2019 +0200
Update KVHelpers.extractKey() to deal with WindowedValue and update GBK and CPK
---
.../translation/batch/CombinePerKeyTranslatorBatch.java | 9 ++-------
.../translation/batch/GroupByKeyTranslatorBatch.java | 4 ++--
.../spark/structuredstreaming/translation/helpers/KVHelpers.java | 5 +++--
3 files changed, 7 insertions(+), 11 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 3d0ee8b..c55219b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -52,13 +52,8 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
Dataset<WindowedValue<KV<K, InputT>>> inputDataset = context.getDataset(input);
- //TODO merge windows instead of doing unwindow/window to comply with beam model
- Dataset<KV<K, InputT>> keyedDataset =
- inputDataset.map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.kvEncoder());
-
- // TODO change extractKey impl to deal with WindowedVAlue and use it in GBK
- KeyValueGroupedDataset<K, KV<K, InputT>> groupedDataset =
- keyedDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
+ KeyValueGroupedDataset<K, WindowedValue<KV<K, InputT>>> groupedDataset =
+ inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
Dataset<Tuple2<K, OutputT>> combinedDataset =
groupedDataset.agg(
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index b2b4441..148e643 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -28,6 +28,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
@@ -56,8 +57,7 @@ class GroupByKeyTranslatorBatch<K, V>
//group by key only
KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly = input
- .groupByKey((MapFunction<WindowedValue<KV<K, V>>, K>) wv -> wv.getValue().getKey(),
- EncoderHelpers.genericEncoder());
+ .groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
// Materialize groupByKeyOnly values, potential OOM because of creation of new iterable
Dataset<KV<K, Iterable<WindowedValue<V>>>> materialized = groupByKeyOnly.mapGroups(
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
index 3bea466..a53a93f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.api.java.function.MapFunction;
import scala.Tuple2;
@@ -25,8 +26,8 @@ import scala.Tuple2;
public final class KVHelpers {
/** A Spark {@link MapFunction} for extracting the key out of a {@link KV} for GBK for example. */
- public static <K, V> MapFunction<KV<K, V>, K> extractKey() {
- return (MapFunction<KV<K, V>, K>) KV::getKey;
+ public static <K, V> MapFunction<WindowedValue<KV<K, V>>, K> extractKey() {
+ return (MapFunction<WindowedValue<KV<K, V>>, K>) wv -> wv.getValue().getKey();
}
/** A Spark {@link MapFunction} for making a KV out of a {@link scala.Tuple2}. */
[beam] 02/07: Fix comment about schemas
Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8d05d46f4b2b116dd3bed21566ede230720ccc0f
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jun 11 14:40:54 2019 +0200
Fix comment about schemas
---
.../batch/AggregatorCombinerGlobally.java | 2 +-
.../translation/helpers/ReduceFnRunnerHelpers.java | 77 ++++++++++++++++++++++
2 files changed, 78 insertions(+), 1 deletion(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index a03c17e..2f8293b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -43,7 +43,7 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT>
@Override
public AccumT reduce(AccumT accumulator, InputT input) {
// because of generic type InputT, spark cannot infer an input type.
- // it would pass Integer as input if we had a Aggregator<Input, ..., ...>
+ // it would pass Integer as input if we had a Aggregator<Integer, ..., ...>
// without the type inference it stores input in a GenericRowWithSchema
Row row = (Row) input;
InputT t = RowHelpers.extractObjectFromRow(row);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java
new file mode 100644
index 0000000..97a225e
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java
@@ -0,0 +1,77 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * Helpers to use {@link ReduceFnRunner}.
+ */
+public class ReduceFnRunnerHelpers<K, InputT, W extends BoundedWindow> {
+ public static <K, InputT, W extends BoundedWindow> void fireEligibleTimers(
+ InMemoryTimerInternals timerInternals,
+ ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner)
+ throws Exception {
+ List<TimerInternals.TimerData> timers = new ArrayList<>();
+ while (true) {
+ TimerInternals.TimerData timer;
+ while ((timer = timerInternals.removeNextEventTimer()) != null) {
+ timers.add(timer);
+ }
+ while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+ timers.add(timer);
+ }
+ while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+ timers.add(timer);
+ }
+ if (timers.isEmpty()) {
+ break;
+ }
+ reduceFnRunner.onTimers(timers);
+ timers.clear();
+ }
+ }
+
+ /**
+ * {@link OutputWindowedValue} for ReduceFnRunner.
+ *
+ */
+ public static class GABWOutputWindowedValue<K, V>
+ implements OutputWindowedValue<KV<K, Iterable<V>>> {
+ private final List<WindowedValue<KV<K, Iterable<V>>>> outputs = new ArrayList<>();
+
+ @Override
+ public void outputWindowedValue(
+ KV<K, Iterable<V>> output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputs.add(WindowedValue.of(output, timestamp, windows, pane));
+ }
+
+ @Override
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs.");
+ }
+
+ public Iterable<WindowedValue<KV<K, Iterable<V>>>> getOutputs() {
+ return outputs;
+ }
+ }
+
+}
[beam] 04/07: Output data after combine
Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 4602f8367d9d6c38574010fdff8b044cedf4d286
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jun 14 14:50:45 2019 +0200
Output data after combine
---
.../translation/batch/AggregatorCombinerGlobally.java | 14 ++++++++------
.../translation/batch/CombineGloballyTranslatorBatch.java | 7 +++++--
2 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index 0d13218..6996165 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -44,7 +44,7 @@ import scala.Tuple2;
* */
class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindow>
- extends Aggregator<WindowedValue<InputT>, Iterable<WindowedValue<AccumT>>, WindowedValue<OutputT>> {
+ extends Aggregator<WindowedValue<InputT>, Iterable<WindowedValue<AccumT>>, Iterable<WindowedValue<OutputT>>> {
private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
private WindowingStrategy<InputT, W> windowingStrategy;
@@ -123,10 +123,12 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
return null;
}
- @Override public WindowedValue<OutputT> finish(Iterable<WindowedValue<AccumT>> reduction) {
- // TODO
- // return combineFn.extractOutput(reduction);
- return null;
+ @Override public Iterable<WindowedValue<OutputT>> finish(Iterable<WindowedValue<AccumT>> reduction) {
+ List<WindowedValue<OutputT>> result = new ArrayList<>();
+ for (WindowedValue<AccumT> windowedValue: reduction) {
+ result.add(windowedValue.withValue(combineFn.extractOutput(windowedValue.getValue())));
+ }
+ return result;
}
@Override public Encoder<Iterable<WindowedValue<AccumT>>> bufferEncoder() {
@@ -134,7 +136,7 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
return EncoderHelpers.genericEncoder();
}
- @Override public Encoder<WindowedValue<OutputT>> outputEncoder() {
+ @Override public Encoder<Iterable<WindowedValue<OutputT>>> outputEncoder() {
// TODO replace with outputCoder if possible
return EncoderHelpers.genericEncoder();
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
index f18572b..fb9e1dd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
@@ -21,12 +21,12 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -57,9 +57,12 @@ class CombineGloballyTranslatorBatch<InputT, AccumT, OutputT>
Dataset<Row> combinedRowDataset =
inputDataset.agg(new AggregatorCombinerGlobally<>(combineFn, windowingStrategy).toColumn());
- Dataset<WindowedValue<OutputT>> outputDataset =
+ Dataset<Iterable<WindowedValue<OutputT>>> accumulatedDataset =
combinedRowDataset.map(
RowHelpers.extractObjectFromRowMapFunction(), EncoderHelpers.windowedValueEncoder());
+ Dataset<WindowedValue<OutputT>> outputDataset = accumulatedDataset.flatMap(
+ (FlatMapFunction<Iterable<WindowedValue<OutputT>>, WindowedValue<OutputT>>)
+ windowedValues -> windowedValues.iterator(), EncoderHelpers.windowedValueEncoder());
context.putDataset(output, outputDataset);
}
}
[beam] 05/07: Implement merge accumulators part of CombineGlobally translation with windowing
Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8f8bae47ff8cb75c6337fe1be325e4ea64518dc8
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Sun Jun 16 12:12:09 2019 +0200
Implement merge accumulators part of CombineGlobally translation with windowing
---
.../batch/AggregatorCombinerGlobally.java | 48 ++++++++++++++++++----
1 file changed, 39 insertions(+), 9 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index 6996165..d3ad62c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -33,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.expressions.Aggregator;
import org.joda.time.Instant;
@@ -67,7 +69,7 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
Collection<W> inputWindows = (Collection<W>)input.getWindows();
Set<W> windows = collectAccumulatorsWindows(accumulators);
windows.addAll(inputWindows);
- Map<W, W> windowToMergeResult = null;
+ Map<W, W> windowToMergeResult;
try {
windowToMergeResult = mergeWindows(windowingStrategy, windows);
} catch (Exception e) {
@@ -113,14 +115,42 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
@Override public Iterable<WindowedValue<AccumT>> merge(
Iterable<WindowedValue<AccumT>> accumulators1,
Iterable<WindowedValue<AccumT>> accumulators2) {
- // TODO
- /*
- ArrayList<AccumT> accumulators = new ArrayList<>();
- accumulators.add(accumulator1);
- accumulators.add(accumulator2);
- return combineFn.mergeAccumulators(accumulators);
-*/
- return null;
+
+ // merge the windows of all the accumulators coming from both partial aggregations
+ Iterable<WindowedValue<AccumT>> accumulators = Iterables.concat(accumulators1, accumulators2);
+ Set<W> accumulatorsWindows = collectAccumulatorsWindows(accumulators);
+ Map<W, W> windowToMergeResult;
+ try {
+ windowToMergeResult = mergeWindows(windowingStrategy, accumulatorsWindows);
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to merge accumulators windows", e);
+ }
+
+ // group accumulators by the merged window their own window maps to
+ Map<W, List<WindowedValue<AccumT>>> mergedWindowToAccumulators = new HashMap<>();
+ for (WindowedValue<AccumT> accumulator : accumulators) {
+ // each accumulator is expected to carry exactly one window; only the first one is read
+ BoundedWindow accumulatorWindow = accumulator.getWindows().iterator().next();
+ W mergedWindowForAccumulator = windowToMergeResult.get(accumulatorWindow);
+ // computeIfAbsent stores a mutable list the first time a merged window is seen. The previous
+ // Collections.singletonList value was immutable, so the next add() for the same merged
+ // window threw UnsupportedOperationException.
+ mergedWindowToAccumulators
+ .computeIfAbsent(mergedWindowForAccumulator, w -> new ArrayList<>())
+ .add(accumulator);
+ }
+ // merge the accumulators of each merged window into one accumulator, combining their timestamps
+ List<WindowedValue<AccumT>> result = new ArrayList<>();
+ for (Map.Entry<W, List<WindowedValue<AccumT>>> entry : mergedWindowToAccumulators.entrySet()){
+ W mergedWindow = entry.getKey();
+ List<WindowedValue<AccumT>> accumulatorsForMergedWindow = entry.getValue();
+ result.add(WindowedValue
+ .of(combineFn.mergeAccumulators(accumulatorsForMergedWindow.stream().map(WindowedValue::getValue).collect(
+ Collectors.toList())), timestampCombiner.combine(accumulatorsForMergedWindow.stream().map(WindowedValue::getTimestamp).collect(
+ Collectors.toList())),
+ mergedWindow, PaneInfo.NO_FIRING));
+ }
+ return result;
}
@Override public Iterable<WindowedValue<OutputT>> finish(Iterable<WindowedValue<AccumT>> reduction) {
[beam] 06/07: Fix encoder in combine call
Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 9a269eff5fe67e4542643c64e392fec003fb28f7
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Jun 17 11:53:37 2019 +0200
Fix encoder in combine call
---
.../translation/batch/CombineGloballyTranslatorBatch.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
index fb9e1dd..f29b2c5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
@@ -59,7 +59,7 @@ class CombineGloballyTranslatorBatch<InputT, AccumT, OutputT>
Dataset<Iterable<WindowedValue<OutputT>>> accumulatedDataset =
combinedRowDataset.map(
- RowHelpers.extractObjectFromRowMapFunction(), EncoderHelpers.windowedValueEncoder());
+ RowHelpers.extractObjectFromRowMapFunction(), EncoderHelpers.genericEncoder());
Dataset<WindowedValue<OutputT>> outputDataset = accumulatedDataset.flatMap(
(FlatMapFunction<Iterable<WindowedValue<OutputT>>, WindowedValue<OutputT>>)
windowedValues -> windowedValues.iterator(), EncoderHelpers.windowedValueEncoder());