You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/05/17 03:25:13 UTC

[linkis] branch dev-1.4.0 updated: feat: support multiple cluster PART-1 #4179 (#4460)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
     new f1ed1dd6f feat: support multiple cluster PART-1 #4179 (#4460)
f1ed1dd6f is described below

commit f1ed1dd6fd9c6fa22e0f39f5984e311d68405ff7
Author: Cheng'hui Chen <27...@users.noreply.github.com>
AuthorDate: Wed May 17 11:25:06 2023 +0800

    feat: support multiple cluster PART-1 #4179 (#4460)
---
 .../apache/linkis/common/conf/Configuration.scala  |   2 +
 .../linkis/hadoop/common/conf/HadoopConf.scala     |  12 ++
 .../common/entity/HDFSFileSystemContainer.scala    |   4 +-
 .../linkis/hadoop/common/utils/HDFSUtils.scala     | 160 +++++++++++++++++----
 .../linkis/hadoop/common/conf/HDFSUtilsTest.scala  |  55 +++++++
 .../linkis/storage/fs/impl/HDFSFileSystem.java     |  21 +--
 .../linkis/storage/fs/impl/OSSFileSystem.java      |   5 +
 .../linkis/storage/utils/StorageConfiguration.java |   3 +
 .../linkis/cli/application/constants/AppKeys.java  |   2 +
 .../cli/application/constants/LinkisKeys.java      |   1 +
 .../command/template/UniversalCmdTemplate.java     |   9 ++
 .../linkis/ecm/server/conf/ECMConfiguration.scala  |  13 ++
 .../service/impl/DefaultECMRegisterService.scala   |   7 +
 .../entrance/parser/CommonEntranceParser.scala     |  21 +++
 .../rm/external/yarn/YarnResourceRequester.java    |   7 +-
 .../manager/rm/utils/RequestKerberosUrlUtils.java  |   6 -
 .../manager/label/entity/cluster/ClusterLabel.java |  17 ++-
 .../conf/linkis-cg-engineconnmanager.properties    |   2 +
 linkis-dist/package/conf/linkis.properties         |   1 +
 .../metadata/query/service/HiveConnection.java     |  10 --
 20 files changed, 303 insertions(+), 55 deletions(-)

diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala
index 8ac94739c..9443f1526 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala
@@ -33,6 +33,8 @@ object Configuration extends Logging {
 
   val IS_PROMETHEUS_ENABLE = CommonVars("wds.linkis.prometheus.enable", false)
 
+  val IS_MULTIPLE_YARN_CLUSTER = CommonVars("linkis.multiple.yarn.cluster", false)
+
   val PROMETHEUS_ENDPOINT = CommonVars("wds.linkis.prometheus.endpoint", "/actuator/prometheus")
 
   val LINKIS_HOME = CommonVars("wds.linkis.home", CommonVars("LINKIS_HOME", "/tmp").getValue)
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
index fc7f91504..b3e5cf202 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
@@ -25,16 +25,28 @@ object HadoopConf {
 
   val KERBEROS_ENABLE = CommonVars("wds.linkis.keytab.enable", false)
 
+  val KERBEROS_ENABLE_MAP =
+    CommonVars("linkis.keytab.enable.map", "cluster1=false,cluster2=true")
+
   val KEYTAB_FILE = CommonVars("wds.linkis.keytab.file", "/appcom/keytab/")
 
+  val EXTERNAL_KEYTAB_FILE_PREFIX =
+    CommonVars("linkis.external.keytab.file.prefix", "/appcom/config/external-conf/keytab")
+
   val KEYTAB_HOST = CommonVars("wds.linkis.keytab.host", "127.0.0.1")
 
+  val KEYTAB_HOST_MAP =
+    CommonVars("linkis.keytab.host.map", "cluster1=127.0.0.2,cluster2=127.0.0.3")
+
   val KEYTAB_HOST_ENABLED = CommonVars("wds.linkis.keytab.host.enabled", false)
 
   val KEYTAB_PROXYUSER_ENABLED = CommonVars("wds.linkis.keytab.proxyuser.enable", false)
 
   val KEYTAB_PROXYUSER_SUPERUSER = CommonVars("wds.linkis.keytab.proxyuser.superuser", "hadoop")
 
+  val KEYTAB_PROXYUSER_SUPERUSER_MAP =
+    CommonVars("linkis.keytab.proxyuser.superuser.map", "cluster1=hadoop1,cluster2=hadoop2")
+
   val hadoopConfDir =
     CommonVars("hadoop.config.dir", CommonVars("HADOOP_CONF_DIR", "").getValue).getValue
 
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala
index dfbc5c934..6b4eaaece 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala
@@ -21,7 +21,7 @@ import org.apache.linkis.hadoop.common.conf.HadoopConf
 
 import org.apache.hadoop.fs.FileSystem
 
-class HDFSFileSystemContainer(fs: FileSystem, user: String) {
+class HDFSFileSystemContainer(fs: FileSystem, user: String, label: String) {
 
   private var lastAccessTime: Long = System.currentTimeMillis()
 
@@ -31,6 +31,8 @@ class HDFSFileSystemContainer(fs: FileSystem, user: String) {
 
   def getUser: String = this.user
 
+  def getLabel: String = this.label
+
   def getLastAccessTime: Long = this.lastAccessTime
 
   def updateLastAccessTime: Unit = {
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
index 922b5f6a8..f2a615e99 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
@@ -41,6 +41,8 @@ object HDFSUtils extends Logging {
     new java.util.HashMap[String, HDFSFileSystemContainer]()
 
   private val LOCKER_SUFFIX = "_HDFS"
+  private val DEFAULT_CACHE_LABEL = "default"
+  private val JOINT = "_"
 
   if (HadoopConf.HDFS_ENABLE_CACHE) {
     logger.info("HDFS Cache enabled ")
@@ -56,10 +58,13 @@ object HDFSUtils extends Logging {
               )
             }
             .foreach { hdfsFileSystemContainer =>
-              val locker = hdfsFileSystemContainer.getUser + LOCKER_SUFFIX
+              val locker =
+                hdfsFileSystemContainer.getUser + JOINT + hdfsFileSystemContainer.getLabel + LOCKER_SUFFIX
               locker.intern() synchronized {
                 if (hdfsFileSystemContainer.canRemove()) {
-                  fileSystemCache.remove(hdfsFileSystemContainer.getUser)
+                  fileSystemCache.remove(
+                    hdfsFileSystemContainer.getUser + JOINT + hdfsFileSystemContainer.getLabel
+                  )
                   IOUtils.closeQuietly(hdfsFileSystemContainer.getFileSystem)
                   logger.info(
                     s"user${hdfsFileSystemContainer.getUser} to remove hdfsFileSystemContainer,because hdfsFileSystemContainer can remove"
@@ -124,15 +129,27 @@ object HDFSUtils extends Logging {
   def getHDFSUserFileSystem(
       userName: String,
       conf: org.apache.hadoop.conf.Configuration
+  ): FileSystem = getHDFSUserFileSystem(userName, null, conf)
+
+  def getHDFSUserFileSystem(
+      userName: String,
+      label: String,
+      conf: org.apache.hadoop.conf.Configuration
   ): FileSystem = if (HadoopConf.HDFS_ENABLE_CACHE) {
-    val locker = userName + LOCKER_SUFFIX
+    val cacheLabel = if (label == null) DEFAULT_CACHE_LABEL else label
+    val cacheKey = userName + JOINT + cacheLabel
+    val locker = cacheKey + LOCKER_SUFFIX
     locker.intern().synchronized {
-      val hdfsFileSystemContainer = if (fileSystemCache.containsKey(userName)) {
-        fileSystemCache.get(userName)
+      val hdfsFileSystemContainer = if (fileSystemCache.containsKey(cacheKey)) {
+        fileSystemCache.get(cacheKey)
       } else {
+        // we use cacheLabel to create HDFSFileSystemContainer, and in the rest part of HDFSUtils, we consistently
+        // use the same cacheLabel to operate HDFSFileSystemContainer, like close or remove.
+        // At the same time, we don't want to change the behavior of createFileSystem which is out of HDFSUtils,
+        // so we continue to use the original label to createFileSystem.
         val newHDFSFileSystemContainer =
-          new HDFSFileSystemContainer(createFileSystem(userName, conf), userName)
-        fileSystemCache.put(userName, newHDFSFileSystemContainer)
+          new HDFSFileSystemContainer(createFileSystem(userName, label, conf), userName, cacheLabel)
+        fileSystemCache.put(cacheKey, newHDFSFileSystemContainer)
         newHDFSFileSystemContainer
       }
       hdfsFileSystemContainer.addAccessCount()
@@ -140,11 +157,18 @@ object HDFSUtils extends Logging {
       hdfsFileSystemContainer.getFileSystem
     }
   } else {
-    createFileSystem(userName, conf)
+    createFileSystem(userName, label, conf)
   }
 
   def createFileSystem(userName: String, conf: org.apache.hadoop.conf.Configuration): FileSystem =
-    getUserGroupInformation(userName)
+    createFileSystem(userName, null, conf)
+
+  def createFileSystem(
+      userName: String,
+      label: String,
+      conf: org.apache.hadoop.conf.Configuration
+  ): FileSystem =
+    getUserGroupInformation(userName, label)
       .doAs(new PrivilegedExceptionAction[FileSystem] {
         // scalastyle:off FileSystemGet
         def run: FileSystem = FileSystem.get(conf)
@@ -152,18 +176,29 @@ object HDFSUtils extends Logging {
       })
 
   def closeHDFSFIleSystem(fileSystem: FileSystem, userName: String): Unit =
-    if (null != fileSystem && StringUtils.isNotBlank(userName)) {
-      closeHDFSFIleSystem(fileSystem, userName, false)
-    }
+    closeHDFSFIleSystem(fileSystem, userName, null, false)
+
+  def closeHDFSFIleSystem(fileSystem: FileSystem, userName: String, label: String): Unit =
+    closeHDFSFIleSystem(fileSystem, userName, label, false)
 
   def closeHDFSFIleSystem(fileSystem: FileSystem, userName: String, isForce: Boolean): Unit =
+    closeHDFSFIleSystem(fileSystem, userName, null, isForce)
+
+  def closeHDFSFIleSystem(
+      fileSystem: FileSystem,
+      userName: String,
+      label: String,
+      isForce: Boolean
+  ): Unit =
     if (null != fileSystem && StringUtils.isNotBlank(userName)) {
       if (HadoopConf.HDFS_ENABLE_CACHE) {
-        val hdfsFileSystemContainer = fileSystemCache.get(userName)
+        val cacheLabel = if (label == null) DEFAULT_CACHE_LABEL else label
+        val cacheKey = userName + JOINT + cacheLabel
+        val hdfsFileSystemContainer = fileSystemCache.get(cacheKey)
         if (null != hdfsFileSystemContainer) {
-          val locker = userName + LOCKER_SUFFIX
+          val locker = cacheKey + LOCKER_SUFFIX
           if (isForce) {
-            locker synchronized fileSystemCache.remove(hdfsFileSystemContainer.getUser)
+            locker synchronized fileSystemCache.remove(cacheKey)
             IOUtils.closeQuietly(hdfsFileSystemContainer.getFileSystem)
             logger.info(
               s"user${hdfsFileSystemContainer.getUser} to Force remove hdfsFileSystemContainer"
@@ -178,17 +213,21 @@ object HDFSUtils extends Logging {
     }
 
   def getUserGroupInformation(userName: String): UserGroupInformation = {
-    if (KERBEROS_ENABLE.getValue) {
-      if (!KEYTAB_PROXYUSER_ENABLED.getValue) {
-        val path = new File(KEYTAB_FILE.getValue, userName + ".keytab").getPath
-        val user = getKerberosUser(userName)
-        UserGroupInformation.setConfiguration(getConfiguration(userName))
+    getUserGroupInformation(userName, null);
+  }
+
+  def getUserGroupInformation(userName: String, label: String): UserGroupInformation = {
+    if (isKerberosEnabled(label)) {
+      if (!isKeytabProxyUserEnabled(label)) {
+        val path = new File(getKeytabPath(label), userName + ".keytab").getPath
+        val user = getKerberosUser(userName, label)
+        UserGroupInformation.setConfiguration(getConfigurationByLabel(userName, label))
         UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, path)
       } else {
-        val superUser = KEYTAB_PROXYUSER_SUPERUSER.getValue
-        val path = new File(KEYTAB_FILE.getValue, superUser + ".keytab").getPath
-        val user = getKerberosUser(superUser)
-        UserGroupInformation.setConfiguration(getConfiguration(superUser))
+        val superUser = getKeytabSuperUser(label)
+        val path = new File(getKeytabPath(label), superUser + ".keytab").getPath
+        val user = getKerberosUser(superUser, label)
+        UserGroupInformation.setConfiguration(getConfigurationByLabel(superUser, label))
         UserGroupInformation.createProxyUser(
           userName,
           UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, path)
@@ -199,12 +238,79 @@ object HDFSUtils extends Logging {
     }
   }
 
-  def getKerberosUser(userName: String): String = {
+  def isKerberosEnabled(label: String): Boolean = {
+    if (label == null) {
+      KERBEROS_ENABLE.getValue
+    } else {
+      kerberosValueMapParser(KERBEROS_ENABLE_MAP.getValue).get(label).contains("true")
+    }
+  }
+
+  def isKeytabProxyUserEnabled(label: String): Boolean = {
+    if (label == null) {
+      KEYTAB_PROXYUSER_ENABLED.getValue
+    } else {
+      kerberosValueMapParser(KEYTAB_PROXYUSER_SUPERUSER_MAP.getValue).contains(label)
+    }
+  }
+
+  def getKerberosUser(userName: String, label: String): String = {
     var user = userName
-    if (KEYTAB_HOST_ENABLED.getValue) {
-      user = user + "/" + KEYTAB_HOST.getValue
+    if (label == null) {
+      if (KEYTAB_HOST_ENABLED.getValue) {
+        user = user + "/" + KEYTAB_HOST.getValue
+      }
+    } else {
+      val hostMap = kerberosValueMapParser(KEYTAB_HOST_MAP.getValue)
+      if (hostMap.contains(label)) {
+        user = user + "/" + hostMap(label)
+      }
     }
     user
   }
 
+  def getKeytabSuperUser(label: String): String = {
+    if (label == null) {
+      KEYTAB_PROXYUSER_SUPERUSER.getValue
+    } else {
+      kerberosValueMapParser(KEYTAB_PROXYUSER_SUPERUSER_MAP.getValue)(label)
+    }
+  }
+
+  def getKeytabPath(label: String): String = {
+    if (label == null) {
+      KEYTAB_FILE.getValue
+    } else {
+      val prefix = if (EXTERNAL_KEYTAB_FILE_PREFIX.getValue.endsWith("/")) {
+        EXTERNAL_KEYTAB_FILE_PREFIX.getValue
+      } else {
+        EXTERNAL_KEYTAB_FILE_PREFIX.getValue + "/"
+      }
+      prefix + label
+    }
+  }
+
+  private def kerberosValueMapParser(configV: String): Map[String, String] = {
+    val confDelimiter = ","
+    if (configV == null || "".equals(configV)) {
+      Map()
+    } else {
+      configV
+        .split(confDelimiter)
+        .filter(x => x != null && !"".equals(x))
+        .map(x => {
+          val confArr = x.split("=")
+          if (confArr.length == 2) {
+            (confArr(0).trim, confArr(1).trim)
+          } else null
+        })
+        .filter(kerberosValue =>
+          kerberosValue != null && StringUtils.isNotBlank(
+            kerberosValue._1
+          ) && null != kerberosValue._2
+        )
+        .toMap
+    }
+  }
+
 }
diff --git a/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HDFSUtilsTest.scala b/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HDFSUtilsTest.scala
new file mode 100644
index 000000000..a64143181
--- /dev/null
+++ b/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HDFSUtilsTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.hadoop.common.conf
+
+import org.apache.linkis.hadoop.common.utils.HDFSUtils
+
+import org.junit.jupiter.api.{Assertions, Test}
+
+class HDFSUtilsTest {
+
+  @Test
+  def testDefaultKerberosConfiguration: Unit = {
+    // isKerberosEnabled
+    Assertions.assertFalse(HDFSUtils.isKerberosEnabled(null))
+    Assertions.assertTrue(HDFSUtils.isKerberosEnabled("cluster2"))
+
+    // getKerberosUser
+    Assertions.assertEquals("user-test", HDFSUtils.getKerberosUser("user-test", null))
+    Assertions.assertEquals("user-test", HDFSUtils.getKerberosUser("user-test", "cluster3"))
+    Assertions.assertEquals(
+      "user-test/127.0.0.3",
+      HDFSUtils.getKerberosUser("user-test", "cluster2")
+    )
+
+    // isKeytabProxyUserEnabled
+    Assertions.assertFalse(HDFSUtils.isKeytabProxyUserEnabled(null))
+    Assertions.assertTrue(HDFSUtils.isKeytabProxyUserEnabled("cluster2"))
+
+    // getKeytabSuperUser
+    Assertions.assertEquals("hadoop2", HDFSUtils.getKeytabSuperUser("cluster2"))
+
+    // isKeytabProxyUserEnabled
+    Assertions.assertEquals("/appcom/keytab/", HDFSUtils.getKeytabPath(null))
+    Assertions.assertEquals(
+      "/appcom/config/external-conf/keytab/cluster2",
+      HDFSUtils.getKeytabPath("cluster2")
+    )
+  }
+
+}
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java
index 871d35350..74260470a 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java
@@ -17,6 +17,7 @@
 
 package org.apache.linkis.storage.fs.impl;
 
+import org.apache.linkis.common.conf.Configuration;
 import org.apache.linkis.common.io.FsPath;
 import org.apache.linkis.hadoop.common.conf.HadoopConf;
 import org.apache.linkis.hadoop.common.utils.HDFSUtils;
@@ -30,7 +31,6 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -53,7 +53,7 @@ public class HDFSFileSystem extends FileSystem {
   public static final String HDFS_PREFIX_WITHOUT_AUTH = "hdfs:///";
   public static final String HDFS_PREFIX_WITH_AUTH = "hdfs://";
   private org.apache.hadoop.fs.FileSystem fs = null;
-  private Configuration conf = null;
+  private org.apache.hadoop.conf.Configuration conf = null;
 
   private String label = null;
 
@@ -182,6 +182,9 @@ public class HDFSFileSystem extends FileSystem {
       throw new IOException("User cannot be empty(用户不能为空)");
     }
 
+    if (label == null && (boolean) Configuration.IS_MULTIPLE_YARN_CLUSTER().getValue()) {
+      label = StorageConfiguration.LINKIS_STORAGE_FS_LABEL.getValue();
+    }
     conf = HDFSUtils.getConfigurationByLabel(user, label);
 
     if (MapUtils.isNotEmpty(properties)) {
@@ -195,7 +198,7 @@ public class HDFSFileSystem extends FileSystem {
     if (StorageConfiguration.FS_CACHE_DISABLE.getValue()) {
       conf.set("fs.hdfs.impl.disable.cache", "true");
     }
-    fs = HDFSUtils.getHDFSUserFileSystem(user, conf);
+    fs = HDFSUtils.getHDFSUserFileSystem(user, label, conf);
     if (fs == null) {
       throw new IOException("init HDFS FileSystem failed!");
     }
@@ -322,12 +325,12 @@ public class HDFSFileSystem extends FileSystem {
       synchronized (this) {
         if (fs != null) {
           if (HadoopConf.HDFS_ENABLE_CACHE()) {
-            HDFSUtils.closeHDFSFIleSystem(fs, user, true);
+            HDFSUtils.closeHDFSFIleSystem(fs, user, label, true);
           } else {
-            HDFSUtils.closeHDFSFIleSystem(fs, user);
+            HDFSUtils.closeHDFSFIleSystem(fs, user, label);
           }
           logger.warn(user + "FS reset close.");
-          fs = HDFSUtils.getHDFSUserFileSystem(user, conf);
+          fs = HDFSUtils.getHDFSUserFileSystem(user, label, conf);
         }
       }
     }
@@ -354,7 +357,7 @@ public class HDFSFileSystem extends FileSystem {
   @Override
   public void close() throws IOException {
     if (null != fs) {
-      HDFSUtils.closeHDFSFIleSystem(fs, user);
+      HDFSUtils.closeHDFSFIleSystem(fs, user, label);
     } else {
       logger.warn("FS was null, cannot close.");
     }
@@ -388,12 +391,12 @@ public class HDFSFileSystem extends FileSystem {
 
     FileStatus f = fs.getFileStatus(new Path(path));
     FsPermission permission = f.getPermission();
-    UserGroupInformation ugi = HDFSUtils.getUserGroupInformation(user);
+    UserGroupInformation ugi = HDFSUtils.getUserGroupInformation(user, label);
     String[] groupNames;
     try {
       groupNames = ugi.getGroupNames();
     } catch (NullPointerException e) {
-      if ((Boolean) org.apache.linkis.common.conf.Configuration.IS_TEST_MODE().getValue()) {
+      if ((Boolean) Configuration.IS_TEST_MODE().getValue()) {
         groupNames = new String[] {"hadoop"};
       } else {
         throw e;
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java
index 0a4181c13..99cf159a0 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java
@@ -151,6 +151,11 @@ public class OSSFileSystem extends FileSystem {
   @Override
   public void init(Map<String, String> properties) throws IOException {
     // read origin configs from hadoop conf
+    if (label == null
+        && (boolean)
+            org.apache.linkis.common.conf.Configuration.IS_MULTIPLE_YARN_CLUSTER().getValue()) {
+      label = StorageConfiguration.LINKIS_STORAGE_FS_LABEL.getValue();
+    }
     conf = HDFSUtils.getConfigurationByLabel(user, label);
 
     // origin configs
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java
index 03ec71797..70a3839b6 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java
@@ -47,6 +47,9 @@ public class StorageConfiguration {
   public static CommonVars<String> STORAGE_RS_FILE_SUFFIX =
       new CommonVars<>("wds.linkis.storage.rs.file.suffix", ".dolphin", null, null);
 
+  public static CommonVars<String> LINKIS_STORAGE_FS_LABEL =
+      new CommonVars<>("linkis.storage.default.fs.label", "linkis-storage", null, null);
+
   public static List<String> ResultTypes =
       Lists.newArrayList("%TEXT", "%TABLE", "%HTML", "%IMG", "%ANGULAR", "%SVG");
 
diff --git a/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/AppKeys.java b/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/AppKeys.java
index 7a38d9191..5613d87e1 100644
--- a/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/AppKeys.java
+++ b/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/AppKeys.java
@@ -75,6 +75,8 @@ public class AppKeys {
       JOB_LABEL + "." + LinkisKeys.KEY_CODETYPE; // corresponds to server api.
   public static final String JOB_LABEL_EXECUTEONCE =
       JOB_LABEL + "." + LinkisKeys.KEY_EXECUTEONCE; // corresponds to server api.
+  public static final String JOB_LABEL_CLUSTER =
+      JOB_LABEL + "." + LinkisKeys.KEY_CLUSTER; // corresponds to server api.
 
   /*
   Job command
diff --git a/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/LinkisKeys.java b/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/LinkisKeys.java
index 9488c25f4..848300313 100644
--- a/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/LinkisKeys.java
+++ b/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/constants/LinkisKeys.java
@@ -34,6 +34,7 @@ public class LinkisKeys {
   public static final String KEY_EXECID = "execId";
   public static final String KEY_UMUSER = "umUser";
   public static final String KEY_EXECUTEONCE = "executeOnce";
+  public static final String KEY_CLUSTER = "yarnCluster";
   public static final String KEY_TENANT = "tenant";
   public static final String META_DATA_COLUMN_NAME = "columnName";
   public static final String KEY_SHELL_WORKING_DIRECTORY =
diff --git a/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java b/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java
index e238038ae..1bd3fb8c0 100644
--- a/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java
+++ b/linkis-computation-governance/linkis-client/linkis-cli/linkis-cli-application/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java
@@ -135,6 +135,15 @@ public class UniversalCmdTemplate extends AbstractCmdTemplate implements Cloneab
           true,
           false);
 
+  protected StdOption<String> clusterOP =
+      option(
+          AppKeys.JOB_LABEL,
+          AppKeys.JOB_LABEL_CLUSTER,
+          new String[] {"-yarnCluster"},
+          "specify linkis yarn cluster for this job",
+          true,
+          "");
+
   protected StdOption<String> engineTypeOP =
       option(
           AppKeys.JOB_LABEL,
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala
index 0c48d730a..8e5f9adfb 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.ecm.server.conf
 import org.apache.linkis.common.conf.{CommonVars, TimeType}
 import org.apache.linkis.common.utils.ByteTimeUtils
 import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import org.apache.linkis.manager.common.conf.RMConfiguration
 
 import java.io.File
 import java.util.concurrent.TimeUnit
@@ -116,4 +117,16 @@ object ECMConfiguration {
   val ECM_PROCESS_SCRIPT_KILL: Boolean =
     CommonVars[Boolean]("wds.linkis.ecm.script.kill.engineconn", true).getValue
 
+  val ECM_YARN_CLUSTER_NAME: String =
+    CommonVars(
+      "wds.linkis.ecm.yarn.cluster.name",
+      RMConfiguration.DEFAULT_YARN_CLUSTER_NAME.getValue
+    ).getValue
+
+  val ECM_YARN_CLUSTER_TYPE: String =
+    CommonVars(
+      "wds.linkis.ecm.yarn.cluster.type",
+      RMConfiguration.DEFAULT_YARN_TYPE.getValue
+    ).getValue
+
 }
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
index ca35b2779..13cfc3dba 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
@@ -17,6 +17,7 @@
 
 package org.apache.linkis.ecm.server.service.impl
 
+import org.apache.linkis.common.conf.Configuration
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.ecm.core.listener.{ECMEvent, ECMEventListener}
 import org.apache.linkis.ecm.server.conf.ECMConfiguration._
@@ -30,6 +31,7 @@ import org.apache.linkis.manager.common.protocol.em.{
   StopEMRequest
 }
 import org.apache.linkis.manager.label.constant.LabelKeyConstant
+import org.apache.linkis.manager.label.entity.SerializableLabel
 import org.apache.linkis.rpc.Sender
 
 import java.util
@@ -57,6 +59,11 @@ class DefaultECMRegisterService extends ECMRegisterService with ECMEventListener
       "alias",
       ENGINE_CONN_MANAGER_SPRING_NAME
     )
+
+    if (Configuration.IS_MULTIPLE_YARN_CLUSTER.getValue.asInstanceOf[Boolean]) {
+      labels.asScala += LabelKeyConstant.YARN_CLUSTER_KEY ->
+        (ECM_YARN_CLUSTER_TYPE + "_" + ECM_YARN_CLUSTER_NAME)
+    }
     // TODO: group  by key
     labels
   }
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
index 66e059b48..afc18bdc1 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
@@ -17,13 +17,16 @@
 
 package org.apache.linkis.entrance.parser
 
+import org.apache.linkis.common.conf.Configuration
 import org.apache.linkis.common.utils.Logging
 import org.apache.linkis.entrance.conf.EntranceConfiguration
 import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary._
 import org.apache.linkis.entrance.exception.{EntranceErrorCode, EntranceIllegalParamException}
 import org.apache.linkis.entrance.persistence.PersistenceManager
 import org.apache.linkis.entrance.timeout.JobTimeoutManager
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
 import org.apache.linkis.governance.common.entity.job.JobRequest
+import org.apache.linkis.manager.common.conf.RMConfiguration
 import org.apache.linkis.manager.label.builder.factory.{
   LabelBuilderFactory,
   LabelBuilderFactoryContext
@@ -31,6 +34,7 @@ import org.apache.linkis.manager.label.builder.factory.{
 import org.apache.linkis.manager.label.conf.LabelCommonConfig
 import org.apache.linkis.manager.label.constant.LabelKeyConstant
 import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.cluster.ClusterLabel
 import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, UserCreatorLabel}
 import org.apache.linkis.manager.label.utils.EngineTypeLabelCreator
 import org.apache.linkis.protocol.constants.TaskConstant
@@ -118,6 +122,7 @@ class CommonEntranceParser(val persistenceManager: PersistenceManager)
     checkEngineTypeLabel(labels)
     generateAndVerifyCodeLanguageLabel(runType, labels)
     generateAndVerifyUserCreatorLabel(executeUser, labels)
+    generateAndVerifyClusterLabel(labels)
 
     jobRequest.setLabels(new util.ArrayList[Label[_]](labels.values()))
     jobRequest.setSource(source)
@@ -189,6 +194,22 @@ class CommonEntranceParser(val persistenceManager: PersistenceManager)
     }
   }
 
+  private def generateAndVerifyClusterLabel(labels: util.Map[String, Label[_]]): Unit = {
+    if (!Configuration.IS_MULTIPLE_YARN_CLUSTER.getValue.asInstanceOf[Boolean]) {
+      return
+    }
+    var clusterLabel = labels
+      .getOrDefault(LabelKeyConstant.YARN_CLUSTER_KEY, null)
+      .asInstanceOf[ClusterLabel]
+    if (clusterLabel == null) {
+      clusterLabel =
+        LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[ClusterLabel])
+      clusterLabel.setClusterName(RMConfiguration.DEFAULT_YARN_CLUSTER_NAME.getValue)
+      clusterLabel.setClusterType(RMConfiguration.DEFAULT_YARN_TYPE.getValue)
+      labels.put(clusterLabel.getLabelKey, clusterLabel)
+    }
+  }
+
   private def parseToOldTask(params: util.Map[String, AnyRef]): JobRequest = {
 
     val jobReq = new JobRequest
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java
index edf107376..98cfe2b33 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java
@@ -334,8 +334,13 @@ public class YarnResourceRequester implements ExternalResourceRequester {
         String principalName = (String) this.provider.getConfigMap().get("principalName");
         String keytabPath = (String) this.provider.getConfigMap().get("keytabPath");
         String krb5Path = (String) this.provider.getConfigMap().get("krb5Path");
+        if (StringUtils.isNotBlank(krb5Path)) {
+          logger.warn(
+              "krb5Path: {} has been specified, but not allow to be set to avoid conflict",
+              krb5Path);
+        }
         RequestKerberosUrlUtils requestKuu =
-            new RequestKerberosUrlUtils(principalName, keytabPath, krb5Path, false);
+            new RequestKerberosUrlUtils(principalName, keytabPath, false);
         HttpResponse response =
             requestKuu.callRestUrl(rmWebAddress + "/ws/v1/cluster/" + url, principalName);
         httpResponse = response;
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RequestKerberosUrlUtils.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RequestKerberosUrlUtils.java
index 12b729a05..4647dcab1 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RequestKerberosUrlUtils.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RequestKerberosUrlUtils.java
@@ -67,12 +67,6 @@ public class RequestKerberosUrlUtils {
     }
   }
 
-  public RequestKerberosUrlUtils(
-      String principal, String keyTabLocation, String krb5Location, boolean isDebug) {
-    this(principal, keyTabLocation, isDebug);
-    System.setProperty("java.security.krb5.conf", krb5Location);
-  }
-
   private static HttpClient buildSpengoHttpClient() {
     HttpClientBuilder builder = HttpClientBuilder.create();
     Lookup<AuthSchemeProvider> authSchemeRegistry =
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java
index c0fa56959..256b618e3 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java
@@ -18,13 +18,14 @@
 package org.apache.linkis.manager.label.entity.cluster;
 
 import org.apache.linkis.manager.label.constant.LabelKeyConstant;
+import org.apache.linkis.manager.label.entity.EMNodeLabel;
 import org.apache.linkis.manager.label.entity.Feature;
 import org.apache.linkis.manager.label.entity.GenericLabel;
 import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
 
 import java.util.HashMap;
 
-public class ClusterLabel extends GenericLabel {
+public class ClusterLabel extends GenericLabel implements EMNodeLabel {
 
   public ClusterLabel() {
     setLabelKey(LabelKeyConstant.YARN_CLUSTER_KEY);
@@ -64,4 +65,18 @@ public class ClusterLabel extends GenericLabel {
     }
     return null;
   }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof ClusterLabel) {
+      if (null != getClusterName() && null != getClusterType()) {
+        return getClusterName().equals(((ClusterLabel) other).getClusterName())
+            && getClusterType().equals(((ClusterLabel) other).getClusterType());
+      } else {
+        return false;
+      }
+    } else {
+      return false;
+    }
+  }
 }
diff --git a/linkis-dist/package/conf/linkis-cg-engineconnmanager.properties b/linkis-dist/package/conf/linkis-cg-engineconnmanager.properties
index 0cd9a59fd..d4c167720 100644
--- a/linkis-dist/package/conf/linkis-cg-engineconnmanager.properties
+++ b/linkis-dist/package/conf/linkis-cg-engineconnmanager.properties
@@ -17,6 +17,8 @@
 ##restful
 wds.linkis.server.restful.scan.packages=org.apache.linkis.em.restful
 wds.linkis.engineconn.root.dir=/appcom/tmp
+wds.linkis.ecm.yarn.cluster.name=default
+wds.linkis.ecm.yarn.cluster.type=Yarn
 ##Spring
 spring.server.port=9102
 ##set engine environment in econn start script, such as SPARK3_HOME,the value of env will read from ecm host by key.
diff --git a/linkis-dist/package/conf/linkis.properties b/linkis-dist/package/conf/linkis.properties
index 400b060fd..89de9eb91 100644
--- a/linkis-dist/package/conf/linkis.properties
+++ b/linkis-dist/package/conf/linkis.properties
@@ -17,6 +17,7 @@
 #wds.linkis.test.mode=true
 
 wds.linkis.server.version=v1
+wds.linkis.multiple.yarn.cluster=false
 linkis.discovery.prefer-ip-address=false
 linkis.discovery.server-address=http://127.0.0.1:20303
 
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/hive/src/main/java/org/apache/linkis/metadata/query/service/HiveConnection.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/hive/src/main/java/org/apache/linkis/metadata/query/service/HiveConnection.java
index 3bccdd1d3..196d3342f 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/hive/src/main/java/org/apache/linkis/metadata/query/service/HiveConnection.java
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/hive/src/main/java/org/apache/linkis/metadata/query/service/HiveConnection.java
@@ -19,7 +19,6 @@ package org.apache.linkis.metadata.query.service;
 
 import org.apache.linkis.common.conf.CommonVars;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -48,15 +47,6 @@ public class HiveConnection implements Closeable {
   private static final CommonVars<String> DEFAULT_SERVICE_USER =
       CommonVars.apply("wds.linkis.server.mdm.service.user", "hadoop");
 
-  private static final CommonVars<String> KERBEROS_KRB5_CONF_PATH =
-      CommonVars.apply("wds.linkis.server.mdm.service.kerberos.krb5.path", "");
-
-  static {
-    if (StringUtils.isNotBlank(KERBEROS_KRB5_CONF_PATH.getValue())) {
-      System.setProperty("java.security.krb5.conf", KERBEROS_KRB5_CONF_PATH.getValue());
-    }
-  }
-
   public HiveConnection(
       String uris, String principle, String keytabFilePath, Map<String, String> hadoopConf)
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org