Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/26 04:55:01 UTC
[doris] branch branch-1.1-lts updated: [enhancement](cherry-pick)(spark-load)support dynamic set env (#12276) #13672
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
new 17a8324cd1 [enhancement](cherry-pick)(spark-load)support dynamic set env (#12276) #13672
17a8324cd1 is described below
commit 17a8324cd1bb65a40232261722f42ae07c1cd951
Author: chenlinzhong <49...@qq.com>
AuthorDate: Wed Oct 26 12:54:55 2022 +0800
[enhancement](cherry-pick)(spark-load)support dynamic set env (#12276) #13672
---
.../org/apache/doris/catalog/SparkResource.java | 49 ++++++++++++++++++++--
.../doris/load/loadv2/SparkEtlJobHandler.java | 32 ++++++++++++--
.../apache/doris/load/loadv2/SparkLoadJobTest.java | 2 +-
3 files changed, 76 insertions(+), 7 deletions(-)
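In short, this cherry-pick lets a Spark resource carry user-defined environment variables: any resource property whose key starts with "env." is kept in a new envConfigs map on SparkResource, and SparkEtlJobHandler exports those variables (with the prefix stripped) both to the SparkLauncher that submits the ETL job and to the yarn commands used to poll or kill it. A minimal sketch of the kind of properties this targets; the env.* keys and their values below are hypothetical examples, not taken from this commit:

// Illustrative only: the env.* keys and values are assumptions, not from this commit.
import java.util.LinkedHashMap;
import java.util.Map;

public class SparkResourceEnvPropsDemo {
    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("type", "spark");
        properties.put("spark.master", "yarn");
        properties.put("env.HADOOP_CONF_DIR", "/etc/hadoop/conf");      // hypothetical
        properties.put("env.JAVA_HOME", "/usr/lib/jvm/java-8-openjdk"); // hypothetical

        // After this commit, SparkResource.setProperties() keeps the two "env." entries
        // in its new envConfigs field instead of ignoring them.
        properties.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}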
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
index e2feb65326..00bc2d7c69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -75,6 +75,7 @@ public class SparkResource extends Resource {
private static final String YARN_MASTER = "yarn";
private static final String SPARK_CONFIG_PREFIX = "spark.";
private static final String BROKER_PROPERTY_PREFIX = "broker.";
+ private static final String ENV_PREFIX = "env.";
// spark uses hadoop configs in the form of spark.hadoop.*
private static final String SPARK_HADOOP_CONFIG_PREFIX = "spark.hadoop.";
private static final String SPARK_YARN_RESOURCE_MANAGER_ADDRESS = "spark.hadoop.yarn.resourcemanager.address";
@@ -104,19 +105,22 @@ public class SparkResource extends Resource {
// broker username and password
@SerializedName(value = "brokerProperties")
private Map<String, String> brokerProperties;
+ @SerializedName(value = "envConfigs")
+ private Map<String, String> envConfigs;
public SparkResource(String name) {
- this(name, Maps.newHashMap(), null, null, Maps.newHashMap());
+ this(name, Maps.newHashMap(), null, null, Maps.newHashMap(), Maps.newHashMap());
}
// "public" for testing
public SparkResource(String name, Map<String, String> sparkConfigs, String workingDir, String broker,
- Map<String, String> brokerProperties) {
+ Map<String, String> brokerProperties, Map<String, String> envConfigs) {
super(name, ResourceType.SPARK);
this.sparkConfigs = sparkConfigs;
this.workingDir = workingDir;
this.broker = broker;
this.brokerProperties = brokerProperties;
+ this.envConfigs = envConfigs;
}
public String getMaster() {
@@ -150,12 +154,25 @@ public class SparkResource extends Resource {
return sparkConfigs;
}
+ public Map<String, String> getEnvConfigsWithoutPrefix() {
+ Map<String, String> envConfig = Maps.newHashMap();
+ if (envConfigs != null) {
+ for (Map.Entry<String, String> entry : envConfigs.entrySet()) {
+ if (entry.getKey().startsWith(ENV_PREFIX)) {
+ String key = entry.getKey().substring(ENV_PREFIX.length());
+ envConfig.put(key, entry.getValue());
+ }
+ }
+ }
+ return envConfig;
+ }
+
public Pair<String, String> getYarnResourcemanagerAddressPair() {
return Pair.create(YARN_RESOURCE_MANAGER_ADDRESS, sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_ADDRESS));
}
public SparkResource getCopiedResource() {
- return new SparkResource(name, Maps.newHashMap(sparkConfigs), workingDir, broker, brokerProperties);
+ return new SparkResource(name, Maps.newHashMap(sparkConfigs), workingDir, broker, brokerProperties, envConfigs);
}
@Override
@@ -230,6 +247,15 @@ public class SparkResource extends Resource {
broker = properties.get(BROKER);
}
brokerProperties.putAll(getBrokerProperties(properties));
+ Map<String, String> env = getEnvConfig(properties);
+ if (env.size() > 0) {
+ if (envConfigs == null) {
+ envConfigs = env;
+ } else {
+ envConfigs.putAll(env);
+ }
+ }
+ LOG.info("updateProperties,{},{}", properties, envConfigs);
}
@Override
@@ -238,6 +264,8 @@ public class SparkResource extends Resource {
// get spark configs
sparkConfigs = getSparkConfig(properties);
+ envConfigs = getEnvConfig(properties);
+ LOG.info("setProperties,{},{}", properties, envConfigs);
// check master and deploy mode
if (getMaster() == null) {
throw new DdlException("Missing " + SPARK_MASTER + " in properties");
@@ -281,6 +309,16 @@ public class SparkResource extends Resource {
return sparkConfig;
}
+ private Map<String, String> getEnvConfig(Map<String, String> properties) {
+ Map<String, String> envConfig = Maps.newHashMap();
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ if (entry.getKey().startsWith(ENV_PREFIX)) {
+ envConfig.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return envConfig;
+ }
+
private Map<String, String> getSparkHadoopConfig(Map<String, String> properties) {
Map<String, String> sparkConfig = Maps.newHashMap();
for (Map.Entry<String, String> entry : properties.entrySet()) {
@@ -333,5 +371,10 @@ public class SparkResource extends Resource {
for (Map.Entry<String, String> entry : brokerProperties.entrySet()) {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
}
+ if (envConfigs != null) {
+ for (Map.Entry<String, String> entry : envConfigs.entrySet()) {
+ result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
+ }
+ }
}
}
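To make the two helpers above concrete, here is a stand-alone sketch of the same prefix handling, using an illustrative class name and env key: getEnvConfig() keeps the full "env.xxx" key (this is what is stored in envConfigs and echoed back in the resource's result rows), while getEnvConfigsWithoutPrefix() strips the prefix so that only the bare variable name is handed to the child process.

import java.util.HashMap;
import java.util.Map;

// Stand-alone sketch of the prefix handling added to SparkResource (names illustrative).
public class EnvPrefixDemo {
    private static final String ENV_PREFIX = "env.";

    // Mirrors SparkResource.getEnvConfig(): keep the full "env.xxx" key.
    static Map<String, String> getEnvConfig(Map<String, String> properties) {
        Map<String, String> envConfig = new HashMap<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            if (entry.getKey().startsWith(ENV_PREFIX)) {
                envConfig.put(entry.getKey(), entry.getValue());
            }
        }
        return envConfig;
    }

    // Mirrors getEnvConfigsWithoutPrefix(): strip "env." before exporting.
    static Map<String, String> withoutPrefix(Map<String, String> envConfigs) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : envConfigs.entrySet()) {
            result.put(entry.getKey().substring(ENV_PREFIX.length()), entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("spark.master", "yarn");
        props.put("env.HADOOP_CONF_DIR", "/etc/hadoop/conf"); // hypothetical key
        Map<String, String> envConfigs = getEnvConfig(props);
        System.out.println(envConfigs);                 // {env.HADOOP_CONF_DIR=/etc/hadoop/conf}
        System.out.println(withoutPrefix(envConfigs));  // {HADOOP_CONF_DIR=/etc/hadoop/conf}
    }
}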
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index f26793ab82..b89817890e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -123,7 +123,10 @@ public class SparkEtlJobHandler {
throw new LoadException(e.getMessage());
}
- SparkLauncher launcher = new SparkLauncher();
+ Map<String, String> envParams = resource.getEnvConfigsWithoutPrefix();
+ LOG.info("submit etl job,env:{}", envParams);
+
+ SparkLauncher launcher = new SparkLauncher(envParams);
// master | deployMode
// ------------|-------------
// yarn | cluster
@@ -195,7 +198,19 @@ public class SparkEtlJobHandler {
// command: yarn --config configDir application -status appId
String yarnStatusCmd = String.format(YARN_STATUS_CMD, yarnClient, configDir, appId);
LOG.info(yarnStatusCmd);
- String[] envp = { "LC_ALL=" + Config.locale };
+
+ Map<String, String> envParams = resource.getEnvConfigsWithoutPrefix();
+ int envNums = envParams.size() + 1;
+ String[] envp = new String[envNums];
+ int idx = 0;
+ envp[idx++] = "LC_ALL=" + Config.locale;
+ if (envParams.size() > 0) {
+ for (Map.Entry<String, String> entry : envParams.entrySet()) {
+ String envItem = entry.getKey() + "=" + entry.getValue();
+ envp[idx++] = envItem;
+ }
+ }
+ LOG.info("getEtlJobStatus,appId:{}, loadJobId:{}, env:{},resource:{}", appId, loadJobId, envp, resource);
CommandResult result = Util.executeCommand(yarnStatusCmd, envp, EXEC_CMD_TIMEOUT_MS);
if (result.getReturnCode() != 0) {
String stderr = result.getStderr();
@@ -283,7 +298,18 @@ public class SparkEtlJobHandler {
// command: yarn --config configDir application -kill appId
String yarnKillCmd = String.format(YARN_KILL_CMD, yarnClient, configDir, appId);
LOG.info(yarnKillCmd);
- String[] envp = { "LC_ALL=" + Config.locale };
+ Map<String, String> envParams = resource.getEnvConfigsWithoutPrefix();
+ int envNums = envParams.size() + 1;
+ String[] envp = new String[envNums];
+ int idx = 0;
+ envp[idx++] = "LC_ALL=" + Config.locale;
+ if (envParams.size() > 0) {
+ for (Map.Entry<String, String> entry : envParams.entrySet()) {
+ String envItem = entry.getKey() + "=" + entry.getValue();
+ envp[idx++] = envItem;
+ }
+ }
+ LOG.info("killEtlJob, env:{}", envp);
CommandResult result = Util.executeCommand(yarnKillCmd, envp, EXEC_CMD_TIMEOUT_MS);
LOG.info("yarn application -kill {}, output: {}", appId, result.getStdout());
if (result.getReturnCode() != 0) {
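The handler now applies the same environment in two places: the SparkLauncher constructor (org.apache.spark.launcher.SparkLauncher accepts a Map<String, String> of environment variables for the spark-submit child process) and the envp array passed when executing the yarn status/kill commands, where each variable is flattened to "KEY=VALUE" and "LC_ALL=" + Config.locale always comes first. A minimal sketch of that flattening, with a made-up helper name and sample values:

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of how an env map is flattened into the envp array used for the
// yarn status/kill commands ("KEY=VALUE" entries, with LC_ALL always first).
// Helper name and sample values are illustrative, not from the commit.
public class EnvpDemo {
    static String[] buildEnvp(String locale, Map<String, String> envParams) {
        String[] envp = new String[envParams.size() + 1];
        int idx = 0;
        envp[idx++] = "LC_ALL=" + locale;
        for (Map.Entry<String, String> entry : envParams.entrySet()) {
            envp[idx++] = entry.getKey() + "=" + entry.getValue();
        }
        return envp;
    }

    public static void main(String[] args) {
        Map<String, String> envParams = new LinkedHashMap<>();
        envParams.put("HADOOP_CONF_DIR", "/etc/hadoop/conf"); // hypothetical
        for (String item : buildEnvp("en_US.UTF-8", envParams)) {
            System.out.println(item);
        }
        // Prints:
        // LC_ALL=en_US.UTF-8
        // HADOOP_CONF_DIR=/etc/hadoop/conf
    }
}

Note that the loop over envParams could run unconditionally; iterating an empty map is a no-op, so the size check in the committed code is harmless but not required.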
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index 9908f9b804..b220dfdeb6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -493,7 +493,7 @@ public class SparkLoadJobTest {
@Mocked ResourceMgr resourceMgr) throws Exception {
long dbId = 1000L;
SparkResource sparkResource = new SparkResource("my_spark", Maps.newHashMap(), "/path/to/", "bos",
- Maps.newHashMap());
+ Maps.newHashMap(), Maps.newHashMap());
new Expectations() {
{
catalog.getResourceMgr();