Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/26 04:55:01 UTC

[doris] branch branch-1.1-lts updated: [enhancement](cherry-pick)(spark-load)support dynamic set env (#12276) #13672

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new 17a8324cd1 [enhancement](cherry-pick)(spark-load)support dynamic set env (#12276) #13672
17a8324cd1 is described below

commit 17a8324cd1bb65a40232261722f42ae07c1cd951
Author: chenlinzhong <49...@qq.com>
AuthorDate: Wed Oct 26 12:54:55 2022 +0800

    [enhancement](cherry-pick)(spark-load)support dynamic set env (#12276) #13672
---
 .../org/apache/doris/catalog/SparkResource.java    | 49 ++++++++++++++++++++--
 .../doris/load/loadv2/SparkEtlJobHandler.java      | 32 ++++++++++++--
 .../apache/doris/load/loadv2/SparkLoadJobTest.java |  2 +-
 3 files changed, 76 insertions(+), 7 deletions(-)
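
What the change does, in brief: a Spark resource can now carry operating-system
environment variables. Any resource property whose key starts with "env." is
collected into a new persisted map, envConfigs. When a Spark ETL job is
submitted, polled for status, or killed, the "env." prefix is stripped and the
remaining key/value pairs are exported into the environment of the spawned
Spark launcher process and of the yarn CLI invocations. Minimal sketches of the
two pieces follow each file's diff below.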

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
index e2feb65326..00bc2d7c69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -75,6 +75,7 @@ public class SparkResource extends Resource {
     private static final String YARN_MASTER = "yarn";
     private static final String SPARK_CONFIG_PREFIX = "spark.";
     private static final String BROKER_PROPERTY_PREFIX = "broker.";
+    private static final String ENV_PREFIX = "env.";
     // spark uses hadoop configs in the form of spark.hadoop.*
     private static final String SPARK_HADOOP_CONFIG_PREFIX = "spark.hadoop.";
     private static final String SPARK_YARN_RESOURCE_MANAGER_ADDRESS = "spark.hadoop.yarn.resourcemanager.address";
@@ -104,19 +105,22 @@ public class SparkResource extends Resource {
     // broker username and password
     @SerializedName(value = "brokerProperties")
     private Map<String, String> brokerProperties;
+    @SerializedName(value = "envConfigs")
+    private Map<String, String> envConfigs;
 
     public SparkResource(String name) {
-        this(name, Maps.newHashMap(), null, null, Maps.newHashMap());
+        this(name, Maps.newHashMap(), null, null, Maps.newHashMap(), Maps.newHashMap());
     }
 
     // "public" for testing
     public SparkResource(String name, Map<String, String> sparkConfigs, String workingDir, String broker,
-                         Map<String, String> brokerProperties) {
+                         Map<String, String> brokerProperties, Map<String, String> envConfigs) {
         super(name, ResourceType.SPARK);
         this.sparkConfigs = sparkConfigs;
         this.workingDir = workingDir;
         this.broker = broker;
         this.brokerProperties = brokerProperties;
+        this.envConfigs = envConfigs;
     }
 
     public String getMaster() {
@@ -150,12 +154,25 @@ public class SparkResource extends Resource {
         return sparkConfigs;
     }
 
+    public Map<String, String> getEnvConfigsWithoutPrefix() {
+        Map<String, String> envConfig = Maps.newHashMap();
+        if (envConfigs != null) {
+            for (Map.Entry<String, String> entry : envConfigs.entrySet()) {
+                if (entry.getKey().startsWith(ENV_PREFIX)) {
+                    String key = entry.getKey().substring(ENV_PREFIX.length());
+                    envConfig.put(key, entry.getValue());
+                }
+            }
+        }
+        return envConfig;
+    }
+
     public Pair<String, String> getYarnResourcemanagerAddressPair() {
         return Pair.create(YARN_RESOURCE_MANAGER_ADDRESS, sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_ADDRESS));
     }
 
     public SparkResource getCopiedResource() {
-        return new SparkResource(name, Maps.newHashMap(sparkConfigs), workingDir, broker, brokerProperties);
+        return new SparkResource(name, Maps.newHashMap(sparkConfigs), workingDir, broker, brokerProperties, envConfigs);
     }
 
     @Override
@@ -230,6 +247,15 @@ public class SparkResource extends Resource {
             broker = properties.get(BROKER);
         }
         brokerProperties.putAll(getBrokerProperties(properties));
+        Map<String, String> env = getEnvConfig(properties);
+        if (env.size() > 0) {
+            if (envConfigs == null) {
+                envConfigs = env;
+            } else {
+                envConfigs.putAll(env);
+            }
+        }
+        LOG.info("updateProperties,{},{}", properties, envConfigs);
     }
 
     @Override
@@ -238,6 +264,8 @@ public class SparkResource extends Resource {
 
         // get spark configs
         sparkConfigs = getSparkConfig(properties);
+        envConfigs = getEnvConfig(properties);
+        LOG.info("setProperties,{},{}", properties, envConfigs);
         // check master and deploy mode
         if (getMaster() == null) {
             throw new DdlException("Missing " + SPARK_MASTER + " in properties");
@@ -281,6 +309,16 @@ public class SparkResource extends Resource {
         return sparkConfig;
     }
 
+    private Map<String, String> getEnvConfig(Map<String, String> properties) {
+        Map<String, String> envConfig = Maps.newHashMap();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (entry.getKey().startsWith(ENV_PREFIX)) {
+                envConfig.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return envConfig;
+    }
+
     private Map<String, String> getSparkHadoopConfig(Map<String, String> properties) {
         Map<String, String> sparkConfig = Maps.newHashMap();
         for (Map.Entry<String, String> entry : properties.entrySet()) {
@@ -333,5 +371,10 @@ public class SparkResource extends Resource {
         for (Map.Entry<String, String> entry : brokerProperties.entrySet()) {
             result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
         }
+        if (envConfigs != null) {
+            for (Map.Entry<String, String> entry : envConfigs.entrySet()) {
+                result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
+            }
+        }
     }
 }
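
For illustration, a self-contained sketch of the two new SparkResource helpers
(plain Java, no Doris dependencies; the demo class name, keys, and values are
hypothetical). getEnvConfig() keeps the "env." prefix on the stored keys, so
the persisted envConfigs can be echoed back verbatim (see the result.addRow
loop above); getEnvConfigsWithoutPrefix() strips it so consumers get real
environment variable names:

    import java.util.HashMap;
    import java.util.Map;

    public class EnvPrefixSketch {
        private static final String ENV_PREFIX = "env.";

        // Mirrors getEnvConfig(): keep "env."-prefixed properties, prefix included.
        static Map<String, String> getEnvConfig(Map<String, String> properties) {
            Map<String, String> env = new HashMap<>();
            for (Map.Entry<String, String> e : properties.entrySet()) {
                if (e.getKey().startsWith(ENV_PREFIX)) {
                    env.put(e.getKey(), e.getValue());
                }
            }
            return env;
        }

        // Mirrors getEnvConfigsWithoutPrefix(): strip the prefix to get real
        // environment variable names; tolerate a null map, as the patch does
        // for resources persisted before this change.
        static Map<String, String> withoutPrefix(Map<String, String> envConfigs) {
            Map<String, String> env = new HashMap<>();
            if (envConfigs != null) {
                for (Map.Entry<String, String> e : envConfigs.entrySet()) {
                    if (e.getKey().startsWith(ENV_PREFIX)) {
                        env.put(e.getKey().substring(ENV_PREFIX.length()), e.getValue());
                    }
                }
            }
            return env;
        }

        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("spark.master", "yarn");                    // not an env config
            props.put("env.HADOOP_CONF_DIR", "/etc/hadoop/conf"); // hypothetical value
            Map<String, String> stored = getEnvConfig(props);
            System.out.println(stored);                // {env.HADOOP_CONF_DIR=/etc/hadoop/conf}
            System.out.println(withoutPrefix(stored)); // {HADOOP_CONF_DIR=/etc/hadoop/conf}
        }
    }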
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index f26793ab82..b89817890e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -123,7 +123,10 @@ public class SparkEtlJobHandler {
             throw new LoadException(e.getMessage());
         }
 
-        SparkLauncher launcher = new SparkLauncher();
+        Map<String, String> envParams = resource.getEnvConfigsWithoutPrefix();
+        LOG.info("submit etl job,env:{}", envParams);
+
+        SparkLauncher launcher = new SparkLauncher(envParams);
         // master      |  deployMode
         // ------------|-------------
         // yarn        |  cluster
@@ -195,7 +198,19 @@ public class SparkEtlJobHandler {
             // command: yarn --config configDir application -status appId
             String yarnStatusCmd = String.format(YARN_STATUS_CMD, yarnClient, configDir, appId);
             LOG.info(yarnStatusCmd);
-            String[] envp = { "LC_ALL=" + Config.locale };
+
+            Map<String, String> envParams = resource.getEnvConfigsWithoutPrefix();
+            int envNums = envParams.size() + 1;
+            String[] envp = new String[envNums];
+            int idx = 0;
+            envp[idx++] = "LC_ALL=" + Config.locale;
+            if (envParams.size() > 0) {
+                for (Map.Entry<String, String> entry : envParams.entrySet()) {
+                    String envItem = entry.getKey() + "=" + entry.getValue();
+                    envp[idx++] = envItem;
+                }
+            }
+            LOG.info("getEtlJobStatus,appId:{}, loadJobId:{}, env:{},resource:{}", appId, loadJobId, envp, resource);
             CommandResult result = Util.executeCommand(yarnStatusCmd, envp, EXEC_CMD_TIMEOUT_MS);
             if (result.getReturnCode() != 0) {
                 String stderr = result.getStderr();
@@ -283,7 +298,18 @@ public class SparkEtlJobHandler {
             // command: yarn --config configDir application -kill appId
             String yarnKillCmd = String.format(YARN_KILL_CMD, yarnClient, configDir, appId);
             LOG.info(yarnKillCmd);
-            String[] envp = { "LC_ALL=" + Config.locale };
+            Map<String, String> envParams = resource.getEnvConfigsWithoutPrefix();
+            int envNums = envParams.size() + 1;
+            String[] envp = new String[envNums];
+            int idx = 0;
+            envp[idx++] = "LC_ALL=" + Config.locale;
+            if (envParams.size() > 0) {
+                for (Map.Entry<String, String> entry : envParams.entrySet()) {
+                    String envItem = entry.getKey() + "=" + entry.getValue();
+                    envp[idx++] = envItem;
+                }
+            }
+            LOG.info("killEtlJob, env:{}", envp);
             CommandResult result = Util.executeCommand(yarnKillCmd, envp, EXEC_CMD_TIMEOUT_MS);
             LOG.info("yarn application -kill {}, output: {}", appId, result.getStdout());
             if (result.getReturnCode() != 0) {
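
On the handler side the stripped map is consumed in two ways: it is passed to
the org.apache.spark.launcher.SparkLauncher(Map<String, String> env)
constructor, which sets those variables in the launcher's child process, and it
is flattened into the "KEY=VALUE" envp array that Util.executeCommand hands to
the JVM's process API for the yarn status/kill commands. A minimal sketch of
the envp assembly (plain Java; class name and values are hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    public class EnvpSketch {
        // Mirrors the envp construction in getEtlJobStatus()/killEtlJob():
        // "LC_ALL=<locale>" always comes first, then one "KEY=VALUE" entry
        // per env config. The size() > 0 guard in the patch is harmless but
        // redundant; iterating an empty map is equivalent.
        static String[] buildEnvp(String locale, Map<String, String> envParams) {
            String[] envp = new String[envParams.size() + 1];
            int idx = 0;
            envp[idx++] = "LC_ALL=" + locale;
            for (Map.Entry<String, String> e : envParams.entrySet()) {
                envp[idx++] = e.getKey() + "=" + e.getValue();
            }
            return envp;
        }

        public static void main(String[] args) {
            Map<String, String> env = new HashMap<>();
            env.put("HADOOP_CONF_DIR", "/etc/hadoop/conf"); // hypothetical value
            // Same shape that Runtime.getRuntime().exec(cmd, envp) expects.
            for (String entry : buildEnvp("en_US.UTF-8", env)) {
                System.out.println(entry);
            }
        }
    }

One behavior worth noting: a non-null envp replaces the parent environment
entirely rather than extending it, so the yarn commands see only LC_ALL plus
the configured variables. That was already the case before this patch, which
is precisely why env configs must be forwarded explicitly.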
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index 9908f9b804..b220dfdeb6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -493,7 +493,7 @@ public class SparkLoadJobTest {
                                         @Mocked ResourceMgr resourceMgr) throws Exception {
         long dbId = 1000L;
         SparkResource sparkResource = new SparkResource("my_spark", Maps.newHashMap(), "/path/to/", "bos",
-                Maps.newHashMap());
+                Maps.newHashMap(), Maps.newHashMap());
         new Expectations() {
             {
                 catalog.getResourceMgr();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org