You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/10 14:26:21 UTC

[beam] branch spark-runner_structured-streaming updated (9cd96e8 -> 8d08a9d)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from 9cd96e8  Refactor SourceTest to a UTest instaed of a main
     new a3eb035  Deactivate deps resolution forcing that prevent using correct spark transitive dep
     new 3cc76b8  Fix pipeline triggering: use a spark action instead of writing the dataset
     new 4d2d044  improve readability of options passing to the source
     new 8d08a9d  Cleaning unneeded fields in DatasetReader

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 runners/spark-structured-streaming/build.gradle          | 16 ++++++++++++----
 .../translation/TranslationContext.java                  |  3 ++-
 .../translation/batch/DatasetSourceBatch.java            |  2 --
 .../translation/batch/ReadSourceTranslatorBatch.java     | 16 ++++++----------
 4 files changed, 20 insertions(+), 17 deletions(-)


[beam] 01/04: Deactivate deps resolution forcing that prevent using correct spark transitive dep

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a3eb035ea473f3f8ef20dea3f51a1cdf0d42bd94
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jan 10 10:55:54 2019 +0100

    Deactivate deps resolution forcing that prevent using correct spark transitive dep
---
 runners/spark-structured-streaming/build.gradle | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/runners/spark-structured-streaming/build.gradle b/runners/spark-structured-streaming/build.gradle
index 803058f..3154fca 100644
--- a/runners/spark-structured-streaming/build.gradle
+++ b/runners/spark-structured-streaming/build.gradle
@@ -34,6 +34,17 @@ configurations {
   validatesRunner
 }
 
+configurations.all {
+  resolutionStrategy {
+    // Beam parent forces all deps to be the versions defined. In our case we have spark-core
+    // defined to v2.3.2 for "old" spark runner. So spark-core transitive dep of spark-sql v2.4.0
+    // gets forced to 2.3.2. Remove it from the forced deps and add v 2.4.0
+//    forcedModules.removeElement("org.apache.spark:spark-core_2.11:2.3.2")
+//    forcedModules.removeElement("org.apache.spark:spark-network-common_2.11:2.3.2")
+//    forcedModules.removeElement("org.apache.spark:spark-streaming_2.11:2.3.2")
+    forcedModules = []
+  }
+}
 test {
   systemProperty "spark.ui.enabled", "false"
   systemProperty "spark.ui.showConsoleProgress", "false"
@@ -45,10 +56,7 @@ test {
 }
 
 dependencies {
-  compile('org.apache.spark:spark-core_2.11:2.4.0'){
-    exclude group: 'org.json4s', module: 'json4s-jackson_2.11'
-  }
-  shadow "org.json4s:json4s-jackson_2.11:3.6.3"
+  shadow "com.fasterxml.jackson.module:jackson-module-scala_2.11:2.9.8"
   shadow project(path: ":beam-model-pipeline", configuration: "shadow")
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow project(path: ":beam-runners-core-construction-java", configuration: "shadow")


[beam] 04/04: Cleaning unneeded fields in DatasetReader

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8d08a9d86bcaa03ec33c030d477501544ee717fc
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jan 10 12:11:45 2019 +0100

    Cleaning unneeded fields in DatasetReader
---
 .../spark/structuredstreaming/translation/batch/DatasetSourceBatch.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index df88e8e..421a3f9 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -85,8 +85,6 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
     private int numPartitions;
     private BoundedSource<T> source;
     private SparkPipelineOptions sparkPipelineOptions;
-    private Optional<StructType> schema;
-    private String checkpointLocation;
 
     private DatasetReader(int numPartitions, BoundedSource<T> source,
         SparkPipelineOptions sparkPipelineOptions) {


[beam] 03/04: improve readability of options passing to the source

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4d2d04491397b744bd14ba96516494296c20876e
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jan 10 11:12:05 2019 +0100

    improve readability of options passing to the source
---
 .../translation/batch/ReadSourceTranslatorBatch.java     | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index ebd79ac..8810e21 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -18,8 +18,6 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.core.serialization.Base64Serializer;
@@ -59,14 +57,12 @@ class ReadSourceTranslatorBatch<T>
     SparkSession sparkSession = context.getSparkSession();
 
     String serializedSource = Base64Serializer.serializeUnchecked(source);
-    Map<String, String> datasetSourceOptions = new HashMap<>();
-    datasetSourceOptions.put(DatasetSourceBatch.BEAM_SOURCE_OPTION, serializedSource);
-    datasetSourceOptions.put(DatasetSourceBatch.DEFAULT_PARALLELISM,
-        String.valueOf(context.getSparkSession().sparkContext().defaultParallelism()));
-    datasetSourceOptions.put(DatasetSourceBatch.PIPELINE_OPTIONS,
-        PipelineOptionsSerializationUtils.serializeToJson(context.getOptions()));
-    Dataset<Row> rowDataset = sparkSession.read().format(sourceProviderClass).options(datasetSourceOptions)
-        .load();
+    Dataset<Row> rowDataset = sparkSession.read().format(sourceProviderClass)
+        .option(DatasetSourceBatch.BEAM_SOURCE_OPTION, serializedSource)
+        .option(DatasetSourceBatch.DEFAULT_PARALLELISM,
+            String.valueOf(context.getSparkSession().sparkContext().defaultParallelism()))
+        .option(DatasetSourceBatch.PIPELINE_OPTIONS,
+            PipelineOptionsSerializationUtils.serializeToJson(context.getOptions())).load();
 
     MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
       @Override public WindowedValue call(Row value) throws Exception {


[beam] 02/04: Fix pipeline triggering: use a spark action instead of writing the dataset

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3cc76b86b00861737764c3233a44868e4a255e9c
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jan 10 11:09:28 2019 +0100

    Fix pipeline triggering: use a spark action instead of writing the dataset
---
 .../spark/structuredstreaming/translation/TranslationContext.java      | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index acc49f4..e40bb85 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -167,7 +167,8 @@ public class TranslationContext {
         if (options.isStreaming()) {
           dataset.writeStream().foreach(new NoOpForeachWriter<>()).start().awaitTermination();
         } else {
-          dataset.write();
+          // apply a dummy fn just to apply forech action that will trigger the pipeline run in spark
+          dataset.foreachPartition(t -> {});
         }
       }
     } catch (StreamingQueryException e) {