You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/10/24 10:08:26 UTC

[beam] 02/37: Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 20d5bbd4e8d2d7d7b4dc9639d716b1e3403f91eb
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Fri Jul 19 15:48:32 2019 +0200

    Use "sparkMaster" in local mode to obtain number of shuffle partitions
    + spotless apply
---
 .../translation/TranslationContext.java                   | 15 +++++++++++++++
 .../translation/batch/ParDoTranslatorBatch.java           |  8 +++++---
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index f1bafd33..75f3ddf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -78,6 +78,21 @@ public class TranslationContext {
       sparkConf.setJars(options.getFilesToStage().toArray(new String[0]));
     }
 
+    // By default, Spark defines 200 as a number of sql partitions. This seems too much for local
+    // mode, so try to align with value of "sparkMaster" option in this case.
+    // We should not overwrite this value (or any user-defined spark configuration value) if the
+    // user has already configured it.
+    String sparkMaster = options.getSparkMaster();
+    if (sparkMaster != null
+        && sparkMaster.startsWith("local[")
+        && System.getProperty("spark.sql.shuffle.partitions") == null) {
+      int numPartitions =
+          Integer.parseInt(sparkMaster.substring("local[".length(), sparkMaster.length() - 1));
+      if (numPartitions > 0) {
+        sparkConf.set("spark.sql.shuffle.partitions", String.valueOf(numPartitions));
+      }
+    }
+
     this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
     this.serializablePipelineOptions = new SerializablePipelineOptions(options);
     this.datasets = new HashMap<>();
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 742c1b0..255adc8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -137,9 +137,11 @@ class ParDoTranslatorBatch<InputT, OutputT>
         pruneOutputFilteredByTag(context, allOutputs, output);
       }
     } else {
-      Dataset<WindowedValue<?>> outputDataset = allOutputs.map(
-          (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>) value -> value._2,
-          EncoderHelpers.windowedValueEncoder());
+      Dataset<WindowedValue<?>> outputDataset =
+          allOutputs.map(
+              (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>)
+                  value -> value._2,
+              EncoderHelpers.windowedValueEncoder());
       context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset);
     }
   }