You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/10/18 08:27:54 UTC
[beam] branch spark-runner_structured-streaming updated (ee2c0e6 -> c980d4c)
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.
from ee2c0e6 Apply spotless
new fb3aa34 Add missing windowedValue Encoder call in Pardo
new c980d4c Fix typo
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../translation/batch/ParDoTranslatorBatch.java | 37 ++++++++++++----------
.../batch/WindowAssignTranslatorBatch.java | 4 +--
2 files changed, 23 insertions(+), 18 deletions(-)
[beam] 02/02: Fix typo
Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit c980d4cbc88047549086a2a6214ca7a7dd5e285d
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Oct 11 16:18:48 2019 +0200
Fix typo
---
.../translation/batch/WindowAssignTranslatorBatch.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index 59cc32a..4ac8a3f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -46,12 +46,12 @@ class WindowAssignTranslatorBatch<T>
context.putDataset(output, inputDataset);
} else {
WindowFn<T, ?> windowFn = assignTransform.getWindowFn();
- WindowedValue.FullWindowedValueCoder<T> windoweVdalueCoder =
+ WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
WindowedValue.FullWindowedValueCoder.of(input.getCoder(), windowFn.windowCoder());
Dataset<WindowedValue<T>> outputDataset =
inputDataset.map(
WindowingHelpers.assignWindowsMapFunction(windowFn),
- EncoderHelpers.fromBeamCoder(windoweVdalueCoder));
+ EncoderHelpers.fromBeamCoder(windowedValueCoder));
context.putDataset(output, outputDataset);
}
}
[beam] 01/02: Add missing windowedValue Encoder call in Pardo
Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit fb3aa34b55bdb21afc030ce618a27dcf2623f40e
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Oct 11 16:18:11 2019 +0200
Add missing windowedValue Encoder call in Pardo
---
.../translation/batch/ParDoTranslatorBatch.java | 37 ++++++++++++----------
1 file changed, 21 insertions(+), 16 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index e73d38e..f5a109e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -93,6 +93,8 @@ class ParDoTranslatorBatch<InputT, OutputT>
List<TupleTag<?>> outputTags = new ArrayList<>(outputs.keySet());
WindowingStrategy<?, ?> windowingStrategy =
((PCollection<InputT>) input).getWindowingStrategy();
+ Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
+ Coder<? extends BoundedWindow> windowCoder = windowingStrategy.getWindowFn().windowCoder();
// construct a map from side input to WindowingStrategy so that
// the DoFn runner can map main-input windows to side input windows
@@ -105,7 +107,6 @@ class ParDoTranslatorBatch<InputT, OutputT>
SideInputBroadcast broadcastStateData = createBroadcastSideInputs(sideInputs, context);
Map<TupleTag<?>, Coder<?>> outputCoderMap = context.getOutputCoders();
- Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance();
List<TupleTag<?>> additionalOutputTags = new ArrayList<>();
@@ -131,24 +132,25 @@ class ParDoTranslatorBatch<InputT, OutputT>
broadcastStateData,
doFnSchemaInformation);
- MultiOuputCoder multipleOutputCoder =
- MultiOuputCoder.of(
- SerializableCoder.of(TupleTag.class),
- outputCoderMap,
- windowingStrategy.getWindowFn().windowCoder());
+ MultiOuputCoder multipleOutputCoder = MultiOuputCoder
+ .of(SerializableCoder.of(TupleTag.class), outputCoderMap, windowCoder);
Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs =
inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.fromBeamCoder(multipleOutputCoder));
if (outputs.entrySet().size() > 1) {
allOutputs.persist();
for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
- pruneOutputFilteredByTag(context, allOutputs, output);
+ pruneOutputFilteredByTag(context, allOutputs, output, windowCoder);
}
} else {
+ Coder<OutputT> outputCoder = ((PCollection<OutputT>) outputs.get(mainOutputTag)).getCoder();
+ Coder<WindowedValue<?>> windowedValueCoder =
+ (Coder<WindowedValue<?>>)
+ (Coder<?>) WindowedValue.getFullCoder(outputCoder, windowCoder);
Dataset<WindowedValue<?>> outputDataset =
allOutputs.map(
(MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>)
value -> value._2,
- EncoderHelpers.windowedValueEncoder());
+ EncoderHelpers.fromBeamCoder(windowedValueCoder));
context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset);
}
}
@@ -159,14 +161,14 @@ class ParDoTranslatorBatch<InputT, OutputT>
JavaSparkContext.fromSparkContext(context.getSparkSession().sparkContext());
SideInputBroadcast sideInputBroadcast = new SideInputBroadcast();
- for (PCollectionView<?> input : sideInputs) {
+ for (PCollectionView<?> sideInput : sideInputs) {
Coder<? extends BoundedWindow> windowCoder =
- input.getPCollection().getWindowingStrategy().getWindowFn().windowCoder();
+ sideInput.getPCollection().getWindowingStrategy().getWindowFn().windowCoder();
+
Coder<WindowedValue<?>> windowedValueCoder =
(Coder<WindowedValue<?>>)
- (Coder<?>) WindowedValue.getFullCoder(input.getPCollection().getCoder(), windowCoder);
-
- Dataset<WindowedValue<?>> broadcastSet = context.getSideInputDataSet(input);
+ (Coder<?>) WindowedValue.getFullCoder(sideInput.getPCollection().getCoder(), windowCoder);
+ Dataset<WindowedValue<?>> broadcastSet = context.getSideInputDataSet(sideInput);
List<WindowedValue<?>> valuesList = broadcastSet.collectAsList();
List<byte[]> codedValues = new ArrayList<>();
for (WindowedValue<?> v : valuesList) {
@@ -174,7 +176,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
}
sideInputBroadcast.add(
- input.getTagInternal().getId(), jsc.broadcast(codedValues), windowedValueCoder);
+ sideInput.getTagInternal().getId(), jsc.broadcast(codedValues), windowedValueCoder);
}
return sideInputBroadcast;
}
@@ -213,14 +215,17 @@ class ParDoTranslatorBatch<InputT, OutputT>
private void pruneOutputFilteredByTag(
TranslationContext context,
Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs,
- Map.Entry<TupleTag<?>, PValue> output) {
+ Map.Entry<TupleTag<?>, PValue> output, Coder<? extends BoundedWindow> windowCoder) {
Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> filteredDataset =
allOutputs.filter(new DoFnFilterFunction(output.getKey()));
+ Coder<WindowedValue<?>> windowedValueCoder =
+ (Coder<WindowedValue<?>>)
+ (Coder<?>) WindowedValue.getFullCoder(((PCollection<OutputT>)output.getValue()).getCoder(), windowCoder);
Dataset<WindowedValue<?>> outputDataset =
filteredDataset.map(
(MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>)
value -> value._2,
- EncoderHelpers.windowedValueEncoder());
+ EncoderHelpers.fromBeamCoder(windowedValueCoder));
context.putDatasetWildcard(output.getValue(), outputDataset);
}