You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/08/05 02:33:40 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5455] Spark conf can not contain white space

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new e75b384  [ZEPPELIN-5455] Spark conf can not contain white space
e75b384 is described below

commit e75b38421706fe0f15aa36b7894bc06b8a8efea5
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Aug 3 14:32:20 2021 +0800

    [ZEPPELIN-5455] Spark conf can not contain white space
    
    ### What is this PR for?
    
    This PR uses `|` as the parameter delimiter instead of whitespace. It does not resolve the issue fundamentally, because a parameter may still contain `|`, but it mitigates the issue considerably.
    
    ### What type of PR is it?
    [Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5455
    
    ### How should this be tested?
    * Manually tested
    
    ### Screenshots (if appropriate)
    ![image](https://user-images.githubusercontent.com/164491/125258545-09aa7200-e331-11eb-8a56-5c68417ae7bc.png)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4173 from zjffdu/ZEPPELIN-5455 and squashes the following commits:
    
    37a399bad8 [Jeff Zhang] use StringJoiner
    cfd3494b25 [Jeff Zhang] [ZEPPELIN-5455] Spark conf can not contain white space
    
    (cherry picked from commit 9aa4b98c1e4081a5f1f9f13a19842ccb4cac165b)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 bin/interpreter.sh                                 |  4 +-
 .../zeppelin/integration/FlinkIntegrationTest.java |  3 +
 .../zeppelin/integration/SparkIntegrationTest.java |  3 +
 .../launcher/FlinkInterpreterLauncher.java         | 18 +++---
 .../launcher/SparkInterpreterLauncher.java         | 13 +++--
 .../launcher/SparkInterpreterLauncherTest.java     | 67 +++++++++++-----------
 6 files changed, 61 insertions(+), 47 deletions(-)

diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index 49b2683..c1f284c 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -298,14 +298,14 @@ fi
 
 if [[ -n "${SPARK_SUBMIT}" ]]; then
   IFS=' ' read -r -a SPARK_SUBMIT_OPTIONS_ARRAY <<< "${SPARK_SUBMIT_OPTIONS}"
-  IFS=' ' read -r -a ZEPPELIN_SPARK_CONF_ARRAY <<< "${ZEPPELIN_SPARK_CONF}"
+  IFS='|' read -r -a ZEPPELIN_SPARK_CONF_ARRAY <<< "${ZEPPELIN_SPARK_CONF}"
   if [[ "${ZEPPELIN_SPARK_YARN_CLUSTER}" == "true"  ]]; then
     INTERPRETER_RUN_COMMAND+=("${SPARK_SUBMIT}" "--class" "${ZEPPELIN_SERVER}" "--driver-java-options" "${JAVA_INTP_OPTS}" "${SPARK_SUBMIT_OPTIONS_ARRAY[@]}" "${ZEPPELIN_SPARK_CONF_ARRAY[@]}" "${SPARK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
   else
     INTERPRETER_RUN_COMMAND+=("${SPARK_SUBMIT}" "--class" "${ZEPPELIN_SERVER}" "--driver-class-path" "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}" "--driver-java-options" "${JAVA_INTP_OPTS}" "${SPARK_SUBMIT_OPTIONS_ARRAY[@]}" "${ZEPPELIN_SPARK_CONF_ARRAY[@]}" "${SPARK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
   fi
 elif [[ "${ZEPPELIN_FLINK_YARN_APPLICATION}" == "true" ]]; then
-  IFS=' ' read -r -a ZEPPELIN_FLINK_YARN_APPLICATION_CONF_ARRAY <<< "${ZEPPELIN_FLINK_YARN_APPLICATION_CONF}"
+  IFS='|' read -r -a ZEPPELIN_FLINK_YARN_APPLICATION_CONF_ARRAY <<< "${ZEPPELIN_FLINK_YARN_APPLICATION_CONF}"
   INTERPRETER_RUN_COMMAND+=("${FLINK_HOME}/bin/flink" "run-application" "-c" "${ZEPPELIN_SERVER}" "-t" "yarn-application" "${ZEPPELIN_FLINK_YARN_APPLICATION_CONF_ARRAY[@]}" "${FLINK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
 else
   IFS=' ' read -r -a JAVA_INTP_OPTS_ARRAY <<< "${JAVA_INTP_OPTS}"
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index 86daefc..f27acbd 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -178,6 +178,8 @@ public abstract class FlinkIntegrationTest {
     flinkInterpreterSetting.setProperty("PATH", hadoopHome + "/bin:" + System.getenv("PATH"));
     flinkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
     flinkInterpreterSetting.setProperty("flink.execution.mode", "yarn-application");
+    // parameters with whitespace
+    flinkInterpreterSetting.setProperty("flink.yarn.appName", "hello flink");
     flinkInterpreterSetting.setProperty("zeppelin.flink.run.asLoginUser", "false");
 
     testInterpreterBasics();
@@ -186,6 +188,7 @@ public abstract class FlinkIntegrationTest {
     GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
     GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
     assertEquals(1, response.getApplicationList().size());
+    assertEquals("hello flink", response.getApplicationList().get(0).getName());
 
     interpreterSettingManager.close();
   }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index 5802780..40496c7 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -246,6 +246,8 @@ public abstract class SparkIntegrationTest {
     sparkInterpreterSetting.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
     sparkInterpreterSetting.setProperty("spark.user.name", "#{user}");
     sparkInterpreterSetting.setProperty("zeppelin.spark.run.asLoginUser", "false");
+    // parameters with whitespace
+    sparkInterpreterSetting.setProperty("spark.app.name", "hello spark");
 
     try {
       setUpSparkInterpreterSetting(sparkInterpreterSetting);
@@ -255,6 +257,7 @@ public abstract class SparkIntegrationTest {
       GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
       GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
       assertEquals(1, response.getApplicationList().size());
+      assertEquals("hello spark", response.getApplicationList().get(0).getName());
 
     } finally {
       interpreterSettingManager.close();
diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
index 995cdb6..3ff4f56 100644
--- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
@@ -31,6 +31,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.StringJoiner;
 import java.util.stream.Collectors;
 
 public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
@@ -173,20 +174,20 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
 
     envs.put("ZEPPELIN_FLINK_YARN_APPLICATION", "true");
 
-    StringBuilder flinkYarnApplicationConfBuilder = new StringBuilder();
+    StringJoiner flinkConfStringJoiner = new StringJoiner("|");
     // set yarn.ship-files
     List<String> yarnShipFiles = getYarnShipFiles(context);
     if (!yarnShipFiles.isEmpty()) {
-      flinkYarnApplicationConfBuilder.append(
-              " -D yarn.ship-files=" + yarnShipFiles.stream().collect(Collectors.joining(";")));
+      flinkConfStringJoiner.add("-D");
+      flinkConfStringJoiner.add("yarn.ship-files=" +
+              yarnShipFiles.stream().collect(Collectors.joining(";")));
     }
 
     // set yarn.application.name
     String yarnAppName = context.getProperties().getProperty("flink.yarn.appName");
     if (StringUtils.isNotBlank(yarnAppName)) {
-      // flink run command can not contains whitespace, so replace it with _
-      flinkYarnApplicationConfBuilder.append(
-              " -D yarn.application.name=" + yarnAppName.replaceAll(" ", "_") + "");
+      flinkConfStringJoiner.add("-D");
+      flinkConfStringJoiner.add("yarn.application.name=" + yarnAppName);
     }
 
     // add other yarn and python configuration.
@@ -199,11 +200,12 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
           LOGGER.warn("flink configuration key {} is skipped because it contains white space",
                   key);
         } else {
-          flinkYarnApplicationConfBuilder.append(" -D " + key + "=" + value);
+          flinkConfStringJoiner.add("-D");
+          flinkConfStringJoiner.add(key + "=" + value);
         }
       }
     }
-    envs.put("ZEPPELIN_FLINK_YARN_APPLICATION_CONF", flinkYarnApplicationConfBuilder.toString());
+    envs.put("ZEPPELIN_FLINK_YARN_APPLICATION_CONF", flinkConfStringJoiner.toString());
   }
 
   private List<String> getYarnShipFiles(InterpreterLaunchContext context) throws IOException {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index d0a9aa8..50bdc88 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -30,6 +30,7 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.StringJoiner;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -106,7 +107,6 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
               " to false if you want to use other modes.");
     }
 
-    StringBuilder sparkConfBuilder = new StringBuilder();
     if (isYarnMode() && getDeployMode().equals("cluster")) {
       if (sparkProperties.containsKey("spark.files")) {
         sparkProperties.put("spark.files", sparkProperties.getProperty("spark.files") + "," +
@@ -173,22 +173,25 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
       }
     }
 
+    StringJoiner sparkConfSJ = new StringJoiner("|");
     if (context.getOption().isUserImpersonate() && zConf.getZeppelinImpersonateSparkProxyUser()) {
-      sparkConfBuilder.append(" --proxy-user " + context.getUserName());
+      sparkConfSJ.add("--proxy-user");
+      sparkConfSJ.add(context.getUserName());
       sparkProperties.remove("spark.yarn.keytab");
       sparkProperties.remove("spark.yarn.principal");
     }
 
     for (String name : sparkProperties.stringPropertyNames()) {
-      sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
+      sparkConfSJ.add("--conf");
+      sparkConfSJ.add(name + "=" + sparkProperties.getProperty(name) + "");
     }
 
-    env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
+    env.put("ZEPPELIN_SPARK_CONF", sparkConfSJ.toString());
 
     // set these env in the order of
     // 1. interpreter-setting
     // 2. zeppelin-env.sh
-    // It is encouraged to set env in interpreter setting, but just for backward compatability,
+    // It is encouraged to set env in interpreter setting, but just for backward compatibility,
     // we also fallback to zeppelin-env.sh if it is not specified in interpreter setting.
     for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"})  {
       String envValue = getEnv(envName);
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index 5c997e1..f99bb21 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -52,7 +52,7 @@ public class SparkInterpreterLauncherTest {
       System.clearProperty(confVar.getVarName());
     }
 
-    sparkHome = DownloadUtils.downloadSpark("2.3.2", "2.7");
+    sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
             new File("..").getAbsolutePath());
 
@@ -107,8 +107,8 @@ public class SparkInterpreterLauncherTest {
     assertTrue(interpreterProcess.getEnv().size() >= 2);
     assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
     assertFalse(interpreterProcess.getEnv().containsKey("ENV_1"));
-    assertEquals(" --conf spark.files=file_1" +
-                    " --conf spark.jars=jar_1 --conf spark.app.name=intpGroupId --conf spark.master=local[*]",
+    assertEquals("--conf|spark.files=file_1" +
+                    "|--conf|spark.jars=jar_1|--conf|spark.app.name=intpGroupId|--conf|spark.master=local[*]",
             interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
 
@@ -138,9 +138,9 @@ public class SparkInterpreterLauncherTest {
     String sparkJars = "jar_1";
     String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
     String sparkFiles = "file_1";
-    assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip +
-                    " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
-                    " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.master=yarn-client",
+    assertEquals("--conf|spark.yarn.dist.archives=" + sparkrZip +
+                    "|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars +
+                    "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.master=yarn-client",
             interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
 
@@ -171,10 +171,10 @@ public class SparkInterpreterLauncherTest {
     String sparkJars = "jar_1";
     String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
     String sparkFiles = "file_1";
-    assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip +
-                    " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
-                    " --conf spark.submit.deployMode=client" +
-                    " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.master=yarn",
+    assertEquals("--conf|spark.yarn.dist.archives=" + sparkrZip +
+                    "|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars +
+                    "|--conf|spark.submit.deployMode=client" +
+                    "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.master=yarn",
             interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
 
@@ -207,13 +207,14 @@ public class SparkInterpreterLauncherTest {
             zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
     String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
     String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
-    assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip +
-                    " --conf spark.yarn.maxAppAttempts=1" +
-                    " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
-                    " --conf spark.yarn.isPython=true" +
-                    " --conf spark.yarn.submit.waitAppCompletion=false" +
-                    " --conf spark.app.name=intpGroupId" +
-                    " --conf spark.master=yarn-cluster",
+    assertEquals("--conf|spark.yarn.dist.archives=" + sparkrZip +
+                    "|--conf|spark.yarn.maxAppAttempts=1" +
+                    "|--conf|spark.files=" + sparkFiles +
+                    "|--conf|spark.jars=" + sparkJars +
+                    "|--conf|spark.yarn.isPython=true" +
+                    "|--conf|spark.yarn.submit.waitAppCompletion=false" +
+                    "|--conf|spark.app.name=intpGroupId" +
+                    "|--conf|spark.master=yarn-cluster",
             interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
 
@@ -253,13 +254,13 @@ public class SparkInterpreterLauncherTest {
             zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
     String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
     String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
-    assertEquals(" --proxy-user user1 --conf spark.yarn.dist.archives=" + sparkrZip +
-            " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId" +
-            " --conf spark.yarn.maxAppAttempts=1" +
-            " --conf spark.master=yarn" +
-            " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
-            " --conf spark.submit.deployMode=cluster" +
-            " --conf spark.yarn.submit.waitAppCompletion=false",
+    assertEquals("--proxy-user|user1|--conf|spark.yarn.dist.archives=" + sparkrZip +
+            "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId" +
+            "|--conf|spark.yarn.maxAppAttempts=1" +
+            "|--conf|spark.master=yarn" +
+            "|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars +
+            "|--conf|spark.submit.deployMode=cluster" +
+            "|--conf|spark.yarn.submit.waitAppCompletion=false",
             interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
     Files.deleteIfExists(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar"));
     FileUtils.deleteDirectory(localRepoPath.toFile());
@@ -301,14 +302,16 @@ public class SparkInterpreterLauncherTest {
     String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
     // escape special characters
     String sparkFiles = "{}," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
-    assertEquals(" --proxy-user user1 --conf spark.yarn.dist.archives=" + sparkrZip +
-                    " --conf spark.yarn.isPython=true" +
-                    " --conf spark.app.name=intpGroupId" +
-                    " --conf spark.yarn.maxAppAttempts=1" +
-                    " --conf spark.master=yarn" +
-                    " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
-                    " --conf spark.submit.deployMode=cluster" +
-                    " --conf spark.yarn.submit.waitAppCompletion=false",
+    assertEquals("--proxy-user|user1" +
+                    "|--conf|spark.yarn.dist.archives=" + sparkrZip +
+                    "|--conf|spark.yarn.isPython=true" +
+                    "|--conf|spark.app.name=intpGroupId" +
+                    "|--conf|spark.yarn.maxAppAttempts=1" +
+                    "|--conf|spark.master=yarn" +
+                    "|--conf|spark.files=" + sparkFiles +
+                    "|--conf|spark.jars=" + sparkJars +
+                    "|--conf|spark.submit.deployMode=cluster" +
+                    "|--conf|spark.yarn.submit.waitAppCompletion=false",
             interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
     FileUtils.deleteDirectory(localRepoPath.toFile());
   }