You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:59 UTC
[beam] 37/50: Use raw WindowedValue so that Spark Encoders can work (temporary)
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 1060121a4356b6c0d01227aed7631821df4394e1
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Dec 21 16:05:31 2018 +0100
Use raw WindowedValue so that Spark Encoders can work (temporary)
---
.../translation/TranslationContext.java | 8 ++++++++
.../batch/ReadSourceTranslatorMockBatch.java | 20 +++++---------------
2 files changed, 13 insertions(+), 15 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 0f2493d..fb36b37 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -115,6 +115,14 @@ public class TranslationContext {
}
}
+ //TODO: remove. It is just for testing
+ public void putDatasetRaw(PValue value, Dataset<WindowedValue> dataset) {
+ if (!datasets.containsKey(value)) {
+ datasets.put(value, dataset);
+ leaves.add(dataset);
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// PCollections methods
// --------------------------------------------------------------------------------------------
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index 504a64d..4a509de 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -50,25 +50,15 @@ class ReadSourceTranslatorMockBatch<T>
Dataset<Row> rowDataset = dataStreamReader.load();
- MapFunction<Row, WindowedValue<Integer>> func = new MapFunction<Row, WindowedValue<Integer>>() {
- @Override public WindowedValue<Integer> call(Row value) throws Exception {
+ MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
+ @Override public WindowedValue call(Row value) throws Exception {
//there is only one value put in each Row by the InputPartitionReader
- return value.<WindowedValue<Integer>>getAs(0);
+ return value.<WindowedValue>getAs(0);
}
};
- Dataset<WindowedValue<Integer>> dataset = rowDataset.map(func, new Encoder<WindowedValue<Integer>>() {
-
- @Override public StructType schema() {
- return null;
- }
-
- @Override public ClassTag<WindowedValue<Integer>> clsTag() {
- return scala.reflect.ClassTag$.MODULE$.<WindowedValue<Integer>>apply(WindowedValue.class);
- }
- });
+ Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
PCollection<T> output = (PCollection<T>) context.getOutput();
- context.putDataset(output, dataset);
- dataset.show();
+ context.putDatasetRaw(output, dataset);
}
}