You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/10/10 15:48:58 UTC
[beam] 01/04: Create a Tuple2Coder to encode scala tuple2
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 30c662a6971093639f3cd84f9a3e58fa4497309f
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Sep 30 11:25:04 2019 +0200
Create a Tuple2Coder to encode scala tuple2
---
.../translation/helpers/Tuple2Coder.java | 62 ++++++++++++++++++++++
1 file changed, 62 insertions(+)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java
new file mode 100644
index 0000000..1743a01
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java
@@ -0,0 +1,62 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import scala.Tuple2;
+
+/**
+ * Beam {@link StructuredCoder} for Scala {@link Tuple2} values: encodes {@code _1} with the first field coder, then {@code _2} with the second field coder, and decodes them in that same order.
+ * @param <T1> type of the tuple's first field ({@code _1})
+ * @param <T2> type of the tuple's second field ({@code _2})
+ */
+public class Tuple2Coder<T1, T2> extends StructuredCoder<Tuple2<T1, T2>> {
+ private final Coder<T1> firstFieldCoder;
+ private final Coder<T2> secondFieldCoder;
+
+ public static <K, V> Tuple2Coder<K, V> of(Coder<K> firstFieldCoder, Coder<V> secondFieldCoder) {
+ return new Tuple2Coder<>(firstFieldCoder, secondFieldCoder);
+ }
+
+ private Tuple2Coder(Coder<T1> firstFieldCoder, Coder<T2> secondFieldCoder) {
+ this.firstFieldCoder = firstFieldCoder;
+ this.secondFieldCoder = secondFieldCoder;
+ }
+
+ /** Writes {@code value._1()} and then {@code value._2()} to {@code outStream}, each with its own field coder. */
+ @Override public void encode(Tuple2<T1, T2> value, OutputStream outStream)
+ throws IOException {
+ firstFieldCoder.encode(value._1(), outStream);
+ secondFieldCoder.encode(value._2(), outStream);
+ }
+
+ @Override public Tuple2<T1, T2> decode(InputStream inStream) throws IOException {
+ T1 firstField = firstFieldCoder.decode(inStream); // fields are read in the same order encode wrote them
+ T2 secondField = secondFieldCoder.decode(inStream);
+ return Tuple2.apply(firstField, secondField);
+ }
+
+ @Override public List<? extends Coder<?>> getCoderArguments() {
+ return Arrays.asList(firstFieldCoder, secondFieldCoder); // first field coder, then second field coder
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ verifyDeterministic(this, "First field coder must be deterministic", firstFieldCoder);
+ verifyDeterministic(this, "Second field coder must be deterministic", secondFieldCoder);
+ }
+
+ /** Returns the coder used for the tuple's first field ({@code _1}). */
+ public Coder<T1> getFirstFieldCoder() {
+ return firstFieldCoder;
+ }
+
+ /** Returns the coder used for the tuple's second field ({@code _2}). */
+ public Coder<T2> getSecondFieldCoder() {
+ return secondFieldCoder;
+ }
+}