Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/07 00:58:17 UTC

[doris] 07/07: [Enhancement](spark load) Support for RM HA (#15000)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 50f4a7356426fa2bf9d51d48fca9c4bb7c2ab450
Author: liujinhui <96...@qq.com>
AuthorDate: Tue Mar 7 15:46:14 2023 +0800

    [Enhancement](spark load) Support for RM HA (#15000)
    
    Add RM HA (ResourceManager high availability) configuration to Spark Load.
    Spark already accepts HA parameters via its configuration; we just need to accept them in the DDL:
    
    CREATE EXTERNAL RESOURCE spark_resource_sinan_node_manager_ha
    PROPERTIES
    (
    "type" = "spark",
    "spark.master" = "yarn",
    "spark.submit.deployMode" = "cluster",
    "spark.executor.memory" = "10g",
    "spark.yarn.queue" = "XXXX",
    "spark.hadoop.yarn.resourcemanager.address" = "XXXX:8032",
    "spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
    "spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
    "spark.hadoop.yarn.resourcemanager.hostname.rm1" = "XXXX",
    "spark.hadoop.yarn.resourcemanager.hostname.rm2" = "XXXX",
    "spark.hadoop.fs.defaultFS" = "hdfs://XXXX",
    "spark.hadoop.dfs.nameservices" = "hacluster",
    "spark.hadoop.dfs.ha.namenodes.hacluster" = "mynamenode1,mynamenode2",
    "spark.hadoop.dfs.namenode.rpc-address.hacluster.mynamenode1" = "XXX:8020",
    "spark.hadoop.dfs.namenode.rpc-address.hacluster.mynamenode2" = "XXXX:8020",
    "spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
    "working_dir" = "hdfs://XXXX/doris_prd_data/sinan/spark_load/",
    "broker" = "broker_personas",
    "broker.username" = "hdfs",
    "broker.password" = "",
    "broker.dfs.nameservices" = "XXX",
    "broker.dfs.ha.namenodes.XXX" = "mynamenode1, mynamenode2",
    "broker.dfs.namenode.rpc-address.XXXX.mynamenode1" = "XXXX:8020",
    "broker.dfs.namenode.rpc-address.XXXX.mynamenode2" = "XXXX:8020",
    "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );
    
    Co-authored-by: liujh <li...@t3go.cn>
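
Background for readers: the "spark.hadoop.*" keys in the DDL above rely on Spark's documented convention of copying every property with that prefix into the Hadoop Configuration with the prefix stripped, which is how the RM HA settings reach the YARN client. Below is a minimal sketch of that mapping for illustration only; it is neither Doris nor Spark source, and the class and method names are made up.

    import java.util.HashMap;
    import java.util.Map;

    // Illustration only: shows how "spark.hadoop.*" keys become Hadoop property names.
    public class SparkHadoopPrefixSketch {
        private static final String PREFIX = "spark.hadoop.";

        static Map<String, String> toHadoopConf(Map<String, String> sparkConf) {
            Map<String, String> hadoopConf = new HashMap<>();
            for (Map.Entry<String, String> e : sparkConf.entrySet()) {
                if (e.getKey().startsWith(PREFIX)) {
                    // e.g. "spark.hadoop.yarn.resourcemanager.ha.enabled"
                    //   -> "yarn.resourcemanager.ha.enabled"
                    hadoopConf.put(e.getKey().substring(PREFIX.length()), e.getValue());
                }
            }
            return hadoopConf;
        }
    }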
---
 .../org/apache/doris/catalog/SparkResource.java    | 35 ++++++++++++++++++----
 .../apache/doris/catalog/SparkResourceTest.java    | 16 ++++++++++
 2 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
index ac761b5fb6..98ebe54b96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -32,6 +32,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -80,6 +81,10 @@ public class SparkResource extends Resource {
     private static final String SPARK_YARN_RESOURCE_MANAGER_ADDRESS = "spark.hadoop.yarn.resourcemanager.address";
     private static final String SPARK_FS_DEFAULT_FS = "spark.hadoop.fs.defaultFS";
     private static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.address";
+    private static final String SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED = "spark.hadoop.yarn.resourcemanager.ha.enabled";
+    private static final String SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS = "spark.hadoop.yarn.resourcemanager.ha.rm-ids";
+    private static final String YARN_RESOURCE_MANAGER_ADDRESS_FORMAT = "spark.hadoop.yarn.resourcemanager.address.%s";
+    private static final String YARN_RESOURCE_MANAGER_HOSTNAME_FORMAT = "spark.hadoop.yarn.resourcemanager.hostname.%s";
 
     public enum DeployMode {
         CLUSTER,
@@ -283,11 +288,31 @@ public class SparkResource extends Resource {
             throw new DdlException("Missing " + SPARK_SUBMIT_DEPLOY_MODE + " in properties");
         }
         // if the deploy machines do not set the HADOOP_CONF_DIR env, we should set these configs below
-        if ((!sparkConfigs.containsKey(SPARK_YARN_RESOURCE_MANAGER_ADDRESS)
-                || !sparkConfigs.containsKey(SPARK_FS_DEFAULT_FS))
-                && isYarnMaster()) {
-            throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_ADDRESS + " and " + SPARK_FS_DEFAULT_FS
-                                           + ") in yarn master");
+        if (isYarnMaster()) {
+            if (!sparkConfigs.containsKey(SPARK_FS_DEFAULT_FS)) {
+                throw new DdlException("Missing (" + SPARK_FS_DEFAULT_FS + ") in yarn master");
+            }
+
+            String haEnabled = sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED);
+            if ("true".equals(haEnabled)) {
+                if (StringUtils.isEmpty(sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS))) {
+                    throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS + ") in yarn master, "
+                        + "when " + SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED + "=true.");
+                }
+
+                String[] haIds = sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS).split(",");
+                for (String haId : haIds) {
+                    String addressKey = String.format(YARN_RESOURCE_MANAGER_ADDRESS_FORMAT, haId);
+                    String hostnameKey = String.format(YARN_RESOURCE_MANAGER_HOSTNAME_FORMAT, haId);
+                    if (!sparkConfigs.containsKey(addressKey) && !sparkConfigs.containsKey(hostnameKey)) {
+                        throw new DdlException("Missing " + addressKey + " or " + hostnameKey + " in yarn master, "
+                            + "when " + SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED + "=true.");
+                    }
+                }
+            } else if (!sparkConfigs.containsKey(SPARK_YARN_RESOURCE_MANAGER_ADDRESS)) {
+                throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_ADDRESS + ") in yarn master, "
+                    + "or not turned on ha.");
+            }
         }
 
         // check working dir and broker
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
index dc2436c434..0291e122bc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
@@ -117,6 +117,22 @@ public class SparkResourceTest {
         BaseProcResult result = new BaseProcResult();
         resource.getProcNodeData(result);
         Assert.assertEquals(9, result.getRows().size());
+
+        properties.clear();
+        properties.put("type", type);
+        properties.put("spark.master", "yarn");
+        properties.put("spark.submit.deployMode", "cluster");
+        properties.put("spark.hadoop.yarn.resourcemanager.ha.enabled", "true");
+        properties.put("spark.hadoop.yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
+        properties.put("spark.hadoop.yarn.resourcemanager.hostname.rm1", "host1");
+        properties.put("spark.hadoop.yarn.resourcemanager.hostname.rm2", "host2");
+        properties.put("spark.hadoop.fs.defaultFS", "hdfs://127.0.0.1:10000");
+        stmt = new CreateResourceStmt(true, false, name, properties);
+        stmt.analyze(analyzer);
+        resource = (SparkResource) Resource.fromStmt(stmt);
+        Assert.assertTrue(resource.isYarnMaster());
+        map = resource.getSparkConfigs();
+        Assert.assertEquals(7, map.size());
     }
 
     @Test
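
For readers following the control flow, the validation added to SparkResource.java boils down to the standalone sketch below. Assumptions are labeled: a plain Map stands in for sparkConfigs, IllegalArgumentException stands in for Doris's DdlException, and the class and method names are hypothetical.

    import java.util.Map;

    // Standalone sketch of the RM HA validation introduced above (not Doris source).
    public class RmHaCheckSketch {
        static void checkYarnHaConfig(Map<String, String> conf) {
            String haEnabled = conf.get("spark.hadoop.yarn.resourcemanager.ha.enabled");
            if (!"true".equals(haEnabled)) {
                // HA off: fall back to requiring the single RM address key.
                if (!conf.containsKey("spark.hadoop.yarn.resourcemanager.address")) {
                    throw new IllegalArgumentException(
                            "Missing spark.hadoop.yarn.resourcemanager.address");
                }
                return;
            }
            String rmIds = conf.get("spark.hadoop.yarn.resourcemanager.ha.rm-ids");
            if (rmIds == null || rmIds.isEmpty()) {
                throw new IllegalArgumentException(
                        "Missing spark.hadoop.yarn.resourcemanager.ha.rm-ids");
            }
            for (String id : rmIds.split(",")) {
                // Every rm-id must resolve through either an explicit address or a hostname.
                String addressKey = "spark.hadoop.yarn.resourcemanager.address." + id;
                String hostnameKey = "spark.hadoop.yarn.resourcemanager.hostname." + id;
                if (!conf.containsKey(addressKey) && !conf.containsKey(hostnameKey)) {
                    throw new IllegalArgumentException(
                            "Missing " + addressKey + " or " + hostnameKey);
                }
            }
        }
    }

Applied to the DDL example in the commit message, rm-ids "rm1,rm2" expand to lookups of spark.hadoop.yarn.resourcemanager.hostname.rm1 and .rm2, both of which are present, so the resource is accepted.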

