You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2021/10/25 05:04:08 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1262] Support both KERBEROS and PLAIN authentication at the same time

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 24cf1bd  [KYUUBI #1262] Support both KERBEROS and PLAIN authentication at the same time
24cf1bd is described below

commit 24cf1bd7208d663052767aebb7290475ea7e3128
Author: fwang12 <fw...@ebay.com>
AuthorDate: Mon Oct 25 13:03:55 2021 +0800

    [KYUUBI #1262] Support both KERBEROS and PLAIN authentication at the same time
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    
    Support both KERBEROS and PLAIN authentication at the same time.
    
    ### _How was this patch tested?_
    
    Added UT & IT.
    I make integration testing on our dev cluster with KERBEROS and CUSTOM authentication.
    ```
    kyuubi.authentication	KERBEROS,CUSTOM
    kyuubi.authentication.custom.class=org.apache.kyuubi.ZeusCustom
    ```
    
    For org.apache.kyuubi.ZeusCustom,  it checks whether the user equals password.
    ```
    package org.apache.kyuubi
    
    import javax.security.sasl.AuthenticationException
    
    import org.apache.kyuubi.service.authentication.PasswdAuthenticationProvider
    
    class ZeusCustom
      extends PasswdAuthenticationProvider with Logging {
    
      override def authenticate(user: String, password: String): Unit = {
        if (user == password) {
          info(s"Success log in of user: $user")
        } else {
          throw new AuthenticationException("Username or password is not valid!")
        }
      }
    }
    ```
    
    1. kerberos testing with user b_zeus
    ![image](https://user-images.githubusercontent.com/6757692/138547539-23ce62cf-07bc-4027-ba91-d5099126afde.png)
    
    2. CUTOM authentication testing with user b_zeus
    ![image](https://user-images.githubusercontent.com/6757692/138547423-e68b3d82-64ab-450a-8fe7-990cab292fd7.png)
    
    Note that: they share the same backend spark engine, because they are the same user.
    
    Closes #1266 from turboFei/multiple_auth_KYUUBI-1262.
    
    Closes #1262
    
    71053aef [fwang12] adress nit
    850d6b5d [fwang12] fix ut
    ea7db79f [fwang12] complete
    11f409cb [fwang12] Update docs
    b1f83e55 [fwang12] add ut
    8d137db9 [fwang12] make ldap password diff with custom
    d227aa74 [fwang12] fix ut
    d7cfaf4c [fwang12] only the first is valid
    2e2283ba [fwang12] after all
    ee0e8bc0 [fwang12] make kerberoes enabled
    4fc63081 [fwang12] refactor kerbereos helper
    6691cc57 [fwang12] save
    cd813ecf [fwang12] refactor
    dd706740 [fwang12] retest
    f4038e93 [fwang12] fix code style
    7b590a23 [fwang12] add ut
    e39e19e6 [fwang12] add it
    7dc7c927 [fwang12] with password
    8dadfd32 [fwang12] refactor ldap suite
    8545a033 [fwang12] add ut
    1aa30a5c [fwang12] refactor
    8cc2ea66 [fwang12] fix ut
    10f788ae [fwang12] before all
    98f93640 [fwang12] revert sth
    bb75f8e9 [fwang12] save
    314579f1 [fwang12] update default
    ac8b195f [fwang12] [KYUUBI #1262] Support multiple kinds of SASL authentication type
    
    Authored-by: fwang12 <fw...@ebay.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 docs/deployment/settings.md                        |   2 +-
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  19 ++--
 .../KyuubiAuthenticationFactory.scala              |  62 +++++++----
 .../service/authentication/PlainSASLHelper.scala   |   7 +-
 .../org/apache/kyuubi/KerberizedTestHelper.scala   |  52 +++++-----
 .../apache/kyuubi/operation/JDBCTestUtils.scala    |   4 +-
 .../CustomAuthenticationProviderImplSuite.scala    |   2 +-
 .../KyuubiAuthenticationFactorySuite.scala         |  23 +++--
 .../LdapAuthenticationProviderImplSuite.scala      |  21 +---
 .../service/authentication/WithLdapServer.scala    |  44 ++++++++
 .../apache/kyuubi/ha/client/ServiceDiscovery.scala |   2 +-
 .../org/apache/kyuubi/WithSecuredDFSService.scala  |   5 +-
 .../KyuubiOperationKerberosAndPlainAuthSuite.scala | 114 +++++++++++++++++++++
 13 files changed, 274 insertions(+), 83 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 402660b..f77be0c 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -122,7 +122,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 
 Key | Default | Meaning | Type | Since
 --- | --- | --- | --- | ---
-kyuubi\.authentication|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>NONE</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Client authentication types.<ul> <li>NOSASL: raw transport.</li> <li>NONE: no authentication check.</li> <li>KERBEROS: Kerberos/GSSAPI authentication.</li> <li>CUSTOM: User-defined authentication.</li> <li>LDAP: Lightweight Directory Access Protocol authentication.</li></ul></div>|<div style='width: 30pt'>string</div>|<di [...]
+kyuubi\.authentication|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>NONE</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of client authentication types.<ul> <li>NOSASL: raw transport.</li> <li>NONE: no authentication check.</li> <li>KERBEROS: Kerberos/GSSAPI authentication.</li> <li>CUSTOM: User-defined authentication.</li> <li>LDAP: Lightweight Directory Access Protocol authentication.</li></ul> Note that: For KERBER [...]
 kyuubi\.authentication<br>\.custom\.class|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>User-defined authentication implementation of org.apache.kyuubi.service.authentication.PasswdAuthenticationProvider</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
 kyuubi\.authentication<br>\.ldap\.base\.dn|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>LDAP base DN.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.0.0</div>
 kyuubi\.authentication<br>\.ldap\.domain|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>LDAP domain.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.0.0</div>
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index e035550..f925261 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -370,18 +370,25 @@ object KyuubiConf {
       .version("1.4.0")
       .fallbackConf(FRONTEND_LOGIN_BACKOFF_SLOT_LENGTH)
 
-  val AUTHENTICATION_METHOD: ConfigEntry[String] = buildConf("authentication")
-    .doc("Client authentication types.<ul>" +
+  val AUTHENTICATION_METHOD: ConfigEntry[Seq[String]] = buildConf("authentication")
+    .doc("A comma separated list of client authentication types.<ul>" +
       " <li>NOSASL: raw transport.</li>" +
       " <li>NONE: no authentication check.</li>" +
       " <li>KERBEROS: Kerberos/GSSAPI authentication.</li>" +
       " <li>CUSTOM: User-defined authentication.</li>" +
-      " <li>LDAP: Lightweight Directory Access Protocol authentication.</li></ul>")
+      " <li>LDAP: Lightweight Directory Access Protocol authentication.</li></ul>" +
+      " Note that: For KERBEROS, it is SASL/GSSAPI mechanism," +
+      " and for NONE, CUSTOM and LDAP, they are all SASL/PLAIN mechanism." +
+      " If only NOSASL is specified, the authentication will be NOSASL." +
+      " For SASL authentication, KERBEROS and PLAIN auth type are supported at the same time," +
+      " and only the first specified PLAIN auth type is valid.")
     .version("1.0.0")
     .stringConf
-    .transform(_.toUpperCase(Locale.ROOT))
-    .checkValues(AuthTypes.values.map(_.toString))
-    .createWithDefault(AuthTypes.NONE.toString)
+    .toSequence()
+    .transform(_.map(_.toUpperCase(Locale.ROOT)))
+    .checkValue(_.forall(AuthTypes.values.map(_.toString).contains),
+      s"the authentication type should be one or more of ${AuthTypes.values.mkString(",")}")
+    .createWithDefault(Seq(AuthTypes.NONE.toString))
 
   val AUTHENTICATION_CUSTOM_CLASS: OptionalConfigEntry[String] =
     buildConf("authentication.custom.class")
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala
index 736b6e7..390491a 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala
@@ -27,19 +27,23 @@ import org.apache.hadoop.security.authentication.util.KerberosName
 import org.apache.hadoop.security.authorize.ProxyUsers
 import org.apache.hive.service.rpc.thrift.TCLIService.Iface
 import org.apache.thrift.TProcessorFactory
-import org.apache.thrift.transport.{TTransportException, TTransportFactory}
+import org.apache.thrift.transport.{TSaslServerTransport, TTransportException, TTransportFactory}
 
-import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.service.authentication.AuthTypes._
 
-class KyuubiAuthenticationFactory(conf: KyuubiConf) {
+class KyuubiAuthenticationFactory(conf: KyuubiConf) extends Logging {
 
-  private val authType: AuthType = AuthTypes.withName(conf.get(AUTHENTICATION_METHOD))
+  private val authTypes = conf.get(AUTHENTICATION_METHOD).map(AuthTypes.withName)
+  private val noSasl = authTypes == Seq(NOSASL)
+  private val kerberosEnabled = authTypes.contains(KERBEROS)
+  private val plainAuthTypeOpt = authTypes.filterNot(_.equals(KERBEROS))
+    .filterNot(_.equals(NOSASL)).headOption
 
-  private val saslServer: Option[HadoopThriftAuthBridgeServer] = authType match {
-    case KERBEROS =>
+  private val hadoopAuthServer: Option[HadoopThriftAuthBridgeServer] = {
+    if (kerberosEnabled) {
       val secretMgr = KyuubiDelegationTokenManager(conf)
       try {
         secretMgr.startThreads()
@@ -47,7 +51,9 @@ class KyuubiAuthenticationFactory(conf: KyuubiConf) {
         case e: IOException => throw new TTransportException("Failed to start token manager", e)
       }
       Some(new HadoopThriftAuthBridgeServer(secretMgr))
-    case _ => None
+    } else {
+      None
+    }
   }
 
   private def getSaslProperties: java.util.Map[String, String] = {
@@ -59,34 +65,48 @@ class KyuubiAuthenticationFactory(conf: KyuubiConf) {
   }
 
   def getTTransportFactory: TTransportFactory = {
-    saslServer match {
-      case Some(server) =>
-        val serverTransportFactory = try {
-          server.createSaslServerTransportFactory(getSaslProperties)
-        } catch {
-          case e: TTransportException => throw new LoginException(e.getMessage)
-        }
+    if (noSasl) {
+      new TTransportFactory()
+    } else {
+      var transportFactory: TSaslServerTransport.Factory = null
+
+      hadoopAuthServer match {
+        case Some(server) =>
+          transportFactory = try {
+            server.createSaslServerTransportFactory(getSaslProperties)
+          } catch {
+            case e: TTransportException => throw new LoginException(e.getMessage)
+          }
+
+        case _ =>
+      }
 
-        server.wrapTransportFactory(serverTransportFactory)
+      plainAuthTypeOpt match {
+        case Some(plainAuthType) =>
+          transportFactory = PlainSASLHelper.getTransportFactory(plainAuthType.toString, conf,
+            Option(transportFactory)).asInstanceOf[TSaslServerTransport.Factory]
+
+        case _ =>
+      }
 
-      case _ => authType match {
-        case NOSASL => new TTransportFactory
-        case _ => PlainSASLHelper.getTransportFactory(authType.toString, conf)
+      hadoopAuthServer match {
+        case Some(server) => server.wrapTransportFactory(transportFactory)
+        case _ => transportFactory
       }
     }
   }
 
-  def getTProcessorFactory(fe: Iface): TProcessorFactory = saslServer match {
+  def getTProcessorFactory(fe: Iface): TProcessorFactory = hadoopAuthServer match {
     case Some(server) => FEServiceProcessorFactory(server, fe)
     case _ => PlainSASLHelper.getProcessFactory(fe)
   }
 
   def getRemoteUser: Option[String] = {
-    saslServer.map(_.getRemoteUser).orElse(Option(TSetIpAddressProcessor.getUserName))
+    hadoopAuthServer.map(_.getRemoteUser).orElse(Option(TSetIpAddressProcessor.getUserName))
   }
 
   def getIpAddress: Option[String] = {
-    saslServer.map(_.getRemoteAddress).map(_.getHostAddress)
+    hadoopAuthServer.map(_.getRemoteAddress).map(_.getHostAddress)
       .orElse(Option(TSetIpAddressProcessor.getUserIpAddress))
   }
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/PlainSASLHelper.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/PlainSASLHelper.scala
index 07cc501..2c597ec 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/PlainSASLHelper.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/PlainSASLHelper.scala
@@ -71,8 +71,11 @@ object PlainSASLHelper {
     SQLPlainProcessorFactory(service)
   }
 
-  def getTransportFactory(authTypeStr: String, conf: KyuubiConf): TTransportFactory = {
-    val saslFactory = new TSaslServerTransport.Factory()
+  def getTransportFactory(
+      authTypeStr: String,
+      conf: KyuubiConf,
+      transportFactory: Option[TSaslServerTransport.Factory] = None): TTransportFactory = {
+    val saslFactory = transportFactory.getOrElse(new TSaslServerTransport.Factory())
     try {
       val handler = new PlainServerCallbackHandler(authTypeStr, conf)
       val props = new java.util.HashMap[String, String]
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala
index d5ba4a0..ea032fd 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala
@@ -48,28 +48,35 @@ trait KerberizedTestHelper extends KyuubiFunSuite {
   kdcConf.remove(MiniKdc.DEBUG)
 
   private var kdc: MiniKdc = _
-  private var krb5ConfPath: String = _
-
-  eventually(timeout(60.seconds), interval(1.second)) {
-    try {
-      kdc = new MiniKdc(kdcConf, baseDir)
-      kdc.start()
-      krb5ConfPath = kdc.getKrb5conf.getAbsolutePath
-    } catch {
-      case NonFatal(e) =>
-        if (kdc != null) {
-          kdc.stop()
-          kdc = null
-        }
-        throw e
-    }
-  }
+  protected var krb5ConfPath: String = _
 
   private val keytabFile = new File(baseDir, "kyuubi-test.keytab")
   protected val testKeytab: String = keytabFile.getAbsolutePath
-  protected var testPrincipal = s"client/$hostName"
-  kdc.createPrincipal(keytabFile, testPrincipal)
-
+  protected var testPrincipal: String = _
+
+  override def beforeAll(): Unit = {
+    eventually(timeout(60.seconds), interval(1.second)) {
+      try {
+        kdc = new MiniKdc(kdcConf, baseDir)
+        kdc.start()
+        krb5ConfPath = kdc.getKrb5conf.getAbsolutePath
+      } catch {
+        case NonFatal(e) =>
+          if (kdc != null) {
+            kdc.stop()
+            kdc = null
+          }
+          throw e
+      }
+    }
+    val tempTestPrincipal = s"client/$hostName"
+    kdc.createPrincipal(keytabFile, tempTestPrincipal)
+    rewriteKrb5Conf()
+    testPrincipal = tempTestPrincipal + "@" + kdc.getRealm
+    info(s"KerberizedTest Principal: $testPrincipal")
+    info(s"KerberizedTest Keytab: $testKeytab")
+    super.beforeAll()
+  }
 
   /**
    * Forked from Apache Spark
@@ -113,13 +120,6 @@ trait KerberizedTestHelper extends KyuubiFunSuite {
     System.lineSeparator() + s"    $key=$value"
   }
 
-  rewriteKrb5Conf()
-
-  testPrincipal = testPrincipal + "@" + kdc.getRealm
-
-  info(s"KerberizedTest Principal: $testPrincipal")
-  info(s"KerberizedTest Keytab: $testKeytab")
-
   override def afterAll(): Unit = {
     kdc.stop()
     super.afterAll()
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala
index 36b7e60..0abcbaf 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala
@@ -63,7 +63,9 @@ trait JDBCTestUtils extends KyuubiFunSuite {
     }
   }
 
-  private def jdbcUrlWithConf: String = {
+  protected def jdbcUrlWithConf: String = jdbcUrlWithConf(jdbcUrl)
+
+  protected def jdbcUrlWithConf(jdbcUrl: String): String = {
     val sessionConfStr = sessionConfigs.map(kv => kv._1 + "=" + kv._2).mkString(";")
     val sparkHiveConfStr = if (sparkHiveConfigs.isEmpty) {
       ""
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/CustomAuthenticationProviderImplSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/CustomAuthenticationProviderImplSuite.scala
index cfd4743..c5561d0 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/CustomAuthenticationProviderImplSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/CustomAuthenticationProviderImplSuite.scala
@@ -33,7 +33,7 @@ class CustomAuthenticationProviderImplSuite extends KyuubiFunSuite {
       "authentication.custom.class must be set when auth method was CUSTOM."))
 
     conf.set(KyuubiConf.AUTHENTICATION_CUSTOM_CLASS,
-      "org.apache.kyuubi.service.authentication.UserDefineAuthenticationProviderImpl")
+      classOf[UserDefineAuthenticationProviderImpl].getCanonicalName)
     val p1 = getAuthenticationProvider(AuthMethods.withName("CUSTOM"), conf)
     val e2 = intercept[AuthenticationException](p1.authenticate("test", "test"))
     assert(e2.getMessage.contains("Username or password is not valid!"))
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactorySuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactorySuite.scala
index 50aa536..2298c96 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactorySuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactorySuite.scala
@@ -20,12 +20,13 @@ package org.apache.kyuubi.service.authentication
 import java.security.Security
 import javax.security.auth.login.LoginException
 
+import org.apache.thrift.transport.TSaslServerTransport
+
 import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.service.authentication.PlainSASLServer.SaslPlainProvider
 import org.apache.kyuubi.util.KyuubiHadoopUtils
 
-
 class KyuubiAuthenticationFactorySuite extends KyuubiFunSuite {
   import KyuubiAuthenticationFactory._
 
@@ -55,14 +56,14 @@ class KyuubiAuthenticationFactorySuite extends KyuubiFunSuite {
   }
 
   test("AuthType Other") {
-    val conf = KyuubiConf().set(KyuubiConf.AUTHENTICATION_METHOD, "INVALID")
+    val conf = KyuubiConf().set(KyuubiConf.AUTHENTICATION_METHOD, Seq("INVALID"))
     val e = intercept[IllegalArgumentException](new KyuubiAuthenticationFactory(conf))
-    assert(e.getMessage === "The value of kyuubi.authentication should be one of" +
-      " CUSTOM, KERBEROS, LDAP, NONE, NOSASL, but was INVALID")
+    assert(e.getMessage === "the authentication type should be one or more of" +
+      " NOSASL,NONE,LDAP,KERBEROS,CUSTOM")
   }
 
   test("AuthType LDAP") {
-    val conf = KyuubiConf().set(KyuubiConf.AUTHENTICATION_METHOD, "LDAP")
+    val conf = KyuubiConf().set(KyuubiConf.AUTHENTICATION_METHOD, Seq("LDAP"))
     val authFactory = new KyuubiAuthenticationFactory(conf)
     authFactory.getTTransportFactory
     assert(Security.getProviders.exists(_.isInstanceOf[SaslPlainProvider]))
@@ -70,10 +71,20 @@ class KyuubiAuthenticationFactorySuite extends KyuubiFunSuite {
 
 
   test("AuthType KERBEROS w/o keytab/principal") {
-    val conf = KyuubiConf().set(KyuubiConf.AUTHENTICATION_METHOD, "KERBEROS")
+    val conf = KyuubiConf().set(KyuubiConf.AUTHENTICATION_METHOD, Seq("KERBEROS"))
 
     val factory = new KyuubiAuthenticationFactory(conf)
     val e = intercept[LoginException](factory.getTTransportFactory)
     assert(e.getMessage startsWith "Kerberos principal should have 3 parts")
   }
+
+  test("AuthType is NOSASL if only NOSASL is specified") {
+    val conf = KyuubiConf().set(KyuubiConf.AUTHENTICATION_METHOD, Seq("NOSASL"))
+    var factory = new KyuubiAuthenticationFactory(conf)
+    !factory.getTTransportFactory.isInstanceOf[TSaslServerTransport.Factory]
+
+    conf.set(KyuubiConf.AUTHENTICATION_METHOD, Seq("NOSASL", "NONE"))
+    factory = new KyuubiAuthenticationFactory(conf)
+    factory.getTTransportFactory.isInstanceOf[TSaslServerTransport.Factory]
+  }
 }
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/LdapAuthenticationProviderImplSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/LdapAuthenticationProviderImplSuite.scala
index 98d2069..2563dc0 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/LdapAuthenticationProviderImplSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/LdapAuthenticationProviderImplSuite.scala
@@ -20,28 +20,18 @@ package org.apache.kyuubi.service.authentication
 import javax.naming.CommunicationException
 import javax.security.sasl.AuthenticationException
 
-import com.unboundid.ldap.listener.InMemoryDirectoryServer
-import com.unboundid.ldap.listener.InMemoryDirectoryServerConfig
-
-import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 
-class LdapAuthenticationProviderImplSuite extends KyuubiFunSuite {
-
-  private var ldapServer: InMemoryDirectoryServer = _
+class LdapAuthenticationProviderImplSuite extends WithLdapServer {
+  override protected val ldapUser: String = "kentyao"
+  override protected val ldapUserPasswd: String = "kentyao"
 
   private val conf = new KyuubiConf()
 
   override def beforeAll(): Unit = {
-    val config = new InMemoryDirectoryServerConfig("ou=users")
-    config.addAdditionalBindCredentials("uid=kentyao,ou=users", "kentyao")
-    ldapServer = new InMemoryDirectoryServer(config)
-    ldapServer.startListening()
-
-    conf.set(AUTHENTICATION_LDAP_URL, s"ldap://localhost:" + ldapServer.getListenPort)
-
     super.beforeAll()
+    conf.set(AUTHENTICATION_LDAP_URL, ldapUrl)
   }
 
   override def afterAll(): Unit = {
@@ -67,8 +57,7 @@ class LdapAuthenticationProviderImplSuite extends KyuubiFunSuite {
     assert(e3.getMessage.contains(user))
     assert(e3.getCause.isInstanceOf[javax.naming.AuthenticationException])
 
-    val dn = "ou=users"
-    conf.set(AUTHENTICATION_LDAP_BASEDN, dn)
+    conf.set(AUTHENTICATION_LDAP_BASEDN, ldapBaseDn)
     val providerImpl2 = new LdapAuthenticationProviderImpl(conf)
     providerImpl2.authenticate("kentyao", "kentyao")
 
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/WithLdapServer.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/WithLdapServer.scala
new file mode 100644
index 0000000..0bb3868
--- /dev/null
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/WithLdapServer.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.service.authentication
+
+import com.unboundid.ldap.listener.{InMemoryDirectoryServer, InMemoryDirectoryServerConfig}
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+
+trait WithLdapServer extends KyuubiFunSuite {
+  protected var ldapServer: InMemoryDirectoryServer = _
+  protected val ldapBaseDn = "ou=users"
+  protected val ldapUser = Utils.currentUser
+  protected val ldapUserPasswd = "ldapPassword"
+
+  protected def ldapUrl = s"ldap://localhost:${ldapServer.getListenPort}"
+
+  override def beforeAll(): Unit = {
+    val config = new InMemoryDirectoryServerConfig(ldapBaseDn)
+    config.addAdditionalBindCredentials(s"uid=$ldapUser,ou=users", ldapUserPasswd)
+    ldapServer = new InMemoryDirectoryServer(config)
+    ldapServer.startListening()
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    ldapServer.close()
+    super.afterAll()
+  }
+}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index 06335f1..6f7483a 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -283,7 +283,7 @@ object ServiceDiscovery extends Logging {
     confsToPublish += ("hive.server2.thrift.port" -> hostPort(1))
     confsToPublish += ("hive.server2.thrift.sasl.qop" -> conf.get(KyuubiConf.SASL_QOP))
     // Auth specific confs
-    val authenticationMethod = conf.get(KyuubiConf.AUTHENTICATION_METHOD)
+    val authenticationMethod = conf.get(KyuubiConf.AUTHENTICATION_METHOD).mkString(",")
     confsToPublish += ("hive.server2.authentication" -> authenticationMethod)
     if (authenticationMethod.equalsIgnoreCase("KERBEROS")) {
       confsToPublish += ("hive.server2.authentication.kerberos.principal" ->
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala
index 797232f..0bbbc3b 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala
@@ -25,7 +25,7 @@ import org.apache.kyuubi.server.MiniDFSService
 
 trait WithSecuredDFSService extends KerberizedTestHelper {
 
-  private val miniDFSService = new MiniDFSService(newSecuredConf())
+  private var miniDFSService: MiniDFSService = _
 
   private def newSecuredConf(): Configuration = {
     val hdfsConf = new Configuration()
@@ -45,12 +45,13 @@ trait WithSecuredDFSService extends KerberizedTestHelper {
   }
 
   override def beforeAll(): Unit = {
+    super.beforeAll()
     tryWithSecurityEnabled {
       UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
+      miniDFSService = new MiniDFSService(newSecuredConf())
       miniDFSService.initialize(new KyuubiConf(false))
       miniDFSService.start()
     }
-    super.beforeAll()
   }
 
   override def afterAll(): Unit = {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationKerberosAndPlainAuthSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationKerberosAndPlainAuthSuite.scala
new file mode 100644
index 0000000..2e94d21
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationKerberosAndPlainAuthSuite.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import java.sql.{DriverManager, SQLException}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.kyuubi.{KerberizedTestHelper, WithKyuubiServer}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.authentication.{UserDefineAuthenticationProviderImpl, WithLdapServer}
+
+class KyuubiOperationKerberosAndPlainAuthSuite extends
+  WithKyuubiServer with KerberizedTestHelper with WithLdapServer with JDBCTestUtils {
+  private val customUser: String = "user"
+  private val customPasswd: String = "password"
+
+  override protected def jdbcUrl: String = getJdbcUrl
+  private def kerberosJdbcUrl: String = jdbcUrl + s"principal=${testPrincipal}"
+  private val currentUser = UserGroupInformation.getCurrentUser
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    System.clearProperty("java.security.krb5.conf")
+    UserGroupInformation.setLoginUser(currentUser)
+    UserGroupInformation.setConfiguration(new Configuration())
+    assert(!UserGroupInformation.isSecurityEnabled)
+    super.afterAll()
+  }
+
+  override protected lazy val conf: KyuubiConf = {
+    val config = new Configuration()
+    val authType = "hadoop.security.authentication"
+    config.set(authType, "KERBEROS")
+    System.setProperty("java.security.krb5.conf", krb5ConfPath)
+    UserGroupInformation.setConfiguration(config)
+    assert(UserGroupInformation.isSecurityEnabled)
+
+    KyuubiConf().set(KyuubiConf.AUTHENTICATION_METHOD, Seq("KERBEROS", "LDAP", "CUSTOM"))
+      .set(KyuubiConf.SERVER_KEYTAB, testKeytab)
+      .set(KyuubiConf.SERVER_PRINCIPAL, testPrincipal)
+      .set(KyuubiConf.AUTHENTICATION_LDAP_URL, ldapUrl)
+      .set(KyuubiConf.AUTHENTICATION_LDAP_BASEDN, ldapBaseDn)
+      .set(KyuubiConf.AUTHENTICATION_CUSTOM_CLASS,
+        classOf[UserDefineAuthenticationProviderImpl].getCanonicalName)
+  }
+
+  test("test with KERBEROS authentication") {
+    val conn = DriverManager.getConnection(jdbcUrlWithConf(kerberosJdbcUrl), user, "")
+    try {
+      val statement = conn.createStatement()
+      val resultSet = statement.executeQuery("select engine_name()")
+      assert(resultSet.next())
+      assert(resultSet.getString(1).nonEmpty)
+    } finally {
+      conn.close()
+    }
+  }
+
+  test("test with LDAP authentication") {
+    val conn = DriverManager.getConnection(jdbcUrlWithConf, ldapUser, ldapUserPasswd)
+    try {
+      val statement = conn.createStatement()
+      val resultSet = statement.executeQuery("select engine_name()")
+      assert(resultSet.next())
+      assert(resultSet.getString(1).nonEmpty)
+    } finally {
+      conn.close()
+    }
+  }
+
+  test("only the first specified plain auth type is valid") {
+    intercept[SQLException] {
+      val conn = DriverManager.getConnection(jdbcUrlWithConf, customUser, customPasswd)
+      try {
+        val statement = conn.createStatement()
+        statement.executeQuery("select engine_name()")
+      } finally {
+        conn.close()
+      }
+    }
+  }
+
+  test("test with invalid password") {
+    intercept[SQLException] {
+      val conn = DriverManager.getConnection(jdbcUrlWithConf, user, "invalidPassword")
+      try {
+        val statement = conn.createStatement()
+        statement.executeQuery("select engine_name()")
+      } finally {
+        conn.close()
+      }
+    }
+  }
+}