You are viewing a plain text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/10 14:26:24 UTC

[beam] 03/04: improve readability of options passing to the source

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4d2d04491397b744bd14ba96516494296c20876e
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jan 10 11:12:05 2019 +0100

    improve readability of options passing to the source
---
 .../translation/batch/ReadSourceTranslatorBatch.java     | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index ebd79ac..8810e21 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -18,8 +18,6 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.core.serialization.Base64Serializer;
@@ -59,14 +57,12 @@ class ReadSourceTranslatorBatch<T>
     SparkSession sparkSession = context.getSparkSession();
 
     String serializedSource = Base64Serializer.serializeUnchecked(source);
-    Map<String, String> datasetSourceOptions = new HashMap<>();
-    datasetSourceOptions.put(DatasetSourceBatch.BEAM_SOURCE_OPTION, serializedSource);
-    datasetSourceOptions.put(DatasetSourceBatch.DEFAULT_PARALLELISM,
-        String.valueOf(context.getSparkSession().sparkContext().defaultParallelism()));
-    datasetSourceOptions.put(DatasetSourceBatch.PIPELINE_OPTIONS,
-        PipelineOptionsSerializationUtils.serializeToJson(context.getOptions()));
-    Dataset<Row> rowDataset = sparkSession.read().format(sourceProviderClass).options(datasetSourceOptions)
-        .load();
+    Dataset<Row> rowDataset = sparkSession.read().format(sourceProviderClass)
+        .option(DatasetSourceBatch.BEAM_SOURCE_OPTION, serializedSource)
+        .option(DatasetSourceBatch.DEFAULT_PARALLELISM,
+            String.valueOf(context.getSparkSession().sparkContext().defaultParallelism()))
+        .option(DatasetSourceBatch.PIPELINE_OPTIONS,
+            PipelineOptionsSerializationUtils.serializeToJson(context.getOptions())).load();
 
     MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
       @Override public WindowedValue call(Row value) throws Exception {