You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/10/10 15:48:58 UTC

[beam] 01/04: Create a Tuple2Coder to encode Scala Tuple2

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 30c662a6971093639f3cd84f9a3e58fa4497309f
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Sep 30 11:25:04 2019 +0200

    Create a Tuple2Coder to encode Scala Tuple2
---
 .../translation/helpers/Tuple2Coder.java           | 62 ++++++++++++++++++++++
 1 file changed, 62 insertions(+)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java
new file mode 100644
index 0000000..1743a01
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java
@@ -0,0 +1,62 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import scala.Tuple2;
+
+/**
+ * Beam coder for Scala {@link Tuple2} values; each field is handled by its own coder.
+ * @param <T1> type of the tuple's first field
+ * @param <T2> type of the tuple's second field
+ */
+public class Tuple2Coder<T1, T2> extends StructuredCoder<Tuple2<T1, T2>> {
+  private final Coder<T1> firstFieldCoder;
+  private final Coder<T2> secondFieldCoder;
+
+  private Tuple2Coder(Coder<T1> firstCoder, Coder<T2> secondCoder) {
+    firstFieldCoder = firstCoder;
+    secondFieldCoder = secondCoder;
+  }
+
+  public static <K, V> Tuple2Coder<K, V> of(Coder<K> firstFieldCoder, Coder<V> secondFieldCoder) {
+    return new Tuple2Coder<>(firstFieldCoder, secondFieldCoder);
+  }
+
+  @Override
+  public void encode(Tuple2<T1, T2> value, OutputStream outStream) throws IOException {
+    firstFieldCoder.encode(value._1(), outStream);
+    secondFieldCoder.encode(value._2(), outStream);
+  }
+
+  @Override
+  public Tuple2<T1, T2> decode(InputStream inStream) throws IOException {
+    // Arguments are evaluated left to right, so fields are read in the order encode wrote them.
+    return new Tuple2<>(firstFieldCoder.decode(inStream), secondFieldCoder.decode(inStream));
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Arrays.asList(firstFieldCoder, secondFieldCoder);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    Coder.verifyDeterministic(this, "First field coder must be deterministic", firstFieldCoder);
+    Coder.verifyDeterministic(this, "Second field coder must be deterministic", secondFieldCoder);
+  }
+
+  /** Returns the coder used for the tuple's first field. */
+  public Coder<T1> getFirstFieldCoder() {
+    return firstFieldCoder;
+  }
+
+  /** Returns the coder used for the tuple's second field. */
+  public Coder<T2> getSecondFieldCoder() {
+    return secondFieldCoder;
+  }
+}