Posted to commits@beam.apache.org by ec...@apache.org on 2019/10/10 15:48:57 UTC

[beam] branch spark-runner_structured-streaming updated (d093ffe -> ee2c0e6)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from d093ffe  Apply spotless
     new 30c662a  Create a Tuple2Coder to encode scala Tuple2
     new 868204f  Apply new Encoders to GroupByKey
     new 31c91a9  Apply new Encoders to ParDo. Replace Tuple2Coder with MultiOutputCoder to handle multiple outputs in the Spark Encoder for the DoFnRunner
     new ee2c0e6  Apply spotless

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../batch/GroupByKeyTranslatorBatch.java           | 25 +++++--
 .../translation/batch/ParDoTranslatorBatch.java    | 17 +++--
 .../translation/helpers/EncoderHelpers.java        | 12 ++--
 .../translation/helpers/MultiOuputCoder.java       | 80 ++++++++++++++++++++++
 4 files changed, 119 insertions(+), 15 deletions(-)
 create mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java


[beam] 04/04: Apply spotless

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ee2c0e68de32682ab38c682347f46db8edc8cc06
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Oct 10 17:34:30 2019 +0200

    Apply spotless
---
 .../batch/GroupByKeyTranslatorBatch.java           |  8 ++--
 .../translation/batch/ParDoTranslatorBatch.java    | 13 +++---
 .../translation/helpers/EncoderHelpers.java        | 16 +++----
 .../translation/helpers/MultiOuputCoder.java       | 51 +++++++++++++++++-----
 4 files changed, 60 insertions(+), 28 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 2970aa7..3ebe477 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -62,8 +62,7 @@ class GroupByKeyTranslatorBatch<K, V>
     // group by key only
     Coder<K> keyCoder = kvCoder.getKeyCoder();
     KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly =
-        input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(
-            keyCoder));
+        input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder));
 
     // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable
     Coder<V> valueCoder = kvCoder.getValueCoder();
@@ -92,8 +91,9 @@ class GroupByKeyTranslatorBatch<K, V>
             EncoderHelpers.fromBeamCoder(KvCoder.of(keyCoder, iterableCoder)));
 
     // group also by windows
-    WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> outputCoder = WindowedValue.FullWindowedValueCoder
-        .of(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)),
+    WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> outputCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            KvCoder.of(keyCoder, IterableCoder.of(valueCoder)),
             windowingStrategy.getWindowFn().windowCoder());
     Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
         materialized.flatMap(
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 9fec39a..e73d38e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -38,11 +38,9 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -133,10 +131,13 @@ class ParDoTranslatorBatch<InputT, OutputT>
             broadcastStateData,
             doFnSchemaInformation);
 
-    MultiOuputCoder multipleOutputCoder = MultiOuputCoder.of(SerializableCoder.of(TupleTag.class), outputCoderMap,
-        windowingStrategy.getWindowFn().windowCoder());
-    Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs = inputDataSet
-        .mapPartitions(doFnWrapper, EncoderHelpers.fromBeamCoder(multipleOutputCoder));
+    MultiOuputCoder multipleOutputCoder =
+        MultiOuputCoder.of(
+            SerializableCoder.of(TupleTag.class),
+            outputCoderMap,
+            windowingStrategy.getWindowFn().windowCoder());
+    Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs =
+        inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.fromBeamCoder(multipleOutputCoder));
     if (outputs.entrySet().size() > 1) {
       allOutputs.persist();
       for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index a4f0320..2f3bced 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -44,7 +44,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.ObjectType;
 import scala.StringContext;
-import scala.Tuple2;
 import scala.collection.JavaConversions;
 import scala.reflect.ClassTag;
 import scala.reflect.ClassTag$;
@@ -81,14 +80,15 @@ public class EncoderHelpers {
     return Encoders.kryo((Class<T>) Object.class);
   }
 
-/*
-  */
-/** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo *//*
+  /*
+   */
+  /** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo */
+  /*
 
-  public static <T1, T2> Encoder<Tuple2<T1, T2>> tuple2Encoder() {
-    return Encoders.tuple(EncoderHelpers.genericEncoder(), EncoderHelpers.genericEncoder());
-  }
-*/
+    public static <T1, T2> Encoder<Tuple2<T1, T2>> tuple2Encoder() {
+      return Encoders.tuple(EncoderHelpers.genericEncoder(), EncoderHelpers.genericEncoder());
+    }
+  */
 
   /*
    --------- Bridges from Beam Coders to Spark Encoders
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java
index caaea01..82f0e4f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
 
 import java.io.IOException;
@@ -12,37 +29,51 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import scala.Tuple2;
 
+/**
+ * Coder to serialize and deserialize {@code Tuple2<TupleTag<T>, WindowedValue<T>>} to be used
+ * in Spark encoders while applying {@link org.apache.beam.sdk.transforms.DoFn}.
+ *
+ * @param <T> type of the elements in the collection
+ */
 public class MultiOuputCoder<T> extends CustomCoder<Tuple2<TupleTag<T>, WindowedValue<T>>> {
   Coder<TupleTag> tupleTagCoder;
   Map<TupleTag<?>, Coder<?>> coderMap;
   Coder<? extends BoundedWindow> windowCoder;
 
-  public static MultiOuputCoder of(Coder<TupleTag> tupleTagCoder, Map<TupleTag<?>, Coder<?>> coderMap, Coder<? extends BoundedWindow> windowCoder) {
+  public static MultiOuputCoder of(
+      Coder<TupleTag> tupleTagCoder,
+      Map<TupleTag<?>, Coder<?>> coderMap,
+      Coder<? extends BoundedWindow> windowCoder) {
     return new MultiOuputCoder(tupleTagCoder, coderMap, windowCoder);
   }
 
-  private MultiOuputCoder(Coder<TupleTag> tupleTagCoder, Map<TupleTag<?>, Coder<?>> coderMap, Coder<? extends BoundedWindow> windowCoder) {
+  private MultiOuputCoder(
+      Coder<TupleTag> tupleTagCoder,
+      Map<TupleTag<?>, Coder<?>> coderMap,
+      Coder<? extends BoundedWindow> windowCoder) {
     this.tupleTagCoder = tupleTagCoder;
     this.coderMap = coderMap;
     this.windowCoder = windowCoder;
   }
 
-  @Override public void encode(Tuple2<TupleTag<T>, WindowedValue<T>> tuple2, OutputStream outStream)
+  @Override
+  public void encode(Tuple2<TupleTag<T>, WindowedValue<T>> tuple2, OutputStream outStream)
       throws IOException {
     TupleTag<T> tupleTag = tuple2._1();
     tupleTagCoder.encode(tupleTag, outStream);
-    Coder<T> valueCoder = (Coder<T>)coderMap.get(tupleTag);
-    WindowedValue.FullWindowedValueCoder<T> wvCoder = WindowedValue.FullWindowedValueCoder
-        .of(valueCoder, windowCoder);
+    Coder<T> valueCoder = (Coder<T>) coderMap.get(tupleTag);
+    WindowedValue.FullWindowedValueCoder<T> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(valueCoder, windowCoder);
     wvCoder.encode(tuple2._2(), outStream);
   }
 
-  @Override public Tuple2<TupleTag<T>, WindowedValue<T>> decode(InputStream inStream)
+  @Override
+  public Tuple2<TupleTag<T>, WindowedValue<T>> decode(InputStream inStream)
       throws CoderException, IOException {
     TupleTag<T> tupleTag = (TupleTag<T>) tupleTagCoder.decode(inStream);
-    Coder<T> valueCoder = (Coder<T>)coderMap.get(tupleTag);
-    WindowedValue.FullWindowedValueCoder<T> wvCoder = WindowedValue.FullWindowedValueCoder
-        .of(valueCoder, windowCoder);
+    Coder<T> valueCoder = (Coder<T>) coderMap.get(tupleTag);
+    WindowedValue.FullWindowedValueCoder<T> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(valueCoder, windowCoder);
     WindowedValue<T> wv = wvCoder.decode(inStream);
     return Tuple2.apply(tupleTag, wv);
   }
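
For illustration, the coder above round-trips like any other Beam coder: the tag is
encoded first, then used to look up the value coder for the windowed element. Below is
a minimal sketch, assuming the Beam SDK and this runner module are on the classpath
(the class and variable names are illustrative, not part of the commit):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.MultiOuputCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import scala.Tuple2;

public class MultiOuputCoderRoundTrip {
  public static void main(String[] args) throws Exception {
    // One output tag, and the value coder registered for it in the map.
    TupleTag<String> mainTag = new TupleTag<String>("main") {};
    Map<TupleTag<?>, Coder<?>> coderMap = new HashMap<>();
    coderMap.put(mainTag, StringUtf8Coder.of());

    MultiOuputCoder<String> coder =
        MultiOuputCoder.of(
            SerializableCoder.of(TupleTag.class), coderMap, GlobalWindow.Coder.INSTANCE);

    // Encode a (tag, windowed value) pair, then decode it back.
    Tuple2<TupleTag<String>, WindowedValue<String>> in =
        Tuple2.apply(mainTag, WindowedValue.valueInGlobalWindow("hello"));
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    coder.encode(in, bytes);
    Tuple2<TupleTag<String>, WindowedValue<String>> out =
        coder.decode(new ByteArrayInputStream(bytes.toByteArray()));

    System.out.println(out._1().getId() + " -> " + out._2().getValue()); // main -> hello
  }
}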


[beam] 01/04: Create a Tuple2Coder to encode scala Tuple2

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 30c662a6971093639f3cd84f9a3e58fa4497309f
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Sep 30 11:25:04 2019 +0200

    Create a Tuple2Coder to encode scala Tuple2
---
 .../translation/helpers/Tuple2Coder.java           | 62 ++++++++++++++++++++++
 1 file changed, 62 insertions(+)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java
new file mode 100644
index 0000000..1743a01
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java
@@ -0,0 +1,62 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import scala.Tuple2;
+
+/**
+ * Beam coder to encode/decode scala Tuple2 types.
+ * @param <T1> first field type parameter
+ * @param <T2> second field type parameter
+ */
+public class Tuple2Coder<T1, T2> extends StructuredCoder<Tuple2<T1, T2>> {
+  private final Coder<T1> firstFieldCoder;
+  private final Coder<T2> secondFieldCoder;
+
+  public static <K, V> Tuple2Coder<K, V> of(Coder<K> firstFieldCoder, Coder<V> secondFieldCoder) {
+    return new Tuple2Coder<>(firstFieldCoder, secondFieldCoder);
+  }
+
+  private Tuple2Coder(Coder<T1> firstFieldCoder, Coder<T2> secondFieldCoder) {
+    this.firstFieldCoder = firstFieldCoder;
+    this.secondFieldCoder = secondFieldCoder;
+  }
+
+
+  @Override public void encode(Tuple2<T1, T2> value, OutputStream outStream)
+      throws IOException {
+    firstFieldCoder.encode(value._1(), outStream);
+    secondFieldCoder.encode(value._2(), outStream);
+  }
+
+  @Override public Tuple2<T1, T2> decode(InputStream inStream) throws IOException {
+    T1 firstField = firstFieldCoder.decode(inStream);
+    T2 secondField = secondFieldCoder.decode(inStream);
+    return Tuple2.apply(firstField, secondField);
+  }
+
+  @Override public List<? extends Coder<?>> getCoderArguments() {
+    return Arrays.asList(firstFieldCoder, secondFieldCoder);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    verifyDeterministic(this, "First field coder must be deterministic", firstFieldCoder);
+    verifyDeterministic(this, "Second field coder must be deterministic", secondFieldCoder);
+  }
+
+  /** Returns the coder for first field. */
+  public Coder<T1> getFirstFieldCoder() {
+    return firstFieldCoder;
+  }
+
+  /** Returns the coder for second field. */
+  public Coder<T2> getSecondFieldCoder() {
+    return secondFieldCoder;
+  }
+}
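
As a usage note, the new coder simply composes one Beam coder per tuple field, so a
plain round trip exercises it. A minimal sketch, assuming only the Beam SDK and this
file on the classpath (the class name is illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Tuple2Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import scala.Tuple2;

public class Tuple2CoderRoundTrip {
  public static void main(String[] args) throws Exception {
    // Compose the tuple coder from one Beam coder per field.
    Tuple2Coder<String, Integer> coder =
        Tuple2Coder.of(StringUtf8Coder.of(), VarIntCoder.of());

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    coder.encode(Tuple2.apply("answer", 42), bytes);
    Tuple2<String, Integer> decoded =
        coder.decode(new ByteArrayInputStream(bytes.toByteArray()));

    System.out.println(decoded._1() + " = " + decoded._2()); // answer = 42
  }
}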


[beam] 02/04: Apply new Encoders to GroupByKey

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 868204f2d2de27ab7f37e4630a0b52a60092b766
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Sep 30 12:13:25 2019 +0200

    Apply new Encoders to GroupByKey
---
 .../batch/GroupByKeyTranslatorBatch.java           | 25 ++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 3e203a8..2970aa7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -29,6 +29,8 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.Translation
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -54,11 +56,21 @@ class GroupByKeyTranslatorBatch<K, V>
 
     Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(inputPCollection);
 
+    WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy();
+    KvCoder<K, V> kvCoder = (KvCoder<K, V>) inputPCollection.getCoder();
+
     // group by key only
+    Coder<K> keyCoder = kvCoder.getKeyCoder();
     KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly =
-        input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
+        input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(
+            keyCoder));
 
     // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable
+    Coder<V> valueCoder = kvCoder.getValueCoder();
+    WindowedValue.WindowedValueCoder<V> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            valueCoder, inputPCollection.getWindowingStrategy().getWindowFn().windowCoder());
+    IterableCoder<WindowedValue<V>> iterableCoder = IterableCoder.of(wvCoder);
     Dataset<KV<K, Iterable<WindowedValue<V>>>> materialized =
         groupByKeyOnly.mapGroups(
             (MapGroupsFunction<K, WindowedValue<KV<K, V>>, KV<K, Iterable<WindowedValue<V>>>>)
@@ -77,19 +89,20 @@ class GroupByKeyTranslatorBatch<K, V>
                       KV.of(key, Iterables.unmodifiableIterable(values));
                   return kv;
                 },
-            EncoderHelpers.kvEncoder());
+            EncoderHelpers.fromBeamCoder(KvCoder.of(keyCoder, iterableCoder)));
 
-    WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy();
-    KvCoder<K, V> coder = (KvCoder<K, V>) inputPCollection.getCoder();
     // group also by windows
+    WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> outputCoder = WindowedValue.FullWindowedValueCoder
+        .of(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)),
+            windowingStrategy.getWindowFn().windowCoder());
     Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
         materialized.flatMap(
             new GroupAlsoByWindowViaOutputBufferFn<>(
                 windowingStrategy,
                 new InMemoryStateInternalsFactory<>(),
-                SystemReduceFn.buffering(coder.getValueCoder()),
+                SystemReduceFn.buffering(valueCoder),
                 context.getSerializableOptions()),
-            EncoderHelpers.windowedValueEncoder());
+            EncoderHelpers.fromBeamCoder(outputCoder));
 
     context.putDataset(context.getOutput(), output);
   }
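
The pattern throughout this commit is to build the exact Beam coder for each
intermediate Dataset and bridge it to a Spark Encoder with
EncoderHelpers.fromBeamCoder, instead of the generic Kryo-backed encoders used
before. A sketch of the output-coder composition with concrete key/value coders
(names are illustrative; it assumes the single-argument fromBeamCoder signature
seen in the diff):

import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.sql.Encoder;

public class GroupByKeyEncoderSketch {
  public static void main(String[] args) {
    // Same composition as the translator, on concrete coders:
    // WindowedValue<KV<String, Iterable<Long>>> in the global window.
    WindowedValue.FullWindowedValueCoder<KV<String, Iterable<Long>>> outputCoder =
        WindowedValue.FullWindowedValueCoder.of(
            KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(VarLongCoder.of())),
            GlobalWindow.Coder.INSTANCE);

    // The bridge turns the Beam coder into a Spark Encoder usable in mapGroups/flatMap.
    Encoder<WindowedValue<KV<String, Iterable<Long>>>> encoder =
        EncoderHelpers.fromBeamCoder(outputCoder);
    System.out.println(encoder.schema());
  }
}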


[beam] 03/04: Apply new Encoders to ParDo. Replace Tuple2Coder with MultiOutputCoder to handle multiple outputs in the Spark Encoder for the DoFnRunner

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 31c91a90a38638bc551c913fbda7b72bb3546d0b
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Oct 1 17:52:32 2019 +0200

    Apply new Encoders to ParDo. Replace Tuple2Coder with MultiOutputCoder to handle multiple outputs in the Spark Encoder for the DoFnRunner
---
 .../translation/batch/ParDoTranslatorBatch.java    | 18 ++++---
 .../translation/helpers/EncoderHelpers.java        |  6 ++-
 .../translation/helpers/MultiOuputCoder.java       | 49 +++++++++++++++++
 .../translation/helpers/Tuple2Coder.java           | 62 ----------------------
 4 files changed, 66 insertions(+), 69 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 255adc8..9fec39a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -31,14 +31,18 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.MultiOuputCoder;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.SideInputBroadcast;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -84,12 +88,13 @@ class ParDoTranslatorBatch<InputT, OutputT>
         ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
 
     // Init main variables
-    Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(context.getInput());
+    PValue input = context.getInput();
+    Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(input);
     Map<TupleTag<?>, PValue> outputs = context.getOutputs();
     TupleTag<?> mainOutputTag = getTupleTag(context);
     List<TupleTag<?>> outputTags = new ArrayList<>(outputs.keySet());
     WindowingStrategy<?, ?> windowingStrategy =
-        ((PCollection<InputT>) context.getInput()).getWindowingStrategy();
+        ((PCollection<InputT>) input).getWindowingStrategy();
 
     // construct a map from side input to WindowingStrategy so that
     // the DoFn runner can map main-input windows to side input windows
@@ -102,8 +107,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
     SideInputBroadcast broadcastStateData = createBroadcastSideInputs(sideInputs, context);
 
     Map<TupleTag<?>, Coder<?>> outputCoderMap = context.getOutputCoders();
-    Coder<InputT> inputCoder = ((PCollection<InputT>) context.getInput()).getCoder();
-
+    Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
     MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance();
 
     List<TupleTag<?>> additionalOutputTags = new ArrayList<>();
@@ -129,8 +133,10 @@ class ParDoTranslatorBatch<InputT, OutputT>
             broadcastStateData,
             doFnSchemaInformation);
 
-    Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs =
-        inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder());
+    MultiOuputCoder multipleOutputCoder = MultiOuputCoder.of(SerializableCoder.of(TupleTag.class), outputCoderMap,
+        windowingStrategy.getWindowFn().windowCoder());
+    Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs = inputDataSet
+        .mapPartitions(doFnWrapper, EncoderHelpers.fromBeamCoder(multipleOutputCoder));
     if (outputs.entrySet().size() > 1) {
       allOutputs.persist();
       for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index 218dc0a..a4f0320 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -81,10 +81,14 @@ public class EncoderHelpers {
     return Encoders.kryo((Class<T>) Object.class);
   }
 
-  /** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo */
+/*
+  */
+/** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo *//*
+
   public static <T1, T2> Encoder<Tuple2<T1, T2>> tuple2Encoder() {
     return Encoders.tuple(EncoderHelpers.genericEncoder(), EncoderHelpers.genericEncoder());
   }
+*/
 
   /*
    --------- Bridges from Beam Coders to Spark Encoders
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java
new file mode 100644
index 0000000..caaea01
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java
@@ -0,0 +1,49 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import scala.Tuple2;
+
+public class MultiOuputCoder<T> extends CustomCoder<Tuple2<TupleTag<T>, WindowedValue<T>>> {
+  Coder<TupleTag> tupleTagCoder;
+  Map<TupleTag<?>, Coder<?>> coderMap;
+  Coder<? extends BoundedWindow> windowCoder;
+
+  public static MultiOuputCoder of(Coder<TupleTag> tupleTagCoder, Map<TupleTag<?>, Coder<?>> coderMap, Coder<? extends BoundedWindow> windowCoder) {
+    return new MultiOuputCoder(tupleTagCoder, coderMap, windowCoder);
+  }
+
+  private MultiOuputCoder(Coder<TupleTag> tupleTagCoder, Map<TupleTag<?>, Coder<?>> coderMap, Coder<? extends BoundedWindow> windowCoder) {
+    this.tupleTagCoder = tupleTagCoder;
+    this.coderMap = coderMap;
+    this.windowCoder = windowCoder;
+  }
+
+  @Override public void encode(Tuple2<TupleTag<T>, WindowedValue<T>> tuple2, OutputStream outStream)
+      throws IOException {
+    TupleTag<T> tupleTag = tuple2._1();
+    tupleTagCoder.encode(tupleTag, outStream);
+    Coder<T> valueCoder = (Coder<T>)coderMap.get(tupleTag);
+    WindowedValue.FullWindowedValueCoder<T> wvCoder = WindowedValue.FullWindowedValueCoder
+        .of(valueCoder, windowCoder);
+    wvCoder.encode(tuple2._2(), outStream);
+  }
+
+  @Override public Tuple2<TupleTag<T>, WindowedValue<T>> decode(InputStream inStream)
+      throws CoderException, IOException {
+    TupleTag<T> tupleTag = (TupleTag<T>) tupleTagCoder.decode(inStream);
+    Coder<T> valueCoder = (Coder<T>)coderMap.get(tupleTag);
+    WindowedValue.FullWindowedValueCoder<T> wvCoder = WindowedValue.FullWindowedValueCoder
+        .of(valueCoder, windowCoder);
+    WindowedValue<T> wv = wvCoder.decode(inStream);
+    return Tuple2.apply(tupleTag, wv);
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java
deleted file mode 100644
index 1743a01..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
-import scala.Tuple2;
-
-/**
- * Beam coder to encode/decode scala Tuple2 types.
- * @param <T1> first field type parameter
- * @param <T2> second field type parameter
- */
-public class Tuple2Coder<T1, T2> extends StructuredCoder<Tuple2<T1, T2>> {
-  private final Coder<T1> firstFieldCoder;
-  private final Coder<T2> secondFieldCoder;
-
-  public static <K, V> Tuple2Coder<K, V> of(Coder<K> firstFieldCoder, Coder<V> secondFieldCoder) {
-    return new Tuple2Coder<>(firstFieldCoder, secondFieldCoder);
-  }
-
-  private Tuple2Coder(Coder<T1> firstFieldCoder, Coder<T2> secondFieldCoder) {
-    this.firstFieldCoder = firstFieldCoder;
-    this.secondFieldCoder = secondFieldCoder;
-  }
-
-
-  @Override public void encode(Tuple2<T1, T2> value, OutputStream outStream)
-      throws IOException {
-    firstFieldCoder.encode(value._1(), outStream);
-    secondFieldCoder.encode(value._2(), outStream);
-  }
-
-  @Override public Tuple2<T1, T2> decode(InputStream inStream) throws IOException {
-    T1 firstField = firstFieldCoder.decode(inStream);
-    T2 secondField = secondFieldCoder.decode(inStream);
-    return Tuple2.apply(firstField, secondField);
-  }
-
-  @Override public List<? extends Coder<?>> getCoderArguments() {
-    return Arrays.asList(firstFieldCoder, secondFieldCoder);
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(this, "First field coder must be deterministic", firstFieldCoder);
-    verifyDeterministic(this, "Second field coder must be deterministic", secondFieldCoder);
-  }
-
-  /** Returns the coder for first field. */
-  public Coder<T1> getFirstFieldCoder() {
-    return firstFieldCoder;
-  }
-
-  /** Returns the coder for second field. */
-  public Coder<T2> getSecondFieldCoder() {
-    return secondFieldCoder;
-  }
-}
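
Because every row of allOutputs carries its TupleTag, the translator can later split
the mixed-output Dataset back into one Dataset per tag. The actual pruning code is
not part of this diff; the sketch below is a hypothetical helper (forTag and its
parameters are invented for illustration) showing how the tag/value pair could be
filtered and unwrapped:

import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import scala.Tuple2;

public class TaggedOutputSplit {
  /** Hypothetical helper: keep only the rows for one tag and drop the tag column. */
  static <T> Dataset<WindowedValue<T>> forTag(
      Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs,
      TupleTag<T> tag,
      Coder<WindowedValue<T>> outputCoder) {
    return allOutputs
        .filter(
            (FilterFunction<Tuple2<TupleTag<?>, WindowedValue<?>>>) t -> t._1().equals(tag))
        .map(
            (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<T>>)
                t -> (WindowedValue<T>) t._2(),
            EncoderHelpers.fromBeamCoder(outputCoder));
  }
}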