You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/05/17 03:25:13 UTC
[linkis] branch dev-1.4.0 updated: feat: support multiple cluster PART-1 #4179 (#4460)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new f1ed1dd6f feat: support multiple cluster PART-1 #4179 (#4460)
f1ed1dd6f is described below
commit f1ed1dd6fd9c6fa22e0f39f5984e311d68405ff7
Author: Cheng'hui Chen <27...@users.noreply.github.com>
AuthorDate: Wed May 17 11:25:06 2023 +0800
feat: support multiple cluster PART-1 #4179 (#4460)
---
.../apache/linkis/common/conf/Configuration.scala | 2 +
.../linkis/hadoop/common/conf/HadoopConf.scala | 12 ++
.../common/entity/HDFSFileSystemContainer.scala | 4 +-
.../linkis/hadoop/common/utils/HDFSUtils.scala | 160 +++++++++++++++++----
.../linkis/hadoop/common/conf/HDFSUtilsTest.scala | 55 +++++++
.../linkis/storage/fs/impl/HDFSFileSystem.java | 21 +--
.../linkis/storage/fs/impl/OSSFileSystem.java | 5 +
.../linkis/storage/utils/StorageConfiguration.java | 3 +
.../linkis/cli/application/constants/AppKeys.java | 2 +
.../cli/application/constants/LinkisKeys.java | 1 +
.../command/template/UniversalCmdTemplate.java | 9 ++
.../linkis/ecm/server/conf/ECMConfiguration.scala | 13 ++
.../service/impl/DefaultECMRegisterService.scala | 7 +
.../entrance/parser/CommonEntranceParser.scala | 21 +++
.../rm/external/yarn/YarnResourceRequester.java | 7 +-
.../manager/rm/utils/RequestKerberosUrlUtils.java | 6 -
.../manager/label/entity/cluster/ClusterLabel.java | 17 ++-
.../conf/linkis-cg-engineconnmanager.properties | 2 +
linkis-dist/package/conf/linkis.properties | 1 +
.../metadata/query/service/HiveConnection.java | 10 --
20 files changed, 303 insertions(+), 55 deletions(-)
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala
index 8ac94739c..9443f1526 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala
@@ -33,6 +33,8 @@ object Configuration extends Logging {
val IS_PROMETHEUS_ENABLE = CommonVars("wds.linkis.prometheus.enable", false)
+ val IS_MULTIPLE_YARN_CLUSTER = CommonVars("linkis.multiple.yarn.cluster", false)
+
val PROMETHEUS_ENDPOINT = CommonVars("wds.linkis.prometheus.endpoint", "/actuator/prometheus")
val LINKIS_HOME = CommonVars("wds.linkis.home", CommonVars("LINKIS_HOME", "/tmp").getValue)
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
index fc7f91504..b3e5cf202 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
@@ -25,16 +25,28 @@ object HadoopConf {
val KERBEROS_ENABLE = CommonVars("wds.linkis.keytab.enable", false)
+ val KERBEROS_ENABLE_MAP =
+ CommonVars("linkis.keytab.enable.map", "cluster1=false,cluster2=true")
+
val KEYTAB_FILE = CommonVars("wds.linkis.keytab.file", "/appcom/keytab/")
+ val EXTERNAL_KEYTAB_FILE_PREFIX =
+ CommonVars("linkis.external.keytab.file.prefix", "/appcom/config/external-conf/keytab")
+
val KEYTAB_HOST = CommonVars("wds.linkis.keytab.host", "127.0.0.1")
+ val KEYTAB_HOST_MAP =
+ CommonVars("linkis.keytab.host.map", "cluster1=127.0.0.2,cluster2=127.0.0.3")
+
val KEYTAB_HOST_ENABLED = CommonVars("wds.linkis.keytab.host.enabled", false)
val KEYTAB_PROXYUSER_ENABLED = CommonVars("wds.linkis.keytab.proxyuser.enable", false)
val KEYTAB_PROXYUSER_SUPERUSER = CommonVars("wds.linkis.keytab.proxyuser.superuser", "hadoop")
+ val KEYTAB_PROXYUSER_SUPERUSER_MAP =
+ CommonVars("linkis.keytab.proxyuser.superuser.map", "cluster1=hadoop1,cluster2=hadoop2")
+
val hadoopConfDir =
CommonVars("hadoop.config.dir", CommonVars("HADOOP_CONF_DIR", "").getValue).getValue
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala
index dfbc5c934..6b4eaaece 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala
@@ -21,7 +21,7 @@ import org.apache.linkis.hadoop.common.conf.HadoopConf
import org.apache.hadoop.fs.FileSystem
-class HDFSFileSystemContainer(fs: FileSystem, user: String) {
+class HDFSFileSystemContainer(fs: FileSystem, user: String, label: String) {
private var lastAccessTime: Long = System.currentTimeMillis()
@@ -31,6 +31,8 @@ class HDFSFileSystemContainer(fs: FileSystem, user: String) {
def getUser: String = this.user
+ def getLabel: String = this.label
+
def getLastAccessTime: Long = this.lastAccessTime
def updateLastAccessTime: Unit = {
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
index 922b5f6a8..f2a615e99 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
@@ -41,6 +41,8 @@ object HDFSUtils extends Logging {
new java.util.HashMap[String, HDFSFileSystemContainer]()
private val LOCKER_SUFFIX = "_HDFS"
+ private val DEFAULT_CACHE_LABEL = "default"
+ private val JOINT = "_"
if (HadoopConf.HDFS_ENABLE_CACHE) {
logger.info("HDFS Cache enabled ")
@@ -56,10 +58,13 @@ object HDFSUtils extends Logging {
)
}
.foreach { hdfsFileSystemContainer =>
- val locker = hdfsFileSystemContainer.getUser + LOCKER_SUFFIX
+ val locker =
+ hdfsFileSystemContainer.getUser + JOINT + hdfsFileSystemContainer.getLabel + LOCKER_SUFFIX
locker.intern() synchronized {
if (hdfsFileSystemContainer.canRemove()) {
- fileSystemCache.remove(hdfsFileSystemContainer.getUser)
+ fileSystemCache.remove(
+ hdfsFileSystemContainer.getUser + JOINT + hdfsFileSystemContainer.getLabel
+ )
IOUtils.closeQuietly(hdfsFileSystemContainer.getFileSystem)
logger.info(
s"user${hdfsFileSystemContainer.getUser} to remove hdfsFileSystemContainer,because hdfsFileSystemContainer can remove"
@@ -124,15 +129,27 @@ object HDFSUtils extends Logging {
def getHDFSUserFileSystem(
userName: String,
conf: org.apache.hadoop.conf.Configuration
+ ): FileSystem = getHDFSUserFileSystem(userName, null, conf)
+
+ def getHDFSUserFileSystem(
+ userName: String,
+ label: String,
+ conf: org.apache.hadoop.conf.Configuration
): FileSystem = if (HadoopConf.HDFS_ENABLE_CACHE) {
- val locker = userName + LOCKER_SUFFIX
+ val cacheLabel = if (label == null) DEFAULT_CACHE_LABEL else label
+ val cacheKey = userName + JOINT + cacheLabel
+ val locker = cacheKey + LOCKER_SUFFIX
locker.intern().synchronized {
- val hdfsFileSystemContainer = if (fileSystemCache.containsKey(userName)) {
- fileSystemCache.get(userName)
+ val hdfsFileSystemContainer = if (fileSystemCache.containsKey(cacheKey)) {
+ fileSystemCache.get(cacheKey)
} else {
+ // we use cacheLabel to create HDFSFileSystemContainer, and in the rest part of HDFSUtils, we consistently
+ // use the same cacheLabel to operate HDFSFileSystemContainer, like close or remove.
+ // At the same time, we don't want to change the behavior of createFileSystem which is out of HDFSUtils,
+ // so we continue to use the original label to createFileSystem.
val newHDFSFileSystemContainer =
- new HDFSFileSystemContainer(createFileSystem(userName, conf), userName)
- fileSystemCache.put(userName, newHDFSFileSystemContainer)
+ new HDFSFileSystemContainer(createFileSystem(userName, label, conf), userName, cacheLabel)
+ fileSystemCache.put(cacheKey, newHDFSFileSystemContainer)
newHDFSFileSystemContainer
}
hdfsFileSystemContainer.addAccessCount()
@@ -140,11 +157,18 @@ object HDFSUtils extends Logging {
hdfsFileSystemContainer.getFileSystem
}
} else {
- createFileSystem(userName, conf)
+ createFileSystem(userName, label, conf)
}
def createFileSystem(userName: String, conf: org.apache.hadoop.conf.Configuration): FileSystem =
- getUserGroupInformation(userName)
+ createFileSystem(userName, null, conf)
+
+ def createFileSystem(
+ userName: String,
+ label: String,
+ conf: org.apache.hadoop.conf.Configuration
+ ): FileSystem =
+ getUserGroupInformation(userName, label)
.doAs(new PrivilegedExceptionAction[FileSystem] {
// scalastyle:off FileSystemGet
def run: FileSystem = FileSystem.get(conf)
@@ -152,18 +176,29 @@ object HDFSUtils extends Logging {
})
def closeHDFSFIleSystem(fileSystem: FileSystem, userName: String): Unit =
- if (null != fileSystem && StringUtils.isNotBlank(userName)) {
- closeHDFSFIleSystem(fileSystem, userName, false)
- }
+ closeHDFSFIleSystem(fileSystem, userName, null, false)
+
+ def closeHDFSFIleSystem(fileSystem: FileSystem, userName: String, label: String): Unit =
+ closeHDFSFIleSystem(fileSystem, userName, label, false)
def closeHDFSFIleSystem(fileSystem: FileSystem, userName: String, isForce: Boolean): Unit =
+ closeHDFSFIleSystem(fileSystem, userName, null, isForce)
+
+ def closeHDFSFIleSystem(
+ fileSystem: FileSystem,
+ userName: String,
+ label: String,
+ isForce: Boolean
+ ): Unit =
if (null != fileSystem && StringUtils.isNotBlank(userName)) {
if (HadoopConf.HDFS_ENABLE_CACHE) {
- val hdfsFileSystemContainer = fileSystemCache.get(userName)
+ val cacheLabel = if (label == null) DEFAULT_CACHE_LABEL else label
+ val cacheKey = userName + JOINT + cacheLabel
+ val hdfsFileSystemContainer = fileSystemCache.get(cacheKey)
if (null != hdfsFileSystemContainer) {
- val locker = userName + LOCKER_SUFFIX
+ val locker = cacheKey + LOCKER_SUFFIX
if (isForce) {
- locker synchronized fileSystemCache.remove(hdfsFileSystemContainer.getUser)
+ locker synchronized fileSystemCache.remove(cacheKey)
IOUtils.closeQuietly(hdfsFileSystemContainer.getFileSystem)
logger.info(
s"user${hdfsFileSystemContainer.getUser} to Force remove hdfsFileSystemContainer"
@@ -178,17 +213,21 @@ object HDFSUtils extends Logging {
}
def getUserGroupInformation(userName: String): UserGroupInformation = {
- if (KERBEROS_ENABLE.getValue) {
- if (!KEYTAB_PROXYUSER_ENABLED.getValue) {
- val path = new File(KEYTAB_FILE.getValue, userName + ".keytab").getPath
- val user = getKerberosUser(userName)
- UserGroupInformation.setConfiguration(getConfiguration(userName))
+ getUserGroupInformation(userName, null);
+ }
+
+ def getUserGroupInformation(userName: String, label: String): UserGroupInformation = {
+ if (isKerberosEnabled(label)) {
+ if (!isKeytabProxyUserEnabled(label)) {
+ val path = new File(getKeytabPath(label), userName + ".keytab").getPath
+ val user = getKerberosUser(userName, label)
+ UserGroupInformation.setConfiguration(getConfigurationByLabel(userName, label))
UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, path)
} else {
- val superUser = KEYTAB_PROXYUSER_SUPERUSER.getValue
- val path = new File(KEYTAB_FILE.getValue, superUser + ".keytab").getPath
- val user = getKerberosUser(superUser)
- UserGroupInformation.setConfiguration(getConfiguration(superUser))
+ val superUser = getKeytabSuperUser(label)
+ val path = new File(getKeytabPath(label), superUser + ".keytab").getPath
+ val user = getKerberosUser(superUser, label)
+ UserGroupInformation.setConfiguration(getConfigurationByLabel(superUser, label))
UserGroupInformation.createProxyUser(
userName,
UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, path)
@@ -199,12 +238,79 @@ object HDFSUtils extends Logging {
}
}
- def getKerberosUser(userName: String): String = {
+ def isKerberosEnabled(label: String): Boolean = {
+ if (label == null) {
+ KERBEROS_ENABLE.getValue
+ } else {
+ kerberosValueMapParser(KERBEROS_ENABLE_MAP.getValue).get(label).contains("true")
+ }
+ }
+
+ def isKeytabProxyUserEnabled(label: String): Boolean = {
+ if (label == null) {
+ KEYTAB_PROXYUSER_ENABLED.getValue
+ } else {
+ kerberosValueMapParser(KEYTAB_PROXYUSER_SUPERUSER_MAP.getValue).contains(label)
+ }
+ }
+
+ def getKerberosUser(userName: String, label: String): String = {
var user = userName
- if (KEYTAB_HOST_ENABLED.getValue) {
- user = user + "/" + KEYTAB_HOST.getValue
+ if (label == null) {
+ if (KEYTAB_HOST_ENABLED.getValue) {
+ user = user + "/" + KEYTAB_HOST.getValue
+ }
+ } else {
+ val hostMap = kerberosValueMapParser(KEYTAB_HOST_MAP.getValue)
+ if (hostMap.contains(label)) {
+ user = user + "/" + hostMap(label)
+ }
}
user
}
+ def getKeytabSuperUser(label: String): String = {
+ if (label == null) {
+ KEYTAB_PROXYUSER_SUPERUSER.getValue
+ } else {
+ kerberosValueMapParser(KEYTAB_PROXYUSER_SUPERUSER_MAP.getValue)(label)
+ }
+ }
+
+ def getKeytabPath(label: String): String = {
+ if (label == null) {
+ KEYTAB_FILE.getValue
+ } else {
+ val prefix = if (EXTERNAL_KEYTAB_FILE_PREFIX.getValue.endsWith("/")) {
+ EXTERNAL_KEYTAB_FILE_PREFIX.getValue
+ } else {
+ EXTERNAL_KEYTAB_FILE_PREFIX.getValue + "/"
+ }
+ prefix + label
+ }
+ }
+
+ private def kerberosValueMapParser(configV: String): Map[String, String] = {
+ val confDelimiter = ","
+ if (configV == null || "".equals(configV)) {
+ Map()
+ } else {
+ configV
+ .split(confDelimiter)
+ .filter(x => x != null && !"".equals(x))
+ .map(x => {
+ val confArr = x.split("=")
+ if (confArr.length == 2) {
+ (confArr(0).trim, confArr(1).trim)
+ } else null
+ })
+ .filter(kerberosValue =>
+ kerberosValue != null && StringUtils.isNotBlank(
+ kerberosValue._1
+ ) && null != kerberosValue._2
+ )
+ .toMap
+ }
+ }
+
}
diff --git a/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HDFSUtilsTest.scala b/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HDFSUtilsTest.scala
new file mode 100644
index 000000000..a64143181
--- /dev/null
+++ b/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HDFSUtilsTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.hadoop.common.conf
+
+import org.apache.linkis.hadoop.common.utils.HDFSUtils
+
+import org.junit.jupiter.api.{Assertions, Test}
+
+class HDFSUtilsTest {
+
+ @Test
+ def testDefaultKerberosConfiguration: Unit = {
+ // isKerberosEnabled
+ Assertions.assertFalse(HDFSUtils.isKerberosEnabled(null))
+ Assertions.assertTrue(HDFSUtils.isKerberosEnabled("cluster2"))
+
+ // getKerberosUser
+ Assertions.assertEquals("user-test", HDFSUtils.getKerberosUser("user-test", null))
+ Assertions.assertEquals("user-test", HDFSUtils.getKerberosUser("user-test", "cluster3"))
+ Assertions.assertEquals(
+ "user-test/127.0.0.3",
+ HDFSUtils.getKerberosUser("user-test", "cluster2")
+ )
+
+ // isKeytabProxyUserEnabled
+ Assertions.assertFalse(HDFSUtils.isKeytabProxyUserEnabled(null))
+ Assertions.assertTrue(HDFSUtils.isKeytabProxyUserEnabled("cluster2"))
+
+ // getKeytabSuperUser
+ Assertions.assertEquals("hadoop2", HDFSUtils.getKeytabSuperUser("cluster2"))
+
+ // getKeytabPath
+ Assertions.assertEquals("/appcom/keytab/", HDFSUtils.getKeytabPath(null))
+ Assertions.assertEquals(
+ "/appcom/config/external-conf/keytab/cluster2",
+ HDFSUtils.getKeytabPath("cluster2")
+ )
+ }
+
+}
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java
index 871d35350..74260470a 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java
@@ -17,6 +17,7 @@
package org.apache.linkis.storage.fs.impl;
+import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.hadoop.common.conf.HadoopConf;
import org.apache.linkis.hadoop.common.utils.HDFSUtils;
@@ -30,7 +31,6 @@ import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -53,7 +53,7 @@ public class HDFSFileSystem extends FileSystem {
public static final String HDFS_PREFIX_WITHOUT_AUTH = "hdfs:///";
public static final String HDFS_PREFIX_WITH_AUTH = "hdfs://";
private org.apache.hadoop.fs.FileSystem fs = null;
- private Configuration conf = null;
+ private org.apache.hadoop.conf.Configuration conf = null;
private String label = null;
@@ -182,6 +182,9 @@ public class HDFSFileSystem extends FileSystem {
throw new IOException("User cannot be empty(用户不能为空)");
}
+ if (label == null && (boolean) Configuration.IS_MULTIPLE_YARN_CLUSTER().getValue()) {
+ label = StorageConfiguration.LINKIS_STORAGE_FS_LABEL.getValue();
+ }
conf = HDFSUtils.getConfigurationByLabel(user, label);
if (MapUtils.isNotEmpty(properties)) {
@@ -195,7 +198,7 @@ public class HDFSFileSystem extends FileSystem {
if (StorageConfiguration.FS_CACHE_DISABLE.getValue()) {
conf.set("fs.hdfs.impl.disable.cache", "true");
}
- fs = HDFSUtils.getHDFSUserFileSystem(user, conf);
+ fs = HDFSUtils.getHDFSUserFileSystem(user, label, conf);
if (fs == null) {
throw new IOException("init HDFS FileSystem failed!");
}
@@ -322,12 +325,12 @@ public class HDFSFileSystem extends FileSystem {
synchronized (this) {
if (fs != null) {
if (HadoopConf.HDFS_ENABLE_CACHE()) {
- HDFSUtils.closeHDFSFIleSystem(fs, user, true);
+ HDFSUtils.closeHDFSFIleSystem(fs, user, label, true);
} else {
- HDFSUtils.closeHDFSFIleSystem(fs, user);
+ HDFSUtils.closeHDFSFIleSystem(fs, user, label);
}
logger.warn(user + "FS reset close.");
- fs = HDFSUtils.getHDFSUserFileSystem(user, conf);
+ fs = HDFSUtils.getHDFSUserFileSystem(user, label, conf);
}
}
}
@@ -354,7 +357,7 @@ public class HDFSFileSystem extends FileSystem {
@Override
public void close() throws IOException {
if (null != fs) {
- HDFSUtils.closeHDFSFIleSystem(fs, user);
+ HDFSUtils.closeHDFSFIleSystem(fs, user, label);
} else {
logger.warn("FS was null, cannot close.");
}
@@ -388,12 +391,12 @@ public class HDFSFileSystem extends FileSystem {
FileStatus f = fs.getFileStatus(new Path(path));
FsPermission permission = f.getPermission();
- UserGroupInformation ugi = HDFSUtils.getUserGroupInformation(user);
+ UserGroupInformation ugi = HDFSUtils.getUserGroupInformation(user, label);
String[] groupNames;
try {
groupNames = ugi.getGroupNames();
} catch (NullPointerException e) {
- if ((Boolean) org.apache.linkis.common.conf.Configuration.IS_TEST_MODE().getValue()) {
+ if ((Boolean) Configuration.IS_TEST_MODE().getValue()) {
groupNames = new String[] {"hadoop"};
} else {
throw e;
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java
index 0a4181c13..99cf159a0 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java
@@ -151,6 +151,11 @@ public class OSSFileSystem extends FileSystem {
@Override
public void init(Map<String, String> properties) throws IOException {
// read origin configs from hadoop conf
+ if (label == null
+ && (boolean)
+ org.apache.linkis.common.conf.Configuration.IS_MULTIPLE_YARN_CLUSTER().getValue()) {
+ label = StorageConfiguration.LINKIS_STORAGE_FS_LABEL.getValue();
+ }
conf = HDFSUtils.getConfigurationByLabel(user, label);
// origin configs
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java
index 03ec71797..70a3839b6 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java
@@ -47,6 +47,9 @@ public class StorageConfiguration {
public static CommonVars<String> STORAGE_RS_FILE_SUFFIX =
new CommonVars<>("wds.linkis.storage.rs.file.suffix", ".dolphin", null, null);
+ public static CommonVars<String> LINKIS_STORAGE_FS_LABEL =
+ new CommonVars<>("linkis.storage.default.fs.label", "linkis-storage", null, null);
+
public static List<String> ResultTypes =
Lists.newArrayList("%TEXT", "%TABLE", "%HTML", "%IMG", "%ANGULAR", "%SVG");
diff --git a/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/AppKeys.java b/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/AppKeys.java
index 7a38d9191..5613d87e1 100644
--- a/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/AppKeys.java
+++ b/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/AppKeys.java
@@ -75,6 +75,8 @@ public class AppKeys {
JOB_LABEL + "." + LinkisKeys.KEY_CODETYPE; // corresponds to server api.
public static final String JOB_LABEL_EXECUTEONCE =
JOB_LABEL + "." + LinkisKeys.KEY_EXECUTEONCE; // corresponds to server api.
+ public static final String JOB_LABEL_CLUSTER =
+ JOB_LABEL + "." + LinkisKeys.KEY_CLUSTER; // corresponds to server api.
/*
Job command
diff --git a/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/LinkisKeys.java b/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/LinkisKeys.java
index 9488c25f4..848300313 100644
--- a/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/LinkisKeys.java
+++ b/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/LinkisKeys.java
@@ -34,6 +34,7 @@ public class LinkisKeys {
public static final String KEY_EXECID = "execId";
public static final String KEY_UMUSER = "umUser";
public static final String KEY_EXECUTEONCE = "executeOnce";
+ public static final String KEY_CLUSTER = "yarnCluster";
public static final String KEY_TENANT = "tenant";
public static final String META_DATA_COLUMN_NAME = "columnName";
public static final String KEY_SHELL_WORKING_DIRECTORY =
diff --git a/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java b/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java
index e238038ae..1bd3fb8c0 100644
--- a/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java
+++ b/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java
@@ -135,6 +135,15 @@ public class UniversalCmdTemplate extends AbstractCmdTemplate implements Cloneab
true,
false);
+ protected StdOption<String> clusterOP =
+ option(
+ AppKeys.JOB_LABEL,
+ AppKeys.JOB_LABEL_CLUSTER,
+ new String[] {"-yarnCluster"},
+ "specify linkis yarn cluster for this job",
+ true,
+ "");
+
protected StdOption<String> engineTypeOP =
option(
AppKeys.JOB_LABEL,
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala
index 0c48d730a..8e5f9adfb 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.ecm.server.conf
import org.apache.linkis.common.conf.{CommonVars, TimeType}
import org.apache.linkis.common.utils.ByteTimeUtils
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import org.apache.linkis.manager.common.conf.RMConfiguration
import java.io.File
import java.util.concurrent.TimeUnit
@@ -116,4 +117,16 @@ object ECMConfiguration {
val ECM_PROCESS_SCRIPT_KILL: Boolean =
CommonVars[Boolean]("wds.linkis.ecm.script.kill.engineconn", true).getValue
+ val ECM_YARN_CLUSTER_NAME: String =
+ CommonVars(
+ "wds.linkis.ecm.yarn.cluster.name",
+ RMConfiguration.DEFAULT_YARN_CLUSTER_NAME.getValue
+ ).getValue
+
+ val ECM_YARN_CLUSTER_TYPE: String =
+ CommonVars(
+ "wds.linkis.ecm.yarn.cluster.type",
+ RMConfiguration.DEFAULT_YARN_TYPE.getValue
+ ).getValue
+
}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
index ca35b2779..13cfc3dba 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
@@ -17,6 +17,7 @@
package org.apache.linkis.ecm.server.service.impl
+import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.ecm.core.listener.{ECMEvent, ECMEventListener}
import org.apache.linkis.ecm.server.conf.ECMConfiguration._
@@ -30,6 +31,7 @@ import org.apache.linkis.manager.common.protocol.em.{
StopEMRequest
}
import org.apache.linkis.manager.label.constant.LabelKeyConstant
+import org.apache.linkis.manager.label.entity.SerializableLabel
import org.apache.linkis.rpc.Sender
import java.util
@@ -57,6 +59,11 @@ class DefaultECMRegisterService extends ECMRegisterService with ECMEventListener
"alias",
ENGINE_CONN_MANAGER_SPRING_NAME
)
+
+ if (Configuration.IS_MULTIPLE_YARN_CLUSTER.getValue.asInstanceOf[Boolean]) {
+ labels.asScala += LabelKeyConstant.YARN_CLUSTER_KEY ->
+ (ECM_YARN_CLUSTER_TYPE + "_" + ECM_YARN_CLUSTER_NAME)
+ }
// TODO: group by key
labels
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
index 66e059b48..afc18bdc1 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
@@ -17,13 +17,16 @@
package org.apache.linkis.entrance.parser
+import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary._
import org.apache.linkis.entrance.exception.{EntranceErrorCode, EntranceIllegalParamException}
import org.apache.linkis.entrance.persistence.PersistenceManager
import org.apache.linkis.entrance.timeout.JobTimeoutManager
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.entity.job.JobRequest
+import org.apache.linkis.manager.common.conf.RMConfiguration
import org.apache.linkis.manager.label.builder.factory.{
LabelBuilderFactory,
LabelBuilderFactoryContext
@@ -31,6 +34,7 @@ import org.apache.linkis.manager.label.builder.factory.{
import org.apache.linkis.manager.label.conf.LabelCommonConfig
import org.apache.linkis.manager.label.constant.LabelKeyConstant
import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.cluster.ClusterLabel
import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, UserCreatorLabel}
import org.apache.linkis.manager.label.utils.EngineTypeLabelCreator
import org.apache.linkis.protocol.constants.TaskConstant
@@ -118,6 +122,7 @@ class CommonEntranceParser(val persistenceManager: PersistenceManager)
checkEngineTypeLabel(labels)
generateAndVerifyCodeLanguageLabel(runType, labels)
generateAndVerifyUserCreatorLabel(executeUser, labels)
+ generateAndVerifyClusterLabel(labels)
jobRequest.setLabels(new util.ArrayList[Label[_]](labels.values()))
jobRequest.setSource(source)
@@ -189,6 +194,22 @@ class CommonEntranceParser(val persistenceManager: PersistenceManager)
}
}
+ private def generateAndVerifyClusterLabel(labels: util.Map[String, Label[_]]): Unit = {
+ if (!Configuration.IS_MULTIPLE_YARN_CLUSTER.getValue.asInstanceOf[Boolean]) {
+ return
+ }
+ var clusterLabel = labels
+ .getOrDefault(LabelKeyConstant.YARN_CLUSTER_KEY, null)
+ .asInstanceOf[ClusterLabel]
+ if (clusterLabel == null) {
+ clusterLabel =
+ LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[ClusterLabel])
+ clusterLabel.setClusterName(RMConfiguration.DEFAULT_YARN_CLUSTER_NAME.getValue)
+ clusterLabel.setClusterType(RMConfiguration.DEFAULT_YARN_TYPE.getValue)
+ labels.put(clusterLabel.getLabelKey, clusterLabel)
+ }
+ }
+
private def parseToOldTask(params: util.Map[String, AnyRef]): JobRequest = {
val jobReq = new JobRequest
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java
index edf107376..98cfe2b33 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java
@@ -334,8 +334,13 @@ public class YarnResourceRequester implements ExternalResourceRequester {
String principalName = (String) this.provider.getConfigMap().get("principalName");
String keytabPath = (String) this.provider.getConfigMap().get("keytabPath");
String krb5Path = (String) this.provider.getConfigMap().get("krb5Path");
+ if (StringUtils.isNotBlank(krb5Path)) {
+ logger.warn(
"krb5Path: {} has been specified, but it is not allowed to be set, to avoid conflicts",
+ krb5Path);
+ }
RequestKerberosUrlUtils requestKuu =
- new RequestKerberosUrlUtils(principalName, keytabPath, krb5Path, false);
+ new RequestKerberosUrlUtils(principalName, keytabPath, false);
HttpResponse response =
requestKuu.callRestUrl(rmWebAddress + "/ws/v1/cluster/" + url, principalName);
httpResponse = response;
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RequestKerberosUrlUtils.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RequestKerberosUrlUtils.java
index 12b729a05..4647dcab1 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RequestKerberosUrlUtils.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RequestKerberosUrlUtils.java
@@ -67,12 +67,6 @@ public class RequestKerberosUrlUtils {
}
}
- public RequestKerberosUrlUtils(
- String principal, String keyTabLocation, String krb5Location, boolean isDebug) {
- this(principal, keyTabLocation, isDebug);
- System.setProperty("java.security.krb5.conf", krb5Location);
- }
-
private static HttpClient buildSpengoHttpClient() {
HttpClientBuilder builder = HttpClientBuilder.create();
Lookup<AuthSchemeProvider> authSchemeRegistry =
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java
index c0fa56959..256b618e3 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java
@@ -18,13 +18,14 @@
package org.apache.linkis.manager.label.entity.cluster;
import org.apache.linkis.manager.label.constant.LabelKeyConstant;
+import org.apache.linkis.manager.label.entity.EMNodeLabel;
import org.apache.linkis.manager.label.entity.Feature;
import org.apache.linkis.manager.label.entity.GenericLabel;
import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
import java.util.HashMap;
-public class ClusterLabel extends GenericLabel {
+public class ClusterLabel extends GenericLabel implements EMNodeLabel {
public ClusterLabel() {
setLabelKey(LabelKeyConstant.YARN_CLUSTER_KEY);
@@ -64,4 +65,18 @@ public class ClusterLabel extends GenericLabel {
}
return null;
}
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof ClusterLabel) {
+ if (null != getClusterName() && null != getClusterType()) {
+ return getClusterName().equals(((ClusterLabel) other).getClusterName())
+ && getClusterType().equals(((ClusterLabel) other).getClusterType());
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
}
diff --git a/linkis-dist/package/conf/linkis-cg-engineconnmanager.properties b/linkis-dist/package/conf/linkis-cg-engineconnmanager.properties
index 0cd9a59fd..d4c167720 100644
--- a/linkis-dist/package/conf/linkis-cg-engineconnmanager.properties
+++ b/linkis-dist/package/conf/linkis-cg-engineconnmanager.properties
@@ -17,6 +17,8 @@
##restful
wds.linkis.server.restful.scan.packages=org.apache.linkis.em.restful
wds.linkis.engineconn.root.dir=/appcom/tmp
+wds.linkis.ecm.yarn.cluster.name=default
+wds.linkis.ecm.yarn.cluster.type=Yarn
##Spring
spring.server.port=9102
##set engine environment variables in the engine conn start script, such as SPARK3_HOME; the value of each env will be read from the ecm host by key.
diff --git a/linkis-dist/package/conf/linkis.properties b/linkis-dist/package/conf/linkis.properties
index 400b060fd..89de9eb91 100644
--- a/linkis-dist/package/conf/linkis.properties
+++ b/linkis-dist/package/conf/linkis.properties
@@ -17,6 +17,7 @@
#wds.linkis.test.mode=true
wds.linkis.server.version=v1
+wds.linkis.multiple.yarn.cluster=false
linkis.discovery.prefer-ip-address=false
linkis.discovery.server-address=http://127.0.0.1:20303
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/hive/src/main/java/org/apache/linkis/metadata/query/service/HiveConnection.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/hive/src/main/java/org/apache/linkis/metadata/query/service/HiveConnection.java
index 3bccdd1d3..196d3342f 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/hive/src/main/java/org/apache/linkis/metadata/query/service/HiveConnection.java
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/hive/src/main/java/org/apache/linkis/metadata/query/service/HiveConnection.java
@@ -19,7 +19,6 @@ package org.apache.linkis.metadata.query.service;
import org.apache.linkis.common.conf.CommonVars;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -48,15 +47,6 @@ public class HiveConnection implements Closeable {
private static final CommonVars<String> DEFAULT_SERVICE_USER =
CommonVars.apply("wds.linkis.server.mdm.service.user", "hadoop");
- private static final CommonVars<String> KERBEROS_KRB5_CONF_PATH =
- CommonVars.apply("wds.linkis.server.mdm.service.kerberos.krb5.path", "");
-
- static {
- if (StringUtils.isNotBlank(KERBEROS_KRB5_CONF_PATH.getValue())) {
- System.setProperty("java.security.krb5.conf", KERBEROS_KRB5_CONF_PATH.getValue());
- }
- }
-
public HiveConnection(
String uris, String principle, String keytabFilePath, Map<String, String> hadoopConf)
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org