You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/08/05 02:33:40 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5455] Spark conf
can not contain white space
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new e75b384 [ZEPPELIN-5455] Spark conf can not contain white space
e75b384 is described below
commit e75b38421706fe0f15aa36b7894bc06b8a8efea5
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Aug 3 14:32:20 2021 +0800
[ZEPPELIN-5455] Spark conf can not contain white space
### What is this PR for?
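This PR uses `|` instead of whitespace as the delimiter between the parameters passed to `spark-submit` and `flink run-application`. Previously, a value containing whitespace (e.g. `spark.app.name=hello spark`) was split into several arguments. This does not fix the issue fundamentally, because a parameter value may itself contain `|`. It does mitigate the problem substantially, since `|` is far less common in configuration values than whitespace.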
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5455
### How should this be tested?
* Manually tested
### Screenshots (if appropriate)
![image](https://user-images.githubusercontent.com/164491/125258545-09aa7200-e331-11eb-8a56-5c68417ae7bc.png)
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #4173 from zjffdu/ZEPPELIN-5455 and squashes the following commits:
37a399bad8 [Jeff Zhang] use StringJoiner
cfd3494b25 [Jeff Zhang] [ZEPPELIN-5455] Spark conf can not contain white space
(cherry picked from commit 9aa4b98c1e4081a5f1f9f13a19842ccb4cac165b)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
bin/interpreter.sh | 4 +-
.../zeppelin/integration/FlinkIntegrationTest.java | 3 +
.../zeppelin/integration/SparkIntegrationTest.java | 3 +
.../launcher/FlinkInterpreterLauncher.java | 18 +++---
.../launcher/SparkInterpreterLauncher.java | 13 +++--
.../launcher/SparkInterpreterLauncherTest.java | 67 +++++++++++-----------
6 files changed, 61 insertions(+), 47 deletions(-)
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index 49b2683..c1f284c 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -298,14 +298,14 @@ fi
if [[ -n "${SPARK_SUBMIT}" ]]; then
IFS=' ' read -r -a SPARK_SUBMIT_OPTIONS_ARRAY <<< "${SPARK_SUBMIT_OPTIONS}"
- IFS=' ' read -r -a ZEPPELIN_SPARK_CONF_ARRAY <<< "${ZEPPELIN_SPARK_CONF}"
+ IFS='|' read -r -a ZEPPELIN_SPARK_CONF_ARRAY <<< "${ZEPPELIN_SPARK_CONF}"
if [[ "${ZEPPELIN_SPARK_YARN_CLUSTER}" == "true" ]]; then
INTERPRETER_RUN_COMMAND+=("${SPARK_SUBMIT}" "--class" "${ZEPPELIN_SERVER}" "--driver-java-options" "${JAVA_INTP_OPTS}" "${SPARK_SUBMIT_OPTIONS_ARRAY[@]}" "${ZEPPELIN_SPARK_CONF_ARRAY[@]}" "${SPARK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
else
INTERPRETER_RUN_COMMAND+=("${SPARK_SUBMIT}" "--class" "${ZEPPELIN_SERVER}" "--driver-class-path" "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}" "--driver-java-options" "${JAVA_INTP_OPTS}" "${SPARK_SUBMIT_OPTIONS_ARRAY[@]}" "${ZEPPELIN_SPARK_CONF_ARRAY[@]}" "${SPARK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
fi
elif [[ "${ZEPPELIN_FLINK_YARN_APPLICATION}" == "true" ]]; then
- IFS=' ' read -r -a ZEPPELIN_FLINK_YARN_APPLICATION_CONF_ARRAY <<< "${ZEPPELIN_FLINK_YARN_APPLICATION_CONF}"
+ IFS='|' read -r -a ZEPPELIN_FLINK_YARN_APPLICATION_CONF_ARRAY <<< "${ZEPPELIN_FLINK_YARN_APPLICATION_CONF}"
INTERPRETER_RUN_COMMAND+=("${FLINK_HOME}/bin/flink" "run-application" "-c" "${ZEPPELIN_SERVER}" "-t" "yarn-application" "${ZEPPELIN_FLINK_YARN_APPLICATION_CONF_ARRAY[@]}" "${FLINK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
else
IFS=' ' read -r -a JAVA_INTP_OPTS_ARRAY <<< "${JAVA_INTP_OPTS}"
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index 86daefc..f27acbd 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -178,6 +178,8 @@ public abstract class FlinkIntegrationTest {
flinkInterpreterSetting.setProperty("PATH", hadoopHome + "/bin:" + System.getenv("PATH"));
flinkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
flinkInterpreterSetting.setProperty("flink.execution.mode", "yarn-application");
+ // parameters with whitespace
+ flinkInterpreterSetting.setProperty("flink.yarn.appName", "hello flink");
flinkInterpreterSetting.setProperty("zeppelin.flink.run.asLoginUser", "false");
testInterpreterBasics();
@@ -186,6 +188,7 @@ public abstract class FlinkIntegrationTest {
GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
assertEquals(1, response.getApplicationList().size());
+ assertEquals("hello flink", response.getApplicationList().get(0).getName());
interpreterSettingManager.close();
}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index 5802780..40496c7 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -246,6 +246,8 @@ public abstract class SparkIntegrationTest {
sparkInterpreterSetting.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
sparkInterpreterSetting.setProperty("spark.user.name", "#{user}");
sparkInterpreterSetting.setProperty("zeppelin.spark.run.asLoginUser", "false");
+ // parameters with whitespace
+ sparkInterpreterSetting.setProperty("spark.app.name", "hello spark");
try {
setUpSparkInterpreterSetting(sparkInterpreterSetting);
@@ -255,6 +257,7 @@ public abstract class SparkIntegrationTest {
GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
assertEquals(1, response.getApplicationList().size());
+ assertEquals("hello spark", response.getApplicationList().get(0).getName());
} finally {
interpreterSettingManager.close();
diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
index 995cdb6..3ff4f56 100644
--- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
@@ -31,6 +31,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.StringJoiner;
import java.util.stream.Collectors;
public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
@@ -173,20 +174,20 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
envs.put("ZEPPELIN_FLINK_YARN_APPLICATION", "true");
- StringBuilder flinkYarnApplicationConfBuilder = new StringBuilder();
+ StringJoiner flinkConfStringJoiner = new StringJoiner("|");
// set yarn.ship-files
List<String> yarnShipFiles = getYarnShipFiles(context);
if (!yarnShipFiles.isEmpty()) {
- flinkYarnApplicationConfBuilder.append(
- " -D yarn.ship-files=" + yarnShipFiles.stream().collect(Collectors.joining(";")));
+ flinkConfStringJoiner.add("-D");
+ flinkConfStringJoiner.add("yarn.ship-files=" +
+ yarnShipFiles.stream().collect(Collectors.joining(";")));
}
// set yarn.application.name
String yarnAppName = context.getProperties().getProperty("flink.yarn.appName");
if (StringUtils.isNotBlank(yarnAppName)) {
- // flink run command can not contains whitespace, so replace it with _
- flinkYarnApplicationConfBuilder.append(
- " -D yarn.application.name=" + yarnAppName.replaceAll(" ", "_") + "");
+ flinkConfStringJoiner.add("-D");
+ flinkConfStringJoiner.add("yarn.application.name=" + yarnAppName);
}
// add other yarn and python configuration.
@@ -199,11 +200,12 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
LOGGER.warn("flink configuration key {} is skipped because it contains white space",
key);
} else {
- flinkYarnApplicationConfBuilder.append(" -D " + key + "=" + value);
+ flinkConfStringJoiner.add("-D");
+ flinkConfStringJoiner.add(key + "=" + value);
}
}
}
- envs.put("ZEPPELIN_FLINK_YARN_APPLICATION_CONF", flinkYarnApplicationConfBuilder.toString());
+ envs.put("ZEPPELIN_FLINK_YARN_APPLICATION_CONF", flinkConfStringJoiner.toString());
}
private List<String> getYarnShipFiles(InterpreterLaunchContext context) throws IOException {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index d0a9aa8..50bdc88 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -30,6 +30,7 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.StringJoiner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -106,7 +107,6 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
" to false if you want to use other modes.");
}
- StringBuilder sparkConfBuilder = new StringBuilder();
if (isYarnMode() && getDeployMode().equals("cluster")) {
if (sparkProperties.containsKey("spark.files")) {
sparkProperties.put("spark.files", sparkProperties.getProperty("spark.files") + "," +
@@ -173,22 +173,25 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
}
}
+ StringJoiner sparkConfSJ = new StringJoiner("|");
if (context.getOption().isUserImpersonate() && zConf.getZeppelinImpersonateSparkProxyUser()) {
- sparkConfBuilder.append(" --proxy-user " + context.getUserName());
+ sparkConfSJ.add("--proxy-user");
+ sparkConfSJ.add(context.getUserName());
sparkProperties.remove("spark.yarn.keytab");
sparkProperties.remove("spark.yarn.principal");
}
for (String name : sparkProperties.stringPropertyNames()) {
- sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
+ sparkConfSJ.add("--conf");
+ sparkConfSJ.add(name + "=" + sparkProperties.getProperty(name) + "");
}
- env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
+ env.put("ZEPPELIN_SPARK_CONF", sparkConfSJ.toString());
// set these env in the order of
// 1. interpreter-setting
// 2. zeppelin-env.sh
- // It is encouraged to set env in interpreter setting, but just for backward compatability,
+ // It is encouraged to set env in interpreter setting, but just for backward compatibility,
// we also fallback to zeppelin-env.sh if it is not specified in interpreter setting.
for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"}) {
String envValue = getEnv(envName);
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index 5c997e1..f99bb21 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -52,7 +52,7 @@ public class SparkInterpreterLauncherTest {
System.clearProperty(confVar.getVarName());
}
- sparkHome = DownloadUtils.downloadSpark("2.3.2", "2.7");
+ sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
new File("..").getAbsolutePath());
@@ -107,8 +107,8 @@ public class SparkInterpreterLauncherTest {
assertTrue(interpreterProcess.getEnv().size() >= 2);
assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
assertFalse(interpreterProcess.getEnv().containsKey("ENV_1"));
- assertEquals(" --conf spark.files=file_1" +
- " --conf spark.jars=jar_1 --conf spark.app.name=intpGroupId --conf spark.master=local[*]",
+ assertEquals("--conf|spark.files=file_1" +
+ "|--conf|spark.jars=jar_1|--conf|spark.app.name=intpGroupId|--conf|spark.master=local[*]",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
@@ -138,9 +138,9 @@ public class SparkInterpreterLauncherTest {
String sparkJars = "jar_1";
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
String sparkFiles = "file_1";
- assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip +
- " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
- " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.master=yarn-client",
+ assertEquals("--conf|spark.yarn.dist.archives=" + sparkrZip +
+ "|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars +
+ "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.master=yarn-client",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
@@ -171,10 +171,10 @@ public class SparkInterpreterLauncherTest {
String sparkJars = "jar_1";
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
String sparkFiles = "file_1";
- assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip +
- " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
- " --conf spark.submit.deployMode=client" +
- " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.master=yarn",
+ assertEquals("--conf|spark.yarn.dist.archives=" + sparkrZip +
+ "|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars +
+ "|--conf|spark.submit.deployMode=client" +
+ "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.master=yarn",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
@@ -207,13 +207,14 @@ public class SparkInterpreterLauncherTest {
zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
- assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip +
- " --conf spark.yarn.maxAppAttempts=1" +
- " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
- " --conf spark.yarn.isPython=true" +
- " --conf spark.yarn.submit.waitAppCompletion=false" +
- " --conf spark.app.name=intpGroupId" +
- " --conf spark.master=yarn-cluster",
+ assertEquals("--conf|spark.yarn.dist.archives=" + sparkrZip +
+ "|--conf|spark.yarn.maxAppAttempts=1" +
+ "|--conf|spark.files=" + sparkFiles +
+ "|--conf|spark.jars=" + sparkJars +
+ "|--conf|spark.yarn.isPython=true" +
+ "|--conf|spark.yarn.submit.waitAppCompletion=false" +
+ "|--conf|spark.app.name=intpGroupId" +
+ "|--conf|spark.master=yarn-cluster",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
@@ -253,13 +254,13 @@ public class SparkInterpreterLauncherTest {
zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
- assertEquals(" --proxy-user user1 --conf spark.yarn.dist.archives=" + sparkrZip +
- " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId" +
- " --conf spark.yarn.maxAppAttempts=1" +
- " --conf spark.master=yarn" +
- " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
- " --conf spark.submit.deployMode=cluster" +
- " --conf spark.yarn.submit.waitAppCompletion=false",
+ assertEquals("--proxy-user|user1|--conf|spark.yarn.dist.archives=" + sparkrZip +
+ "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId" +
+ "|--conf|spark.yarn.maxAppAttempts=1" +
+ "|--conf|spark.master=yarn" +
+ "|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars +
+ "|--conf|spark.submit.deployMode=cluster" +
+ "|--conf|spark.yarn.submit.waitAppCompletion=false",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
Files.deleteIfExists(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar"));
FileUtils.deleteDirectory(localRepoPath.toFile());
@@ -301,14 +302,16 @@ public class SparkInterpreterLauncherTest {
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
// escape special characters
String sparkFiles = "{}," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
- assertEquals(" --proxy-user user1 --conf spark.yarn.dist.archives=" + sparkrZip +
- " --conf spark.yarn.isPython=true" +
- " --conf spark.app.name=intpGroupId" +
- " --conf spark.yarn.maxAppAttempts=1" +
- " --conf spark.master=yarn" +
- " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
- " --conf spark.submit.deployMode=cluster" +
- " --conf spark.yarn.submit.waitAppCompletion=false",
+ assertEquals("--proxy-user|user1" +
+ "|--conf|spark.yarn.dist.archives=" + sparkrZip +
+ "|--conf|spark.yarn.isPython=true" +
+ "|--conf|spark.app.name=intpGroupId" +
+ "|--conf|spark.yarn.maxAppAttempts=1" +
+ "|--conf|spark.master=yarn" +
+ "|--conf|spark.files=" + sparkFiles +
+ "|--conf|spark.jars=" + sparkJars +
+ "|--conf|spark.submit.deployMode=cluster" +
+ "|--conf|spark.yarn.submit.waitAppCompletion=false",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
FileUtils.deleteDirectory(localRepoPath.toFile());
}