You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/10/23 12:12:17 UTC

[beam] branch spark-runner_structured-streaming updated (c980d4c -> 46ed555)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from c980d4c  Fix typo
     new 507bbd8  Fix javadoc
     new ad29daf  Use beam encoders also in the output of the source translation
     new ebc53fd  Remove unneeded cast
     new 25d0401  Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders
     new 46ed555  Remove Encoders based on kryo now that we call Beam coders in the runner

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../batch/ReadSourceTranslatorBatch.java           |  4 +-
 .../translation/helpers/EncoderHelpers.java        | 44 ++--------------------
 .../translation/helpers/KVHelpers.java             |  4 +-
 .../translation/helpers/RowHelpers.java            |  2 +-
 .../streaming/ReadSourceTranslatorStreaming.java   |  4 +-
 5 files changed, 8 insertions(+), 50 deletions(-)


[beam] 04/05: Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 25d0401d69029f76871b26f9fbd1f5c79de39bb7
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Oct 23 11:52:14 2019 +0200

    Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders
---
 .../spark/structuredstreaming/translation/helpers/EncoderHelpers.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index 2f3bced..c07c9dd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -101,7 +101,8 @@ public class EncoderHelpers {
   public static <T> Encoder<T> fromBeamCoder(Coder<T> beamCoder) {
 
     List<Expression> serialiserList = new ArrayList<>();
-    Class<T> claz = (Class<T>) Object.class;
+    Class<? super T> claz = beamCoder.getEncodedTypeDescriptor().getRawType();
+
     serialiserList.add(
         new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder));
     ClassTag<T> classTag = ClassTag$.MODULE$.apply(claz);


[beam] 05/05: Remove Encoders based on kryo now that we call Beam coders in the runner

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 46ed555049279e7b67f894d958763041a2e071d1
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Oct 23 14:11:14 2019 +0200

    Remove Encoders based on kryo now that we call Beam coders in the runner
---
 .../translation/helpers/EncoderHelpers.java        | 41 +---------------------
 1 file changed, 1 insertion(+), 40 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index c07c9dd..704b6fe 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -51,46 +51,7 @@ import scala.reflect.ClassTag$;
 /** {@link Encoders} utility class. */
 public class EncoderHelpers {
 
-  // 1. use actual class and not object to avoid Spark fallback to GenericRowWithSchema.
-  // 2. use raw class because only raw classes can be used with kryo. Cast to Class<T> to allow
-  // the type inference mechanism to infer for ex Encoder<WindowedValue<T>> to get back the type
-  // checking
-
-  /*
-   --------- Encoders for internal spark runner objects
-  */
-
-  /**
-   * Get a bytes {@link Encoder} for {@link WindowedValue}. Bytes serialisation is issued by Kryo
-   */
-  @SuppressWarnings("unchecked")
-  public static <T> Encoder<T> windowedValueEncoder() {
-    return Encoders.kryo((Class<T>) WindowedValue.class);
-  }
-
-  /** Get a bytes {@link Encoder} for {@link KV}. Bytes serialisation is issued by Kryo */
-  @SuppressWarnings("unchecked")
-  public static <T> Encoder<T> kvEncoder() {
-    return Encoders.kryo((Class<T>) KV.class);
-  }
-
-  /** Get a bytes {@link Encoder} for {@code T}. Bytes serialisation is issued by Kryo */
-  @SuppressWarnings("unchecked")
-  public static <T> Encoder<T> genericEncoder() {
-    return Encoders.kryo((Class<T>) Object.class);
-  }
-
-  /*
-   */
-  /** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo */
-  /*
-
-    public static <T1, T2> Encoder<Tuple2<T1, T2>> tuple2Encoder() {
-      return Encoders.tuple(EncoderHelpers.genericEncoder(), EncoderHelpers.genericEncoder());
-    }
-  */
-
-  /*
+    /*
    --------- Bridges from Beam Coders to Spark Encoders
   */
 


[beam] 01/05: Fix javadoc

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 507bbd8390f4021cf6099a1b2c6b8eccecab575c
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Oct 21 10:38:46 2019 +0200

    Fix javadoc
---
 .../spark/structuredstreaming/translation/helpers/RowHelpers.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
index ac74c29..afb4922 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
@@ -53,7 +53,7 @@ public final class RowHelpers {
   }
 
   /**
-   * Serialize a windowedValue to bytes using windowed {@link WindowedValue.FullWindowedValueCoder}
+   * Serialize a windowedValue to bytes using windowedValueCoder {@link WindowedValue.FullWindowedValueCoder}
    * and stores it an InternalRow.
    */
   public static <T> InternalRow storeWindowedValueInRow(


[beam] 02/05: Use beam encoders also in the output of the source translation

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ad29daf87ca7a8ed8fc16ce3072d7cc7804b1867
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Oct 23 11:45:39 2019 +0200

    Use beam encoders also in the output of the source translation
---
 .../translation/batch/ReadSourceTranslatorBatch.java                  | 4 +---
 .../translation/streaming/ReadSourceTranslatorStreaming.java          | 4 +---
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index ceb87cf..6af7f55 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -77,9 +77,7 @@ class ReadSourceTranslatorBatch<T>
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
             RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
-            // using kryo bytes serialization because the mapper already calls
-            // windowedValueCoder.decode, no need to call it also in the Spark encoder
-            EncoderHelpers.windowedValueEncoder());
+            EncoderHelpers.fromBeamCoder(windowedValueCoder));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index 9f1e34d..ea10272 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -77,9 +77,7 @@ class ReadSourceTranslatorStreaming<T>
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
             RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
-            // using kryo bytes serialization because the mapper already calls
-            // windowedValueCoder.decode, no need to call it also in the Spark encoder
-            EncoderHelpers.windowedValueEncoder());
+            EncoderHelpers.fromBeamCoder(windowedValueCoder));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);


[beam] 03/05: Remove unneeded cast

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ebc53fd19d73b8bd31a9d0d507ee3070a27a1e5b
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Oct 23 11:51:49 2019 +0200

    Remove unneeded cast
---
 .../spark/structuredstreaming/translation/helpers/KVHelpers.java      | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
index 1983eaa..2fa4b1a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
@@ -25,7 +25,7 @@ import org.apache.spark.api.java.function.MapFunction;
 public final class KVHelpers {
 
   /** A Spark {@link MapFunction} for extracting the key out of a {@link KV} for GBK for example. */
-  public static <K, V> MapFunction<WindowedValue<KV<K, V>>, K> extractKey() {
-    return (MapFunction<WindowedValue<KV<K, V>>, K>) wv -> wv.getValue().getKey();
+  public static <K, V>  MapFunction<WindowedValue<KV<K, V>>, K> extractKey() {
+    return wv -> wv.getValue().getKey();
   }
 }