You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2018/12/28 09:29:50 UTC

[beam] 02/04: Use raw Encoder also in regular ReadSourceTranslatorBatch

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 36a72f72bb0ce30464cd23fc3a5134f97a92a721
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Dec 28 10:16:01 2018 +0100

    Use raw Encoder<WindowedValue> also in regular ReadSourceTranslatorBatch
---
 .../translation/TranslationContext.java            |  1 -
 .../batch/ReadSourceTranslatorBatch.java           | 22 ++++++++++------------
 .../batch/ReadSourceTranslatorMockBatch.java       |  2 ++
 3 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 82aa80b..acc49f4 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -115,7 +115,6 @@ public class TranslationContext {
     }
   }
 
-    //TODO: remove. It is just for testing
     public void putDatasetRaw(PValue value, Dataset<WindowedValue> dataset) {
       if (!datasets.containsKey(value)) {
         datasets.put(value, dataset);
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 370e3f4..d980a52 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.DatasetStreamingSource;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -30,9 +29,9 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.streaming.DataStreamReader;
 
 class ReadSourceTranslatorBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
@@ -47,7 +46,6 @@ class ReadSourceTranslatorBatch<T>
         (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
             context.getCurrentTransform();
 
-        String providerClassName = SOURCE_PROVIDER_CLASS.substring(0, SOURCE_PROVIDER_CLASS.indexOf("$"));
         BoundedSource<T> source;
     try {
       source = ReadTranslation.boundedSourceFromTransform(rootTransform);
@@ -56,20 +54,20 @@ class ReadSourceTranslatorBatch<T>
     }
     SparkSession sparkSession = context.getSparkSession();
 
-    Dataset<Row> rowDataset = sparkSession.read().format(providerClassName).load();
+    Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
 
-    //TODO initialize source : how, to get a reference to the DatasetStreamingSource instance that spark
-    // instantiates to be able to call DatasetStreamingSource.initialize(). How to pass in a DatasetCatalog?
-    MapFunction<Row, WindowedValue<T>> func = new MapFunction<Row, WindowedValue<T>>() {
-      @Override public WindowedValue<T> call(Row value) throws Exception {
+    //TODO pass the source and the translation context serialized as string to the DatasetSource
+    MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
+      @Override public WindowedValue call(Row value) throws Exception {
         //there is only one value put in each Row by the InputPartitionReader
-        return value.<WindowedValue<T>>getAs(0);
+        return value.<WindowedValue>getAs(0);
       }
     };
-    //TODO fix encoder: how to get an Encoder<WindowedValue<T>>
-    Dataset<WindowedValue<T>> dataset = rowDataset.map(func, null);
+    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedValue<T>>
+    // be created?
+    Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
-    context.putDataset(output, dataset);
+    context.putDatasetRaw(output, dataset);
   }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index 758ff1d..d7b9175 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -52,6 +52,8 @@ class ReadSourceTranslatorMockBatch<T>
         return value.<WindowedValue>getAs(0);
       }
     };
+    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedValue<T>>
+    // be created?
     Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();