You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/07 00:58:17 UTC
[doris] 07/07: [Enhancement](spark load)Support for RM HA (#15000)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 50f4a7356426fa2bf9d51d48fca9c4bb7c2ab450
Author: liujinhui <96...@qq.com>
AuthorDate: Tue Mar 7 15:46:14 2023 +0800
[Enhancement](spark load)Support for RM HA (#15000)
Adds RM HA configuration support to Spark Load.
Spark can accept HA parameters via its configuration; we just need to accept them in the DDL.
CREATE EXTERNAL RESOURCE spark_resource_sinan_node_manager_ha
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.executor.memory" = "10g",
"spark.yarn.queue" = "XXXX",
"spark.hadoop.yarn.resourcemanager.address" = "XXXX:8032",
"spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
"spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
"spark.hadoop.yarn.resourcemanager.hostname.rm1" = "XXXX",
"spark.hadoop.yarn.resourcemanager.hostname.rm2" = "XXXX",
"spark.hadoop.fs.defaultFS" = "hdfs://XXXX",
"spark.hadoop.dfs.nameservices" = "hacluster",
"spark.hadoop.dfs.ha.namenodes.hacluster" = "mynamenode1,mynamenode2",
"spark.hadoop.dfs.namenode.rpc-address.hacluster.mynamenode1" = "XXX:8020",
"spark.hadoop.dfs.namenode.rpc-address.hacluster.mynamenode2" = "XXXX:8020",
"spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"working_dir" = "hdfs://XXXX/doris_prd_data/sinan/spark_load/",
"broker" = "broker_personas",
"broker.username" = "hdfs",
"broker.password" = "",
"broker.dfs.nameservices" = "XXX",
"broker.dfs.ha.namenodes.XXX" = "mynamenode1, mynamenode2",
"broker.dfs.namenode.rpc-address.XXXX.mynamenode1" = "XXXX:8020",
"broker.dfs.namenode.rpc-address.XXXX.mynamenode2" = "XXXX:8020",
"broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
Co-authored-by: liujh <li...@t3go.cn>
---
.../org/apache/doris/catalog/SparkResource.java | 35 ++++++++++++++++++----
.../apache/doris/catalog/SparkResourceTest.java | 16 ++++++++++
2 files changed, 46 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
index ac761b5fb6..98ebe54b96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -32,6 +32,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -80,6 +81,10 @@ public class SparkResource extends Resource {
private static final String SPARK_YARN_RESOURCE_MANAGER_ADDRESS = "spark.hadoop.yarn.resourcemanager.address";
private static final String SPARK_FS_DEFAULT_FS = "spark.hadoop.fs.defaultFS";
private static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.address";
+ private static final String SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED = "spark.hadoop.yarn.resourcemanager.ha.enabled";
+ private static final String SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS = "spark.hadoop.yarn.resourcemanager.ha.rm-ids";
+ private static final String YARN_RESOURCE_MANAGER_ADDRESS_FOMART = "spark.hadoop.yarn.resourcemanager.address.%s";
+ private static final String YARN_RESOURCE_MANAGER_HOSTNAME_FORMAT = "spark.hadoop.yarn.resourcemanager.hostname.%s";
public enum DeployMode {
CLUSTER,
@@ -283,11 +288,31 @@ public class SparkResource extends Resource {
throw new DdlException("Missing " + SPARK_SUBMIT_DEPLOY_MODE + " in properties");
}
// if deploy machines do not set HADOOP_CONF_DIR env, we should set these configs blow
- if ((!sparkConfigs.containsKey(SPARK_YARN_RESOURCE_MANAGER_ADDRESS)
- || !sparkConfigs.containsKey(SPARK_FS_DEFAULT_FS))
- && isYarnMaster()) {
- throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_ADDRESS + " and " + SPARK_FS_DEFAULT_FS
- + ") in yarn master");
+ if (isYarnMaster()) {
+ if (!sparkConfigs.containsKey(SPARK_FS_DEFAULT_FS)) {
+ throw new DdlException("Missing (" + SPARK_FS_DEFAULT_FS + ") in yarn master");
+ }
+
+ String haEnabled = sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED);
+ if (StringUtils.isNotEmpty(haEnabled) && "true".equals(haEnabled)) {
+ if (StringUtils.isEmpty(SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS)) {
+ throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS + ") in yarn master, "
+ + "when " + SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED + "=true.");
+ }
+
+ String[] haIds = sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS).split(",");
+ for (String haId : haIds) {
+ String addressKey = String.format(YARN_RESOURCE_MANAGER_ADDRESS_FOMART, haId);
+ String hostnameKey = String.format(YARN_RESOURCE_MANAGER_HOSTNAME_FORMAT, haId);
+ if (!sparkConfigs.containsKey(addressKey) && !sparkConfigs.containsKey(hostnameKey)) {
+ throw new DdlException("Missing " + addressKey + " or " + hostnameKey + " in yarn master, "
+ + "when " + SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED + "=true.");
+ }
+ }
+ } else if (!sparkConfigs.containsKey(SPARK_YARN_RESOURCE_MANAGER_ADDRESS)) {
+ throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_ADDRESS + ") in yarn master, "
+ + "or not turned on ha.");
+ }
}
// check working dir and broker
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
index dc2436c434..0291e122bc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
@@ -117,6 +117,22 @@ public class SparkResourceTest {
BaseProcResult result = new BaseProcResult();
resource.getProcNodeData(result);
Assert.assertEquals(9, result.getRows().size());
+
+ properties.clear();
+ properties.put("type", type);
+ properties.put("spark.master", "yarn");
+ properties.put("spark.submit.deployMode", "cluster");
+ properties.put("spark.hadoop.yarn.resourcemanager.ha.enabled", "true");
+ properties.put("spark.hadoop.yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
+ properties.put("spark.hadoop.yarn.resourcemanager.hostname.rm1", "host1");
+ properties.put("spark.hadoop.yarn.resourcemanager.hostname.rm2", "host2");
+ properties.put("spark.hadoop.fs.defaultFS", "hdfs://127.0.0.1:10000");
+ stmt = new CreateResourceStmt(true, false, name, properties);
+ stmt.analyze(analyzer);
+ resource = (SparkResource) Resource.fromStmt(stmt);
+ Assert.assertTrue(resource.isYarnMaster());
+ map = resource.getSparkConfigs();
+ Assert.assertEquals(7, map.size());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org