Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2023/01/12 12:20:37 UTC

[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24862: [Spark Dataset runner] Fix SparkSessionFactory to better support running on a cluster.

aromanenko-dev commented on code in PR #24862:
URL: https://github.com/apache/beam/pull/24862#discussion_r1068056400


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkSessionFactory.java:
##########
@@ -130,19 +170,35 @@ private static SparkSession.Builder sessionBuilder(
     // mode, so try to align with value of "sparkMaster" option in this case.
     // We should not overwrite this value (or any user-defined spark configuration value) if the
     // user has already configured it.
-    if (master != null
-        && !master.equals("local[*]")
-        && master.startsWith("local[")
-        && System.getProperty("spark.sql.shuffle.partitions") == null) {
-      int numPartitions =
-          Integer.parseInt(master.substring("local[".length(), master.length() - 1));
-      if (numPartitions > 0) {
-        sparkConf.set("spark.sql.shuffle.partitions", String.valueOf(numPartitions));
-      }
+    int partitions = localNumPartitions(master);
+    if (partitions > 0) {
+      sparkConf.setIfMissing("spark.sql.shuffle.partitions", Integer.toString(partitions));
     }
+
     return SparkSession.builder().config(sparkConf);
   }
 
+  @SuppressWarnings({"return", "toarray.nullable.elements", "methodref.receiver"}) // safe to ignore
+  private static String[] filesToStage(
+      SparkStructuredStreamingPipelineOptions opts, Collection<String> excludes) {
+    Collection<String> files = opts.getFilesToStage();
+    if (files == null || files.isEmpty()) {
+      return EMPTY_STRING_ARRAY;
+    }
+    if (!excludes.isEmpty()) {
+      files = Collections2.filter(files, f -> !excludes.stream().anyMatch(f::contains));
+    }
+    return files.toArray(EMPTY_STRING_ARRAY);
+  }
+
+  private static String[] sparkJars(SparkConf conf) {

Review Comment:
   nit: I'd add a verb to the current name, such as `retrieveSparkJars()`, since it's a method, not a variable or a constant.
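The diff does not show the body of `localNumPartitions`. Below is a minimal, stdlib-only sketch of what the two helpers in this hunk might do, assuming `localNumPartitions` keeps the semantics of the removed inline code: it returns N for a `local[N]` master and 0 otherwise. The class name `SessionFactorySketch` and the verb-first name `filterFilesToStage` are hypothetical; the latter follows the naming suggestion above. Two deliberate differences from the diff: an unparsable value such as `local[4,2]` yields 0 here rather than throwing `NumberFormatException`, and the filter uses a Java stream with `noneMatch` (equivalent to `!anyMatch`) instead of Guava's lazy `Collections2.filter` view.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class SessionFactorySketch {
  private static final String[] EMPTY_STRING_ARRAY = new String[0];

  /**
   * Hypothetical reconstruction of localNumPartitions: returns N for a master URL of the
   * form "local[N]", and 0 for null, non-local masters, or unparsable values like "local[*]".
   */
  static int localNumPartitions(String master) {
    if (master == null || !master.startsWith("local[") || !master.endsWith("]")) {
      return 0;
    }
    String n = master.substring("local[".length(), master.length() - 1);
    try {
      return Integer.parseInt(n);
    } catch (NumberFormatException e) {
      return 0; // e.g. "local[*]" or "local[4,2]" (local with max failures)
    }
  }

  /** Hypothetical helper: keeps only files whose path contains none of the exclude substrings. */
  static String[] filterFilesToStage(Collection<String> files, Collection<String> excludes) {
    if (files == null || files.isEmpty()) {
      return EMPTY_STRING_ARRAY;
    }
    return files.stream()
        .filter(f -> excludes.stream().noneMatch(f::contains))
        .toArray(String[]::new);
  }

  public static void main(String[] args) {
    System.out.println(localNumPartitions("local[4]"));
    System.out.println(localNumPartitions("local[*]"));
    System.out.println(localNumPartitions("spark://host:7077"));
    String[] staged = filterFilesToStage(
        List.of("/tmp/app.jar", "/opt/spark/jars/spark-core.jar"), List.of("spark-core"));
    System.out.println(Arrays.toString(staged));
  }
}
```

As in the diff, the caller would only apply the result when it is positive, for example via `sparkConf.setIfMissing("spark.sql.shuffle.partitions", ...)`, so a user-provided value is never overwritten.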



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
