You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:59 UTC

[beam] 37/50: Use raw WindowedValue so that spark Encoders could work (temporary)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1060121a4356b6c0d01227aed7631821df4394e1
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Dec 21 16:05:31 2018 +0100

    Use raw WindowedValue so that spark Encoders could work (temporary)
---
 .../translation/TranslationContext.java              |  8 ++++++++
 .../batch/ReadSourceTranslatorMockBatch.java         | 20 +++++---------------
 2 files changed, 13 insertions(+), 15 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 0f2493d..fb36b37 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -115,6 +115,14 @@ public class TranslationContext {
     }
   }
 
+    // TODO: remove once typed encoders work; temporary test-only variant that accepts a raw Dataset<WindowedValue>.
+    public void putDatasetRaw(PValue value, Dataset<WindowedValue> dataset) {
+      if (!datasets.containsKey(value)) {
+        datasets.put(value, dataset);
+        leaves.add(dataset);
+      }
+    }
+
   // --------------------------------------------------------------------------------------------
   //  PCollections methods
   // --------------------------------------------------------------------------------------------
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index 504a64d..4a509de 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -50,25 +50,15 @@ class ReadSourceTranslatorMockBatch<T>
 
     Dataset<Row> rowDataset = dataStreamReader.load();
 
-    MapFunction<Row, WindowedValue<Integer>> func = new MapFunction<Row, WindowedValue<Integer>>() {
-      @Override public WindowedValue<Integer> call(Row value) throws Exception {
+    MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
+      @Override public WindowedValue call(Row value) throws Exception {
         //there is only one value put in each Row by the InputPartitionReader
-        return value.<WindowedValue<Integer>>getAs(0);
+        return value.<WindowedValue>getAs(0);
       }
     };
-    Dataset<WindowedValue<Integer>> dataset = rowDataset.map(func, new Encoder<WindowedValue<Integer>>() {
-
-      @Override public StructType schema() {
-        return null;
-      }
-
-      @Override public ClassTag<WindowedValue<Integer>> clsTag() {
-        return scala.reflect.ClassTag$.MODULE$.<WindowedValue<Integer>>apply(WindowedValue.class);
-      }
-    });
+    Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
-    context.putDataset(output, dataset);
-    dataset.show();
+    context.putDatasetRaw(output, dataset);
   }
 }