You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/10 14:26:24 UTC
[beam] 03/04: improve readability of passing options to the source
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 4d2d04491397b744bd14ba96516494296c20876e
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jan 10 11:12:05 2019 +0100
improve readability of passing options to the source
---
.../translation/batch/ReadSourceTranslatorBatch.java | 16 ++++++----------
1 file changed, 6 insertions(+), 10 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index ebd79ac..8810e21 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -18,8 +18,6 @@
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.core.serialization.Base64Serializer;
@@ -59,14 +57,12 @@ class ReadSourceTranslatorBatch<T>
SparkSession sparkSession = context.getSparkSession();
String serializedSource = Base64Serializer.serializeUnchecked(source);
- Map<String, String> datasetSourceOptions = new HashMap<>();
- datasetSourceOptions.put(DatasetSourceBatch.BEAM_SOURCE_OPTION, serializedSource);
- datasetSourceOptions.put(DatasetSourceBatch.DEFAULT_PARALLELISM,
- String.valueOf(context.getSparkSession().sparkContext().defaultParallelism()));
- datasetSourceOptions.put(DatasetSourceBatch.PIPELINE_OPTIONS,
- PipelineOptionsSerializationUtils.serializeToJson(context.getOptions()));
- Dataset<Row> rowDataset = sparkSession.read().format(sourceProviderClass).options(datasetSourceOptions)
- .load();
+ Dataset<Row> rowDataset = sparkSession.read().format(sourceProviderClass)
+ .option(DatasetSourceBatch.BEAM_SOURCE_OPTION, serializedSource)
+ .option(DatasetSourceBatch.DEFAULT_PARALLELISM,
+ String.valueOf(context.getSparkSession().sparkContext().defaultParallelism()))
+ .option(DatasetSourceBatch.PIPELINE_OPTIONS,
+ PipelineOptionsSerializationUtils.serializeToJson(context.getOptions())).load();
MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
@Override public WindowedValue call(Row value) throws Exception {