You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:55 UTC

[beam] 33/50: Improve type enforcement in ReadSourceTranslator

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b7283d7810f5ac0fbbd6003dbacfd65d20458563
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Dec 11 16:21:05 2018 +0100

    Improve type enforcement in ReadSourceTranslator
---
 .../translation/batch/ReadSourceTranslatorBatch.java               | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index a75730a..2c1aa93 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -62,11 +63,11 @@ class ReadSourceTranslatorBatch<T>
     // instantiates to be able to call DatasetSource.initialize()
     MapFunction<Row, WindowedValue<T>> func = new MapFunction<Row, WindowedValue<T>>() {
       @Override public WindowedValue<T> call(Row value) throws Exception {
-        //TODO fix row content extraction: I guess cast is not enough
-        return (WindowedValue<T>) value.get(0);
+        //there is only one value put in each Row by the InputPartitionReader
+        return value.<WindowedValue<T>>getAs(0);
       }
     };
-    //TODO fix encoder
+    //TODO fix encoder: how to get an Encoder<WindowedValue<T>>
     Dataset<WindowedValue<T>> dataset = rowDataset.map(func, null);
 
     PCollection<T> output = (PCollection<T>) context.getOutput();