You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/10/18 08:27:54 UTC

[beam] branch spark-runner_structured-streaming updated (ee2c0e6 -> c980d4c)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from ee2c0e6  Apply spotless
     new fb3aa34  Add missing windowedValue Encoder call in Pardo
     new c980d4c  Fix typo

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../translation/batch/ParDoTranslatorBatch.java    | 37 ++++++++++++----------
 .../batch/WindowAssignTranslatorBatch.java         |  4 +--
 2 files changed, 23 insertions(+), 18 deletions(-)


[beam] 02/02: Fix typo

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c980d4cbc88047549086a2a6214ca7a7dd5e285d
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Oct 11 16:18:48 2019 +0200

    Fix typo
---
 .../translation/batch/WindowAssignTranslatorBatch.java                | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index 59cc32a..4ac8a3f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -46,12 +46,12 @@ class WindowAssignTranslatorBatch<T>
       context.putDataset(output, inputDataset);
     } else {
       WindowFn<T, ?> windowFn = assignTransform.getWindowFn();
-      WindowedValue.FullWindowedValueCoder<T> windoweVdalueCoder =
+      WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
           WindowedValue.FullWindowedValueCoder.of(input.getCoder(), windowFn.windowCoder());
       Dataset<WindowedValue<T>> outputDataset =
           inputDataset.map(
               WindowingHelpers.assignWindowsMapFunction(windowFn),
-              EncoderHelpers.fromBeamCoder(windoweVdalueCoder));
+              EncoderHelpers.fromBeamCoder(windowedValueCoder));
       context.putDataset(output, outputDataset);
     }
   }


[beam] 01/02: Add missing windowedValue Encoder call in Pardo

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit fb3aa34b55bdb21afc030ce618a27dcf2623f40e
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Oct 11 16:18:11 2019 +0200

    Add missing windowedValue Encoder call in Pardo
---
 .../translation/batch/ParDoTranslatorBatch.java    | 37 ++++++++++++----------
 1 file changed, 21 insertions(+), 16 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index e73d38e..f5a109e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -93,6 +93,8 @@ class ParDoTranslatorBatch<InputT, OutputT>
     List<TupleTag<?>> outputTags = new ArrayList<>(outputs.keySet());
     WindowingStrategy<?, ?> windowingStrategy =
         ((PCollection<InputT>) input).getWindowingStrategy();
+    Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
+    Coder<? extends BoundedWindow> windowCoder = windowingStrategy.getWindowFn().windowCoder();
 
     // construct a map from side input to WindowingStrategy so that
     // the DoFn runner can map main-input windows to side input windows
@@ -105,7 +107,6 @@ class ParDoTranslatorBatch<InputT, OutputT>
     SideInputBroadcast broadcastStateData = createBroadcastSideInputs(sideInputs, context);
 
     Map<TupleTag<?>, Coder<?>> outputCoderMap = context.getOutputCoders();
-    Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
     MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance();
 
     List<TupleTag<?>> additionalOutputTags = new ArrayList<>();
@@ -131,24 +132,25 @@ class ParDoTranslatorBatch<InputT, OutputT>
             broadcastStateData,
             doFnSchemaInformation);
 
-    MultiOuputCoder multipleOutputCoder =
-        MultiOuputCoder.of(
-            SerializableCoder.of(TupleTag.class),
-            outputCoderMap,
-            windowingStrategy.getWindowFn().windowCoder());
+    MultiOuputCoder multipleOutputCoder = MultiOuputCoder
+        .of(SerializableCoder.of(TupleTag.class), outputCoderMap, windowCoder);
     Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs =
         inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.fromBeamCoder(multipleOutputCoder));
     if (outputs.entrySet().size() > 1) {
       allOutputs.persist();
       for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
-        pruneOutputFilteredByTag(context, allOutputs, output);
+        pruneOutputFilteredByTag(context, allOutputs, output, windowCoder);
       }
     } else {
+      Coder<OutputT> outputCoder = ((PCollection<OutputT>) outputs.get(mainOutputTag)).getCoder();
+      Coder<WindowedValue<?>> windowedValueCoder =
+          (Coder<WindowedValue<?>>)
+              (Coder<?>) WindowedValue.getFullCoder(outputCoder, windowCoder);
       Dataset<WindowedValue<?>> outputDataset =
           allOutputs.map(
               (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>)
                   value -> value._2,
-              EncoderHelpers.windowedValueEncoder());
+              EncoderHelpers.fromBeamCoder(windowedValueCoder));
       context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset);
     }
   }
@@ -159,14 +161,14 @@ class ParDoTranslatorBatch<InputT, OutputT>
         JavaSparkContext.fromSparkContext(context.getSparkSession().sparkContext());
 
     SideInputBroadcast sideInputBroadcast = new SideInputBroadcast();
-    for (PCollectionView<?> input : sideInputs) {
+    for (PCollectionView<?> sideInput : sideInputs) {
       Coder<? extends BoundedWindow> windowCoder =
-          input.getPCollection().getWindowingStrategy().getWindowFn().windowCoder();
+          sideInput.getPCollection().getWindowingStrategy().getWindowFn().windowCoder();
+
       Coder<WindowedValue<?>> windowedValueCoder =
           (Coder<WindowedValue<?>>)
-              (Coder<?>) WindowedValue.getFullCoder(input.getPCollection().getCoder(), windowCoder);
-
-      Dataset<WindowedValue<?>> broadcastSet = context.getSideInputDataSet(input);
+              (Coder<?>) WindowedValue.getFullCoder(sideInput.getPCollection().getCoder(), windowCoder);
+      Dataset<WindowedValue<?>> broadcastSet = context.getSideInputDataSet(sideInput);
       List<WindowedValue<?>> valuesList = broadcastSet.collectAsList();
       List<byte[]> codedValues = new ArrayList<>();
       for (WindowedValue<?> v : valuesList) {
@@ -174,7 +176,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
       }
 
       sideInputBroadcast.add(
-          input.getTagInternal().getId(), jsc.broadcast(codedValues), windowedValueCoder);
+          sideInput.getTagInternal().getId(), jsc.broadcast(codedValues), windowedValueCoder);
     }
     return sideInputBroadcast;
   }
@@ -213,14 +215,17 @@ class ParDoTranslatorBatch<InputT, OutputT>
   private void pruneOutputFilteredByTag(
       TranslationContext context,
       Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs,
-      Map.Entry<TupleTag<?>, PValue> output) {
+      Map.Entry<TupleTag<?>, PValue> output, Coder<? extends BoundedWindow> windowCoder) {
     Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> filteredDataset =
         allOutputs.filter(new DoFnFilterFunction(output.getKey()));
+    Coder<WindowedValue<?>> windowedValueCoder =
+        (Coder<WindowedValue<?>>)
+            (Coder<?>) WindowedValue.getFullCoder(((PCollection<OutputT>)output.getValue()).getCoder(), windowCoder);
     Dataset<WindowedValue<?>> outputDataset =
         filteredDataset.map(
             (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>)
                 value -> value._2,
-            EncoderHelpers.windowedValueEncoder());
+            EncoderHelpers.fromBeamCoder(windowedValueCoder));
     context.putDatasetWildcard(output.getValue(), outputDataset);
   }