Posted to commits@beam.apache.org by ec...@apache.org on 2019/10/24 10:08:56 UTC

[beam] 32/37: Apply new Encoders to ParDo. Replace Tuple2Coder with MultiOutputCoder so that the Spark Encoder used for the DoFnRunner's output can deal with multiple outputs

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c5e78a0f4552a094ba3914ef490629e136ac1beb
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Oct 1 17:52:32 2019 +0200

    Apply new Encoders to ParDo. Replace Tuple2Coder with MultiOutputCoder so that the Spark Encoder used for the DoFnRunner's output can deal with multiple outputs
---
 .../translation/batch/ParDoTranslatorBatch.java    | 42 +++++++++------
 .../translation/helpers/EncoderHelpers.java        |  6 ++-
 .../translation/helpers/MultiOuputCoder.java       | 49 +++++++++++++++++
 .../translation/helpers/Tuple2Coder.java           | 62 ----------------------
 4 files changed, 81 insertions(+), 78 deletions(-)
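
In short: the translator previously typed the ParDo output Dataset with a Kryo-backed tuple2Encoder. It now builds a dedicated Beam coder, MultiOuputCoder, for the (TupleTag, WindowedValue) pairs a multi-output DoFn emits, and bridges it to a Spark Encoder through EncoderHelpers.fromBeamCoder. A minimal sketch of the new wiring, with names taken from the diff below (fromBeamCoder's signature is inferred from its call sites):

    // Resolve each output's value coder from its tag, then hand Spark a typed
    // Encoder derived from that Beam coder instead of a Kryo byte blob.
    MultiOuputCoder multipleOutputCoder =
        MultiOuputCoder.of(SerializableCoder.of(TupleTag.class), outputCoderMap, windowCoder);
    Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs =
        inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.fromBeamCoder(multipleOutputCoder));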

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 255adc8..f5a109e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -31,8 +31,10 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.MultiOuputCoder;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.SideInputBroadcast;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -84,12 +86,15 @@ class ParDoTranslatorBatch<InputT, OutputT>
         ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
 
     // Init main variables
-    Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(context.getInput());
+    PValue input = context.getInput();
+    Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(input);
     Map<TupleTag<?>, PValue> outputs = context.getOutputs();
     TupleTag<?> mainOutputTag = getTupleTag(context);
     List<TupleTag<?>> outputTags = new ArrayList<>(outputs.keySet());
     WindowingStrategy<?, ?> windowingStrategy =
-        ((PCollection<InputT>) context.getInput()).getWindowingStrategy();
+        ((PCollection<InputT>) input).getWindowingStrategy();
+    Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
+    Coder<? extends BoundedWindow> windowCoder = windowingStrategy.getWindowFn().windowCoder();
 
     // construct a map from side input to WindowingStrategy so that
     // the DoFn runner can map main-input windows to side input windows
@@ -102,8 +107,6 @@ class ParDoTranslatorBatch<InputT, OutputT>
     SideInputBroadcast broadcastStateData = createBroadcastSideInputs(sideInputs, context);
 
     Map<TupleTag<?>, Coder<?>> outputCoderMap = context.getOutputCoders();
-    Coder<InputT> inputCoder = ((PCollection<InputT>) context.getInput()).getCoder();
-
     MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance();
 
     List<TupleTag<?>> additionalOutputTags = new ArrayList<>();
@@ -129,19 +132,25 @@ class ParDoTranslatorBatch<InputT, OutputT>
             broadcastStateData,
             doFnSchemaInformation);
 
+    MultiOuputCoder multipleOutputCoder = MultiOuputCoder
+        .of(SerializableCoder.of(TupleTag.class), outputCoderMap, windowCoder);
     Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs =
-        inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder());
+        inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.fromBeamCoder(multipleOutputCoder));
     if (outputs.entrySet().size() > 1) {
       allOutputs.persist();
       for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
-        pruneOutputFilteredByTag(context, allOutputs, output);
+        pruneOutputFilteredByTag(context, allOutputs, output, windowCoder);
       }
     } else {
+      Coder<OutputT> outputCoder = ((PCollection<OutputT>) outputs.get(mainOutputTag)).getCoder();
+      Coder<WindowedValue<?>> windowedValueCoder =
+          (Coder<WindowedValue<?>>)
+              (Coder<?>) WindowedValue.getFullCoder(outputCoder, windowCoder);
       Dataset<WindowedValue<?>> outputDataset =
           allOutputs.map(
               (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>)
                   value -> value._2,
-              EncoderHelpers.windowedValueEncoder());
+              EncoderHelpers.fromBeamCoder(windowedValueCoder));
       context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset);
     }
   }
@@ -152,14 +161,14 @@ class ParDoTranslatorBatch<InputT, OutputT>
         JavaSparkContext.fromSparkContext(context.getSparkSession().sparkContext());
 
     SideInputBroadcast sideInputBroadcast = new SideInputBroadcast();
-    for (PCollectionView<?> input : sideInputs) {
+    for (PCollectionView<?> sideInput : sideInputs) {
       Coder<? extends BoundedWindow> windowCoder =
-          input.getPCollection().getWindowingStrategy().getWindowFn().windowCoder();
+          sideInput.getPCollection().getWindowingStrategy().getWindowFn().windowCoder();
+
       Coder<WindowedValue<?>> windowedValueCoder =
           (Coder<WindowedValue<?>>)
-              (Coder<?>) WindowedValue.getFullCoder(input.getPCollection().getCoder(), windowCoder);
-
-      Dataset<WindowedValue<?>> broadcastSet = context.getSideInputDataSet(input);
+              (Coder<?>) WindowedValue.getFullCoder(sideInput.getPCollection().getCoder(), windowCoder);
+      Dataset<WindowedValue<?>> broadcastSet = context.getSideInputDataSet(sideInput);
       List<WindowedValue<?>> valuesList = broadcastSet.collectAsList();
       List<byte[]> codedValues = new ArrayList<>();
       for (WindowedValue<?> v : valuesList) {
@@ -167,7 +176,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
       }
 
       sideInputBroadcast.add(
-          input.getTagInternal().getId(), jsc.broadcast(codedValues), windowedValueCoder);
+          sideInput.getTagInternal().getId(), jsc.broadcast(codedValues), windowedValueCoder);
     }
     return sideInputBroadcast;
   }
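
For context: each side input is collected to the driver, serialized per windowed value with CoderHelpers.toByteArray (elided between the hunks above), and broadcast together with its windowed-value coder. A rough executor-side sketch of how such a broadcast would be rehydrated, assuming CoderHelpers offers the usual fromByteArray counterpart and a hypothetical getter on SideInputBroadcast (names are illustrative, not this commit's API):

    // Decode each broadcast byte[] with the coder that was registered
    // alongside it in sideInputBroadcast.add(...).
    List<byte[]> codedValues = sideInputBroadcast.getBroadcastValue(tagId);  // hypothetical getter
    List<WindowedValue<?>> values = new ArrayList<>();
    for (byte[] bytes : codedValues) {
      values.add((WindowedValue<?>) CoderHelpers.fromByteArray(bytes, windowedValueCoder));
    }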
@@ -206,14 +215,17 @@ class ParDoTranslatorBatch<InputT, OutputT>
   private void pruneOutputFilteredByTag(
       TranslationContext context,
       Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs,
-      Map.Entry<TupleTag<?>, PValue> output) {
+      Map.Entry<TupleTag<?>, PValue> output, Coder<? extends BoundedWindow> windowCoder) {
     Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> filteredDataset =
         allOutputs.filter(new DoFnFilterFunction(output.getKey()));
+    Coder<WindowedValue<?>> windowedValueCoder =
+        (Coder<WindowedValue<?>>)
+            (Coder<?>) WindowedValue.getFullCoder(((PCollection<OutputT>)output.getValue()).getCoder(), windowCoder);
     Dataset<WindowedValue<?>> outputDataset =
         filteredDataset.map(
             (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>)
                 value -> value._2,
-            EncoderHelpers.windowedValueEncoder());
+            EncoderHelpers.fromBeamCoder(windowedValueCoder));
     context.putDatasetWildcard(output.getValue(), outputDataset);
   }
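
When a ParDo has more than one output, this pruning runs once per tag: filter the mixed Dataset down to the pairs carrying that tag, then strip the tag off. In outline (a sketch only; the committed predicate lives in DoFnFilterFunction):

    // Keep the pairs for this output tag, then unwrap to the windowed value,
    // typed by an Encoder derived from that output's Beam coder.
    TupleTag<?> tag = output.getKey();
    Dataset<WindowedValue<?>> outputDataset =
        allOutputs
            .filter(
                (FilterFunction<Tuple2<TupleTag<?>, WindowedValue<?>>>)
                    tuple -> tag.equals(tuple._1()))
            .map(
                (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>)
                    tuple -> tuple._2,
                EncoderHelpers.fromBeamCoder(windowedValueCoder));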
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index 218dc0a..a4f0320 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -81,10 +81,14 @@ public class EncoderHelpers {
     return Encoders.kryo((Class<T>) Object.class);
   }
 
-  /** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo */
+/*
+  */
+/** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo *//*
+
   public static <T1, T2> Encoder<Tuple2<T1, T2>> tuple2Encoder() {
     return Encoders.tuple(EncoderHelpers.genericEncoder(), EncoderHelpers.genericEncoder());
   }
+*/
 
   /*
    --------- Bridges from Beam Coders to Spark Encoders
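
The retired tuple2Encoder fell back to Kryo-serialized bytes for both tuple fields; the coder bridge declared under this marker replaces it. A minimal usage sketch, assuming fromBeamCoder(Coder<T>) returns an Encoder<T> as its call sites in ParDoTranslatorBatch suggest (StringUtf8Coder and Arrays come from the usual imports):

    // Any Beam coder can now back a typed Dataset.
    Encoder<String> encoder = EncoderHelpers.fromBeamCoder(StringUtf8Coder.of());
    Dataset<String> ds =
        context.getSparkSession().createDataset(Arrays.asList("a", "b"), encoder);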
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java
new file mode 100644
index 0000000..caaea01
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java
@@ -0,0 +1,49 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import scala.Tuple2;
+
+public class MultiOuputCoder<T> extends CustomCoder<Tuple2<TupleTag<T>, WindowedValue<T>>> {
+  Coder<TupleTag> tupleTagCoder;
+  Map<TupleTag<?>, Coder<?>> coderMap;
+  Coder<? extends BoundedWindow> windowCoder;
+
+  public static MultiOuputCoder of(Coder<TupleTag> tupleTagCoder, Map<TupleTag<?>, Coder<?>> coderMap, Coder<? extends BoundedWindow> windowCoder) {
+    return new MultiOuputCoder(tupleTagCoder, coderMap, windowCoder);
+  }
+
+  private MultiOuputCoder(Coder<TupleTag> tupleTagCoder, Map<TupleTag<?>, Coder<?>> coderMap, Coder<? extends BoundedWindow> windowCoder) {
+    this.tupleTagCoder = tupleTagCoder;
+    this.coderMap = coderMap;
+    this.windowCoder = windowCoder;
+  }
+
+  @Override public void encode(Tuple2<TupleTag<T>, WindowedValue<T>> tuple2, OutputStream outStream)
+      throws IOException {
+    TupleTag<T> tupleTag = tuple2._1();
+    tupleTagCoder.encode(tupleTag, outStream);
+    Coder<T> valueCoder = (Coder<T>)coderMap.get(tupleTag);
+    WindowedValue.FullWindowedValueCoder<T> wvCoder = WindowedValue.FullWindowedValueCoder
+        .of(valueCoder, windowCoder);
+    wvCoder.encode(tuple2._2(), outStream);
+  }
+
+  @Override public Tuple2<TupleTag<T>, WindowedValue<T>> decode(InputStream inStream)
+      throws CoderException, IOException {
+    TupleTag<T> tupleTag = (TupleTag<T>) tupleTagCoder.decode(inStream);
+    Coder<T> valueCoder = (Coder<T>)coderMap.get(tupleTag);
+    WindowedValue.FullWindowedValueCoder<T> wvCoder = WindowedValue.FullWindowedValueCoder
+        .of(valueCoder, windowCoder);
+    WindowedValue<T> wv = wvCoder.decode(inStream);
+    return Tuple2.apply(tupleTag, wv);
+  }
+}
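
A small round-trip sketch for the coder above (illustrative, not part of the commit; assumes the usual Beam SDK and java.io imports and a surrounding method that throws IOException). The point to note is that the value coder is resolved from the tag on both paths, so encode and decode must share the same coder map:

    TupleTag<String> tag = new TupleTag<>("main");
    Map<TupleTag<?>, Coder<?>> coders = new HashMap<>();
    coders.put(tag, StringUtf8Coder.of());
    MultiOuputCoder coder =
        MultiOuputCoder.of(SerializableCoder.of(TupleTag.class), coders, GlobalWindow.Coder.INSTANCE);

    // Encode one (tag, value-in-global-window) pair, then read it back.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(Tuple2.apply(tag, WindowedValue.valueInGlobalWindow("hello")), out);
    Tuple2<TupleTag<String>, WindowedValue<String>> back =
        coder.decode(new ByteArrayInputStream(out.toByteArray()));
    // back._1() equals tag and back._2().getValue() is "hello".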
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java
deleted file mode 100644
index 1743a01..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
-import scala.Tuple2;
-
-/**
- * Beam coder to encode/decode Tuple2 scala types.
- * @param <T1> first field type parameter
- * @param <T2> second field type parameter
- */
-public class Tuple2Coder<T1, T2> extends StructuredCoder<Tuple2<T1, T2>> {
-  private final Coder<T1> firstFieldCoder;
-  private final Coder<T2> secondFieldCoder;
-
-  public static <K, V> Tuple2Coder<K, V> of(Coder<K> firstFieldCoder, Coder<V> secondFieldCoder) {
-    return new Tuple2Coder<>(firstFieldCoder, secondFieldCoder);
-  }
-
-  private Tuple2Coder(Coder<T1> firstFieldCoder, Coder<T2> secondFieldCoder) {
-    this.firstFieldCoder = firstFieldCoder;
-    this.secondFieldCoder = secondFieldCoder;
-  }
-
-
-  @Override public void encode(Tuple2<T1, T2> value, OutputStream outStream)
-      throws IOException {
-    firstFieldCoder.encode(value._1(), outStream);
-    secondFieldCoder.encode(value._2(), outStream);
-  }
-
-  @Override public Tuple2<T1, T2> decode(InputStream inStream) throws IOException {
-    T1 firstField = firstFieldCoder.decode(inStream);
-    T2 secondField = secondFieldCoder.decode(inStream);
-    return Tuple2.apply(firstField, secondField);
-  }
-
-  @Override public List<? extends Coder<?>> getCoderArguments() {
-    return Arrays.asList(firstFieldCoder, secondFieldCoder);
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(this, "First field coder must be deterministic", firstFieldCoder);
-    verifyDeterministic(this, "Second field coder must be deterministic", secondFieldCoder);
-  }
-
-  /** Returns the coder for first field. */
-  public Coder<T1> getFirstFieldCoder() {
-    return firstFieldCoder;
-  }
-
-  /** Returns the coder for second field. */
-  public Coder<T2> getSecondFieldCoder() {
-    return secondFieldCoder;
-  }
-}