You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/12/09 10:39:36 UTC

[incubator-kyuubi] 01/01: Add engine conf provide

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch KYUUBI-1536
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git

commit 27087ee8a0a06ce5e7f2b6f2ee0e58cf26d6b481
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Thu Dec 9 18:39:21 2021 +0800

     Add engine conf provide
---
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 16 ++++++++++
 .../org/apache/kyuubi/config/KyuubiConfSuite.scala |  9 ++++++
 .../apache/kyuubi/engine/EngineConfProvider.scala  | 35 ++++++++++++++++++++
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |  5 ++-
 .../org/apache/kyuubi/engine/ProcBuilder.scala     |  2 ++
 .../kyuubi/engine/spark/SparkProcessBuilder.scala  |  5 +--
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  |  2 +-
 .../kyuubi/session/KyuubiSessionManager.scala      |  8 +++++
 .../org/apache/kyuubi/engine/EngineRefSuite.scala  | 33 +++++++++++--------
 .../engine/spark/SparkProcessBuilderSuite.scala    | 37 ++++++++++++----------
 10 files changed, 119 insertions(+), 33 deletions(-)

diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index ef9fbd6..8096083 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1064,4 +1064,20 @@ object KyuubiConf {
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(OperationLanguages.values.map(_.toString))
       .createWithDefault(OperationLanguages.SQL.toString)
+
+  object EngineConfProvider extends Enumeration {
+    type OperationLanguage = Value
+    val DEFAULT = Value
+  }
+
+  val ENGINE_CONFIG_PROVIDER: ConfigEntry[String] =
+    buildConf("engine.config.provider")
+      .doc("Choose a config provider for engine, the default implementation is from " +
+        "session config.")
+      .version("1.5.0")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValue(EngineConfProvider.values.map(_.toString).contains(_),
+        "should be DEFAULT")
+      .createWithDefault(EngineConfProvider.DEFAULT.toString)
 }
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
index 47025c7..cb42e7c 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
@@ -163,4 +163,13 @@ class KyuubiConfSuite extends KyuubiFunSuite {
     kyuubiConf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, path)
     assert(kyuubiConf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).get == path)
   }
+
+  test("vaild kyuubi.engine.config.provider") {
+    val kyuubiConf = KyuubiConf()
+    assert(kyuubiConf.get(ENGINE_CONFIG_PROVIDER) == EngineConfProvider.DEFAULT.toString)
+    kyuubiConf.set(ENGINE_CONFIG_PROVIDER, "default")
+    assert(kyuubiConf.get(ENGINE_CONFIG_PROVIDER) == EngineConfProvider.DEFAULT.toString)
+    kyuubiConf.set(ENGINE_CONFIG_PROVIDER, "default1")
+    assertThrows[IllegalArgumentException](kyuubiConf.get(ENGINE_CONFIG_PROVIDER))
+  }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineConfProvider.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineConfProvider.scala
new file mode 100644
index 0000000..d21c5c3
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineConfProvider.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.AbstractService
+
+/**
+ * Provide the engine configuration according to the user and session configuration.
+ */
+trait EngineConfProvider extends AbstractService {
+  def buildConf(user: String, sessionConf: KyuubiConf): Map[String, String]
+}
+
+class DefaultEngineConfProvider extends EngineConfProvider {
+  override def buildConf(user: String, sessionConf: KyuubiConf): Map[String, String] = {
+    assert(sessionConf != null)
+    sessionConf.getAll
+  }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 1c2ca19..14c6e62 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -48,13 +48,16 @@ import org.apache.kyuubi.operation.log.OperationLog
  *
  * @param conf Engine configuration
  * @param user Caller of the engine
+ * @param engineConfProvider the engine conf provider
  * @param engineRefId Id of the corresponding session in which the engine is created
  */
 private[kyuubi] class EngineRef(
     conf: KyuubiConf,
     user: String,
+    engineConfProvider: EngineConfProvider,
     engineRefId: String = UUID.randomUUID().toString)
   extends Logging {
+
   // The corresponding ServerSpace where the engine belongs to
   private val serverSpace: String = conf.get(HA_ZK_NAMESPACE)
 
@@ -181,7 +184,7 @@ private[kyuubi] class EngineRef(
         conf.set(
           SparkProcessBuilder.TAG_KEY,
           conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
-        new SparkProcessBuilder(appUser, conf, extraEngineLog)
+        new SparkProcessBuilder(appUser, conf, engineConfProvider, extraEngineLog)
       case _ => throw new UnsupportedOperationException(s"Unsupported engine type: ${engineType}")
     }
     MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index bf1ca92..131909b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -53,6 +53,8 @@ trait ProcBuilder {
 
   protected def env: Map[String, String] = conf.getEnvs
 
+  protected val engineConfProvider: EngineConfProvider
+
   protected val extraEngineLog: Option[OperationLog]
 
   protected val workingDir: Path
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index b78a5ad..ba14245 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAIN_RESOURCE
-import org.apache.kyuubi.engine.ProcBuilder
+import org.apache.kyuubi.engine.{EngineConfProvider, ProcBuilder}
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
 import org.apache.kyuubi.operation.log.OperationLog
@@ -36,6 +36,7 @@ import org.apache.kyuubi.operation.log.OperationLog
 class SparkProcessBuilder(
     override val proxyUser: String,
     override val conf: KyuubiConf,
+    val engineConfProvider: EngineConfProvider,
     val extraEngineLog: Option[OperationLog] = None)
   extends ProcBuilder with Logging {
 
@@ -130,7 +131,7 @@ class SparkProcessBuilder(
     buffer += CLASS
     buffer += mainClass
 
-    var allConf = conf.getAll
+    var allConf = engineConfProvider.buildConf(proxyUser, conf)
 
     // if enable sasl kerberos authentication for zookeeper, need to upload the server ketab file
     if (ZooKeeperAuthTypes.withName(conf.get(HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 60d331b..3ad9b9e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -50,7 +50,7 @@ class KyuubiSessionImpl(
     case (key, value) => sessionConf.set(key, value)
   }
 
-  val engine: EngineRef = new EngineRef(sessionConf, user)
+  val engine: EngineRef = new EngineRef(sessionConf, user, sessionManager.engineConfProvider)
   private[kyuubi] val launchEngineOp = sessionManager.operationManager
     .newLaunchEngineOperation(this, sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC))
 
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 99bd9d6..fd8ce51 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -24,6 +24,7 @@ import org.apache.kyuubi.{KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.credentials.HadoopCredentialsManager
+import org.apache.kyuubi.engine.{DefaultEngineConfProvider, EngineConfProvider}
 import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.KyuubiOperationManager
@@ -34,8 +35,15 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
 
   val operationManager = new KyuubiOperationManager()
   val credentialsManager = new HadoopCredentialsManager()
+  var engineConfProvider: EngineConfProvider = _
 
   override def initialize(conf: KyuubiConf): Unit = {
+    val provider = KyuubiConf.EngineConfProvider.withName(
+      conf.get(KyuubiConf.ENGINE_CONFIG_PROVIDER))
+    engineConfProvider = provider match {
+      case KyuubiConf.EngineConfProvider.DEFAULT => new DefaultEngineConfProvider()
+    }
+    addService(engineConfProvider)
     addService(credentialsManager)
     val absPath = Utils.getAbsolutePathFromWork(conf.get(SERVER_OPERATION_LOG_DIR_ROOT))
     _operationLogRoot = Some(absPath.toAbsolutePath.toString)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
index 7a3729b..4a9bdac 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
@@ -59,13 +59,20 @@ class EngineRefSuite extends KyuubiFunSuite {
     super.beforeEach()
   }
 
+  private def newEngineRef(
+      conf: KyuubiConf,
+      user: String,
+      engineRefId: String = UUID.randomUUID().toString): EngineRef = {
+    new EngineRef(conf, user, new DefaultEngineConfProvider, engineRefId)
+  }
+
   test("CONNECTION shared level engine name") {
     val id = UUID.randomUUID().toString
     val engineType = conf.get(KyuubiConf.ENGINE_TYPE)
     Seq(None, Some("suffix")).foreach { domain =>
       conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString)
       domain.foreach(conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, _))
-      val engine = new EngineRef(conf, user, id)
+      val engine = newEngineRef(conf, user, id)
       assert(engine.engineSpace ===
         ZKPaths.makePath(s"kyuubi_${CONNECTION}_${engineType}", user, id))
       assert(engine.defaultEngineName === s"kyuubi_${CONNECTION}_${engineType}_${user}_$id")
@@ -76,7 +83,7 @@ class EngineRefSuite extends KyuubiFunSuite {
     val id = UUID.randomUUID().toString
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
     conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString)
-    val appName = new EngineRef(conf, user, id)
+    val appName = newEngineRef(conf, user, id)
     assert(appName.engineSpace === ZKPaths.makePath(s"kyuubi_${USER}_$FLINK_SQL", user, "default"))
     assert(appName.defaultEngineName === s"kyuubi_${USER}_${FLINK_SQL}_${user}_default_$id")
 
@@ -95,7 +102,7 @@ class EngineRefSuite extends KyuubiFunSuite {
     val id = UUID.randomUUID().toString
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, GROUP.toString)
     conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
-    val engineRef = new EngineRef(conf, user, id)
+    val engineRef = newEngineRef(conf, user, id)
     val primaryGroupName = UserGroupInformation.createRemoteUser(user).getPrimaryGroupName
     assert(engineRef.engineSpace ===
       ZKPaths.makePath(s"kyuubi_GROUP_SPARK_SQL", primaryGroupName, "default"))
@@ -106,7 +113,7 @@ class EngineRefSuite extends KyuubiFunSuite {
       k =>
         conf.unset(k)
         conf.set(k.key, "abc")
-        val engineRef2 = new EngineRef(conf, user, id)
+        val engineRef2 = newEngineRef(conf, user, id)
         assert(engineRef2.engineSpace ===
           ZKPaths.makePath(s"kyuubi_${GROUP}_${SPARK_SQL}", primaryGroupName, "abc"))
         assert(engineRef2.defaultEngineName ===
@@ -116,7 +123,7 @@ class EngineRefSuite extends KyuubiFunSuite {
     val userName = "Iamauserwithoutgroup"
     val newUGI = UserGroupInformation.createRemoteUser(userName)
     assert(newUGI.getGroupNames.isEmpty)
-    val engineRef3 = new EngineRef(conf, userName, id)
+    val engineRef3 = newEngineRef(conf, userName, id)
     assert(engineRef3.engineSpace === ZKPaths.makePath(s"kyuubi_GROUP_SPARK_SQL", userName, "abc"))
     assert(engineRef3.defaultEngineName === s"kyuubi_GROUP_SPARK_SQL_${userName}_abc_$id")
   }
@@ -125,13 +132,13 @@ class EngineRefSuite extends KyuubiFunSuite {
     val id = UUID.randomUUID().toString
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, SERVER.toString)
     conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString)
-    val appName = new EngineRef(conf, user, id)
+    val appName = newEngineRef(conf, user, id)
     assert(appName.engineSpace ===
       ZKPaths.makePath(s"kyuubi_${SERVER}_${FLINK_SQL}", user, "default"))
     assert(appName.defaultEngineName === s"kyuubi_${SERVER}_${FLINK_SQL}_${user}_default_$id")
 
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
-    val appName2 = new EngineRef(conf, user, id)
+    val appName2 = newEngineRef(conf, user, id)
     assert(appName2.engineSpace ===
       ZKPaths.makePath(s"kyuubi_${SERVER}_${FLINK_SQL}", user, "abc"))
     assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${FLINK_SQL}_${user}_abc_$id")
@@ -143,31 +150,31 @@ class EngineRefSuite extends KyuubiFunSuite {
     // set subdomain and disable engine pool
     conf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
     conf.set(ENGINE_POOL_SIZE, -1)
-    val engine1 = new EngineRef(conf, user, id)
+    val engine1 = newEngineRef(conf, user, id)
     assert(engine1.subdomain === "abc")
 
     // unset subdomain and disable engine pool
     conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
     conf.set(ENGINE_POOL_SIZE, -1)
-    val engine2 = new EngineRef(conf, user, id)
+    val engine2 = newEngineRef(conf, user, id)
     assert(engine2.subdomain === "default")
 
     // set subdomain and 1 <= engine pool size < threshold
     conf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
     conf.set(ENGINE_POOL_SIZE, 1)
-    val engine3 = new EngineRef(conf, user, id)
+    val engine3 = newEngineRef(conf, user, id)
     assert(engine3.subdomain === "abc")
 
     // unset subdomain and 1 <= engine pool size < threshold
     conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
     conf.set(ENGINE_POOL_SIZE, 3)
-    val engine4 = new EngineRef(conf, user, id)
+    val engine4 = newEngineRef(conf, user, id)
     assert(engine4.subdomain.startsWith("engine-pool-"))
 
     // unset subdomain and engine pool size > threshold
     conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
     conf.set(ENGINE_POOL_SIZE, 100)
-    val engine5 = new EngineRef(conf, user, id)
+    val engine5 = newEngineRef(conf, user, id)
     val engineNumber = Integer.parseInt(engine5.subdomain.substring(12))
     val threshold = ENGINE_POOL_SIZE_THRESHOLD.defaultVal.get
     assert(engineNumber <= threshold)
@@ -180,7 +187,7 @@ class EngineRefSuite extends KyuubiFunSuite {
     conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
     conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test")
     conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
-    val engine = new EngineRef(conf, user, id)
+    val engine = newEngineRef(conf, user, id)
 
     var port1 = 0
     var port2 = 0
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index d50b75a..2d511d5 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -28,6 +28,7 @@ import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_LOG_TIMEOUT
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAIN_RESOURCE
+import org.apache.kyuubi.engine.DefaultEngineConfProvider
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
 import org.apache.kyuubi.service.ServiceUtils
@@ -35,8 +36,12 @@ import org.apache.kyuubi.service.ServiceUtils
 class SparkProcessBuilderSuite extends KerberizedTestHelper {
   private def conf = KyuubiConf().set("kyuubi.on", "off")
 
+  private def newSparkProcessBuilder(user: String, conf: KyuubiConf): SparkProcessBuilder = {
+    new SparkProcessBuilder(user, conf, new DefaultEngineConfProvider())
+  }
+
   test("spark process builder") {
-    val builder = new SparkProcessBuilder("kentyao", conf)
+    val builder = newSparkProcessBuilder("kentyao", conf)
     val commands = builder.toString.split(' ')
     assert(commands(2) === "org.apache.kyuubi.engine.spark.SparkSQLEngine")
     assert(commands.contains("spark.kyuubi.on=off"))
@@ -52,7 +57,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
   }
 
   test("capture error from spark process builder") {
-    val processBuilder = new SparkProcessBuilder("kentyao", conf.set("spark.ui.port", "abc"))
+    val processBuilder = newSparkProcessBuilder("kentyao", conf.set("spark.ui.port", "abc"))
     processBuilder.start
     eventually(timeout(90.seconds), interval(500.milliseconds)) {
       val error = processBuilder.getError
@@ -62,7 +67,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
     }
 
     val processBuilder1 =
-      new SparkProcessBuilder("kentyao", conf.set("spark.hive.metastore.uris", "thrift://dummy"))
+      newSparkProcessBuilder("kentyao", conf.set("spark.hive.metastore.uris", "thrift://dummy"))
 
     processBuilder1.start
     eventually(timeout(90.seconds), interval(500.milliseconds)) {
@@ -75,7 +80,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
   test("engine log truncation") {
     val msg = "org.apache.spark.sql.hive."
     val pb =
-      new SparkProcessBuilder("kentyao", conf.set("spark.hive.metastore.uris", "thrift://dummy"))
+      newSparkProcessBuilder("kentyao", conf.set("spark.hive.metastore.uris", "thrift://dummy"))
     pb.start
     eventually(timeout(90.seconds), interval(500.milliseconds)) {
       val error1 = pb.getError
@@ -84,7 +89,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
       assert(error1.getMessage.contains(msg))
     }
 
-    val pb2 = new SparkProcessBuilder(
+    val pb2 = newSparkProcessBuilder(
       "kentyao",
       conf.set("spark.hive.metastore.uris", "thrift://dummy")
         .set(KyuubiConf.ENGINE_ERROR_MAX_SIZE, 200))
@@ -98,29 +103,29 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
   }
 
   test("proxy user or keytab") {
-    val b1 = new SparkProcessBuilder("kentyao", conf)
+    val b1 = newSparkProcessBuilder("kentyao", conf)
     assert(b1.toString.contains("--proxy-user kentyao"))
 
     val conf1 = conf.set("spark.kerberos.principal", testPrincipal)
-    val b2 = new SparkProcessBuilder("kentyao", conf1)
+    val b2 = newSparkProcessBuilder("kentyao", conf1)
     assert(b2.toString.contains("--proxy-user kentyao"))
 
     val conf2 = conf.set("spark.kerberos.keytab", testKeytab)
-    val b3 = new SparkProcessBuilder("kentyao", conf2)
+    val b3 = newSparkProcessBuilder("kentyao", conf2)
     assert(b3.toString.contains("--proxy-user kentyao"))
 
     tryWithSecurityEnabled {
       val conf3 = conf.set("spark.kerberos.principal", testPrincipal)
         .set("spark.kerberos.keytab", "testKeytab")
-      val b4 = new SparkProcessBuilder(Utils.currentUser, conf3)
+      val b4 = newSparkProcessBuilder(Utils.currentUser, conf3)
       assert(b4.toString.contains(s"--proxy-user ${Utils.currentUser}"))
 
       val conf4 = conf.set("spark.kerberos.principal", testPrincipal)
         .set("spark.kerberos.keytab", testKeytab)
-      val b5 = new SparkProcessBuilder("kentyao", conf4)
+      val b5 = newSparkProcessBuilder("kentyao", conf4)
       assert(b5.toString.contains("--proxy-user kentyao"))
 
-      val b6 = new SparkProcessBuilder(ServiceUtils.getShortName(testPrincipal), conf4)
+      val b6 = newSparkProcessBuilder(ServiceUtils.getShortName(testPrincipal), conf4)
       assert(!b6.toString.contains("--proxy-user kentyao"))
     }
   }
@@ -218,13 +223,13 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
 
     val conf = KyuubiConf()
     conf.set(ENGINE_SPARK_MAIN_RESOURCE, hdfsPath)
-    val b1 = new SparkProcessBuilder("test", conf)
+    val b1 = newSparkProcessBuilder("test", conf)
     assert(b1.mainResource.get.startsWith("hdfs://"))
     assert(b1.mainResource.get == hdfsPath)
 
     // user specified jar not exist, get default jar and expect not equals
     conf.set(ENGINE_SPARK_MAIN_RESOURCE, jarPath.toString)
-    val b2 = new SparkProcessBuilder("test", conf)
+    val b2 = newSparkProcessBuilder("test", conf)
     assert(b2.mainResource.getOrElse("") != jarPath.toString)
   }
 
@@ -254,7 +259,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
     conf.set("spark.vino", "yang")
     conf.set("kent", "yao")
     conf.set("hadoop.kent", "yao")
-    val builder = new SparkProcessBuilder("", conf)
+    val builder = newSparkProcessBuilder("", conf)
     val commands = builder.toString.split(' ')
     assert(commands.contains("spark.kyuubi.kent=yao"))
     assert(commands.contains("spark.vino=yang"))
@@ -268,13 +273,13 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
     conf.set(HighAvailabilityConf.HA_ZK_AUTH_KEYTAB.key, testKeytab)
     conf.set(HighAvailabilityConf.HA_ZK_AUTH_PRINCIPAL.key, testPrincipal)
 
-    val b1 = new SparkProcessBuilder("test", conf)
+    val b1 = newSparkProcessBuilder("test", conf)
     assert(b1.toString.contains(s"--conf spark.files=$testKeytab"))
 
   }
 }
 
 class FakeSparkProcessBuilder(config: KyuubiConf)
-  extends SparkProcessBuilder("fake", config) {
+  extends SparkProcessBuilder("fake", config, new DefaultEngineConfProvider()) {
   override protected def commands: Array[String] = Array("ls")
 }