Posted to commits@beam.apache.org by ec...@apache.org on 2018/12/28 09:29:50 UTC
[beam] 02/04: Use raw Encoder also in regular ReadSourceTranslatorBatch
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 36a72f72bb0ce30464cd23fc3a5134f97a92a721
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Dec 28 10:16:01 2018 +0100
Use raw Encoder<WindowedValue> also in regular ReadSourceTranslatorBatch
---
.../translation/TranslationContext.java | 1 -
.../batch/ReadSourceTranslatorBatch.java | 22 ++++++++++------------
.../batch/ReadSourceTranslatorMockBatch.java | 2 ++
3 files changed, 12 insertions(+), 13 deletions(-)
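For readers following along, here is a minimal sketch (not part of the commit) of the raw-encoder approach the diffs below apply, assuming Spark 2.x SQL APIs and Beam's org.apache.beam.sdk.util.WindowedValue; the class and method names are illustrative:

    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;

    class RawEncoderSketch {
      static Dataset<WindowedValue> toWindowedValues(Dataset<Row> rowDataset) {
        // Each Row holds a single WindowedValue at column 0.
        MapFunction<Row, WindowedValue> extract = row -> row.getAs(0);
        // Kryo yields a raw Encoder<WindowedValue>; Spark cannot derive
        // an encoder for the generic WindowedValue<T> type.
        return rowDataset.map(extract, Encoders.kryo(WindowedValue.class));
      }
    }

Kryo serializes the whole object, so Spark never needs a schema for the generic WindowedValue<T>, at the cost of an opaque binary column that Catalyst cannot optimize.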
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 82aa80b..acc49f4 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -115,7 +115,6 @@ public class TranslationContext {
}
}
- //TODO: remove. It is just for testing
public void putDatasetRaw(PValue value, Dataset<WindowedValue> dataset) {
if (!datasets.containsKey(value)) {
datasets.put(value, dataset);
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 370e3f4..d980a52 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.DatasetStreamingSource;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
@@ -30,9 +29,9 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.streaming.DataStreamReader;
class ReadSourceTranslatorBatch<T>
implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
@@ -47,7 +46,6 @@ class ReadSourceTranslatorBatch<T>
(AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
context.getCurrentTransform();
- String providerClassName = SOURCE_PROVIDER_CLASS.substring(0, SOURCE_PROVIDER_CLASS.indexOf("$"));
BoundedSource<T> source;
try {
source = ReadTranslation.boundedSourceFromTransform(rootTransform);
@@ -56,20 +54,20 @@ class ReadSourceTranslatorBatch<T>
}
SparkSession sparkSession = context.getSparkSession();
- Dataset<Row> rowDataset = sparkSession.read().format(providerClassName).load();
+ Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
- //TODO initialize source : how, to get a reference to the DatasetStreamingSource instance that spark
- // instantiates to be able to call DatasetStreamingSource.initialize(). How to pass in a DatasetCatalog?
- MapFunction<Row, WindowedValue<T>> func = new MapFunction<Row, WindowedValue<T>>() {
- @Override public WindowedValue<T> call(Row value) throws Exception {
+ //TODO pass the source and the translation context, serialized as strings, to the DatasetSource
+ MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
+ @Override public WindowedValue call(Row value) throws Exception {
//there is only one value put in each Row by the InputPartitionReader
- return value.<WindowedValue<T>>getAs(0);
+ return value.<WindowedValue>getAs(0);
}
};
- //TODO fix encoder: how to get an Encoder<WindowedValue<T>>
- Dataset<WindowedValue<T>> dataset = rowDataset.map(func, null);
+ //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedValue<T>>
+ // be created?
+ Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
PCollection<T> output = (PCollection<T>) context.getOutput();
- context.putDataset(output, dataset);
+ context.putDatasetRaw(output, dataset);
}
}
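On the TODO above ("Can an Encoder<WindowedValue<T>> be created?"), one possible answer, shown purely as an illustration and not part of this commit, is an unchecked cast over the raw Kryo encoder, since Kryo works on the runtime object and ignores the erased type parameter (reusing the imports from the sketch above):

    // Illustrative helper, not in the commit: a typed view of the raw encoder.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static <T> org.apache.spark.sql.Encoder<WindowedValue<T>> windowedValueEncoder() {
      // Safe at runtime: Kryo serializes the concrete object; the type
      // parameter is erased and never consulted.
      return (org.apache.spark.sql.Encoder) Encoders.kryo(WindowedValue.class);
    }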
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index 758ff1d..d7b9175 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -52,6 +52,8 @@ class ReadSourceTranslatorMockBatch<T>
return value.<WindowedValue>getAs(0);
}
};
+ //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedValue<T>>
+ // be created?
Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
PCollection<T> output = (PCollection<T>) context.getOutput();