You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/12/09 10:39:36 UTC
[incubator-kyuubi] 01/01: Add engine conf provide
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch KYUUBI-1536
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
commit 27087ee8a0a06ce5e7f2b6f2ee0e58cf26d6b481
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Thu Dec 9 18:39:21 2021 +0800
Add engine conf provide
---
.../org/apache/kyuubi/config/KyuubiConf.scala | 16 ++++++++++
.../org/apache/kyuubi/config/KyuubiConfSuite.scala | 9 ++++++
.../apache/kyuubi/engine/EngineConfProvider.scala | 35 ++++++++++++++++++++
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 5 ++-
.../org/apache/kyuubi/engine/ProcBuilder.scala | 2 ++
.../kyuubi/engine/spark/SparkProcessBuilder.scala | 5 +--
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 2 +-
.../kyuubi/session/KyuubiSessionManager.scala | 8 +++++
.../org/apache/kyuubi/engine/EngineRefSuite.scala | 33 +++++++++++--------
.../engine/spark/SparkProcessBuilderSuite.scala | 37 ++++++++++++----------
10 files changed, 119 insertions(+), 33 deletions(-)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index ef9fbd6..8096083 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1064,4 +1064,20 @@ object KyuubiConf {
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(OperationLanguages.values.map(_.toString))
.createWithDefault(OperationLanguages.SQL.toString)
+
+ object EngineConfProvider extends Enumeration {
+ type OperationLanguage = Value
+ val DEFAULT = Value
+ }
+
+ val ENGINE_CONFIG_PROVIDER: ConfigEntry[String] =
+ buildConf("engine.config.provider")
+ .doc("Choose a config provider for engine, the default implementation is from " +
+ "session config.")
+ .version("1.5.0")
+ .stringConf
+ .transform(_.toUpperCase(Locale.ROOT))
+ .checkValue(EngineConfProvider.values.map(_.toString).contains(_),
+ "should be DEFAULT")
+ .createWithDefault(EngineConfProvider.DEFAULT.toString)
}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
index 47025c7..cb42e7c 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
@@ -163,4 +163,13 @@ class KyuubiConfSuite extends KyuubiFunSuite {
kyuubiConf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, path)
assert(kyuubiConf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).get == path)
}
+
+ test("vaild kyuubi.engine.config.provider") {
+ val kyuubiConf = KyuubiConf()
+ assert(kyuubiConf.get(ENGINE_CONFIG_PROVIDER) == EngineConfProvider.DEFAULT.toString)
+ kyuubiConf.set(ENGINE_CONFIG_PROVIDER, "default")
+ assert(kyuubiConf.get(ENGINE_CONFIG_PROVIDER) == EngineConfProvider.DEFAULT.toString)
+ kyuubiConf.set(ENGINE_CONFIG_PROVIDER, "default1")
+ assertThrows[IllegalArgumentException](kyuubiConf.get(ENGINE_CONFIG_PROVIDER))
+ }
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineConfProvider.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineConfProvider.scala
new file mode 100644
index 0000000..d21c5c3
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineConfProvider.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.AbstractService
+
+/**
+ * Provide the engine configuration according to the user and session configuration.
+ */
+trait EngineConfProvider extends AbstractService {
+ def buildConf(user: String, sessionConf: KyuubiConf): Map[String, String]
+}
+
+class DefaultEngineConfProvider extends EngineConfProvider {
+ override def buildConf(user: String, sessionConf: KyuubiConf): Map[String, String] = {
+ assert(sessionConf != null)
+ sessionConf.getAll
+ }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 1c2ca19..14c6e62 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -48,13 +48,16 @@ import org.apache.kyuubi.operation.log.OperationLog
*
* @param conf Engine configuration
* @param user Caller of the engine
+ * @param engineConfProvider the engine conf provider
* @param engineRefId Id of the corresponding session in which the engine is created
*/
private[kyuubi] class EngineRef(
conf: KyuubiConf,
user: String,
+ engineConfProvider: EngineConfProvider,
engineRefId: String = UUID.randomUUID().toString)
extends Logging {
+
// The corresponding ServerSpace where the engine belongs to
private val serverSpace: String = conf.get(HA_ZK_NAMESPACE)
@@ -181,7 +184,7 @@ private[kyuubi] class EngineRef(
conf.set(
SparkProcessBuilder.TAG_KEY,
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
- new SparkProcessBuilder(appUser, conf, extraEngineLog)
+ new SparkProcessBuilder(appUser, conf, engineConfProvider, extraEngineLog)
case _ => throw new UnsupportedOperationException(s"Unsupported engine type: ${engineType}")
}
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index bf1ca92..131909b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -53,6 +53,8 @@ trait ProcBuilder {
protected def env: Map[String, String] = conf.getEnvs
+ protected val engineConfProvider: EngineConfProvider
+
protected val extraEngineLog: Option[OperationLog]
protected val workingDir: Path
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index b78a5ad..ba14245 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAIN_RESOURCE
-import org.apache.kyuubi.engine.ProcBuilder
+import org.apache.kyuubi.engine.{EngineConfProvider, ProcBuilder}
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
import org.apache.kyuubi.operation.log.OperationLog
@@ -36,6 +36,7 @@ import org.apache.kyuubi.operation.log.OperationLog
class SparkProcessBuilder(
override val proxyUser: String,
override val conf: KyuubiConf,
+ val engineConfProvider: EngineConfProvider,
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {
@@ -130,7 +131,7 @@ class SparkProcessBuilder(
buffer += CLASS
buffer += mainClass
- var allConf = conf.getAll
+ var allConf = engineConfProvider.buildConf(proxyUser, conf)
// if enable sasl kerberos authentication for zookeeper, need to upload the server ketab file
if (ZooKeeperAuthTypes.withName(conf.get(HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 60d331b..3ad9b9e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -50,7 +50,7 @@ class KyuubiSessionImpl(
case (key, value) => sessionConf.set(key, value)
}
- val engine: EngineRef = new EngineRef(sessionConf, user)
+ val engine: EngineRef = new EngineRef(sessionConf, user, sessionManager.engineConfProvider)
private[kyuubi] val launchEngineOp = sessionManager.operationManager
.newLaunchEngineOperation(this, sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 99bd9d6..fd8ce51 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -24,6 +24,7 @@ import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.credentials.HadoopCredentialsManager
+import org.apache.kyuubi.engine.{DefaultEngineConfProvider, EngineConfProvider}
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.KyuubiOperationManager
@@ -34,8 +35,15 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
val operationManager = new KyuubiOperationManager()
val credentialsManager = new HadoopCredentialsManager()
+ var engineConfProvider: EngineConfProvider = _
override def initialize(conf: KyuubiConf): Unit = {
+ val provider = KyuubiConf.EngineConfProvider.withName(
+ conf.get(KyuubiConf.ENGINE_CONFIG_PROVIDER))
+ engineConfProvider = provider match {
+ case KyuubiConf.EngineConfProvider.DEFAULT => new DefaultEngineConfProvider()
+ }
+ addService(engineConfProvider)
addService(credentialsManager)
val absPath = Utils.getAbsolutePathFromWork(conf.get(SERVER_OPERATION_LOG_DIR_ROOT))
_operationLogRoot = Some(absPath.toAbsolutePath.toString)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
index 7a3729b..4a9bdac 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
@@ -59,13 +59,20 @@ class EngineRefSuite extends KyuubiFunSuite {
super.beforeEach()
}
+ private def newEngineRef(
+ conf: KyuubiConf,
+ user: String,
+ engineRefId: String = UUID.randomUUID().toString): EngineRef = {
+ new EngineRef(conf, user, new DefaultEngineConfProvider, engineRefId)
+ }
+
test("CONNECTION shared level engine name") {
val id = UUID.randomUUID().toString
val engineType = conf.get(KyuubiConf.ENGINE_TYPE)
Seq(None, Some("suffix")).foreach { domain =>
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString)
domain.foreach(conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, _))
- val engine = new EngineRef(conf, user, id)
+ val engine = newEngineRef(conf, user, id)
assert(engine.engineSpace ===
ZKPaths.makePath(s"kyuubi_${CONNECTION}_${engineType}", user, id))
assert(engine.defaultEngineName === s"kyuubi_${CONNECTION}_${engineType}_${user}_$id")
@@ -76,7 +83,7 @@ class EngineRefSuite extends KyuubiFunSuite {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString)
- val appName = new EngineRef(conf, user, id)
+ val appName = newEngineRef(conf, user, id)
assert(appName.engineSpace === ZKPaths.makePath(s"kyuubi_${USER}_$FLINK_SQL", user, "default"))
assert(appName.defaultEngineName === s"kyuubi_${USER}_${FLINK_SQL}_${user}_default_$id")
@@ -95,7 +102,7 @@ class EngineRefSuite extends KyuubiFunSuite {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, GROUP.toString)
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
- val engineRef = new EngineRef(conf, user, id)
+ val engineRef = newEngineRef(conf, user, id)
val primaryGroupName = UserGroupInformation.createRemoteUser(user).getPrimaryGroupName
assert(engineRef.engineSpace ===
ZKPaths.makePath(s"kyuubi_GROUP_SPARK_SQL", primaryGroupName, "default"))
@@ -106,7 +113,7 @@ class EngineRefSuite extends KyuubiFunSuite {
k =>
conf.unset(k)
conf.set(k.key, "abc")
- val engineRef2 = new EngineRef(conf, user, id)
+ val engineRef2 = newEngineRef(conf, user, id)
assert(engineRef2.engineSpace ===
ZKPaths.makePath(s"kyuubi_${GROUP}_${SPARK_SQL}", primaryGroupName, "abc"))
assert(engineRef2.defaultEngineName ===
@@ -116,7 +123,7 @@ class EngineRefSuite extends KyuubiFunSuite {
val userName = "Iamauserwithoutgroup"
val newUGI = UserGroupInformation.createRemoteUser(userName)
assert(newUGI.getGroupNames.isEmpty)
- val engineRef3 = new EngineRef(conf, userName, id)
+ val engineRef3 = newEngineRef(conf, userName, id)
assert(engineRef3.engineSpace === ZKPaths.makePath(s"kyuubi_GROUP_SPARK_SQL", userName, "abc"))
assert(engineRef3.defaultEngineName === s"kyuubi_GROUP_SPARK_SQL_${userName}_abc_$id")
}
@@ -125,13 +132,13 @@ class EngineRefSuite extends KyuubiFunSuite {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, SERVER.toString)
conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString)
- val appName = new EngineRef(conf, user, id)
+ val appName = newEngineRef(conf, user, id)
assert(appName.engineSpace ===
ZKPaths.makePath(s"kyuubi_${SERVER}_${FLINK_SQL}", user, "default"))
assert(appName.defaultEngineName === s"kyuubi_${SERVER}_${FLINK_SQL}_${user}_default_$id")
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
- val appName2 = new EngineRef(conf, user, id)
+ val appName2 = newEngineRef(conf, user, id)
assert(appName2.engineSpace ===
ZKPaths.makePath(s"kyuubi_${SERVER}_${FLINK_SQL}", user, "abc"))
assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${FLINK_SQL}_${user}_abc_$id")
@@ -143,31 +150,31 @@ class EngineRefSuite extends KyuubiFunSuite {
// set subdomain and disable engine pool
conf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
conf.set(ENGINE_POOL_SIZE, -1)
- val engine1 = new EngineRef(conf, user, id)
+ val engine1 = newEngineRef(conf, user, id)
assert(engine1.subdomain === "abc")
// unset subdomain and disable engine pool
conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
conf.set(ENGINE_POOL_SIZE, -1)
- val engine2 = new EngineRef(conf, user, id)
+ val engine2 = newEngineRef(conf, user, id)
assert(engine2.subdomain === "default")
// set subdomain and 1 <= engine pool size < threshold
conf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
conf.set(ENGINE_POOL_SIZE, 1)
- val engine3 = new EngineRef(conf, user, id)
+ val engine3 = newEngineRef(conf, user, id)
assert(engine3.subdomain === "abc")
// unset subdomain and 1 <= engine pool size < threshold
conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
conf.set(ENGINE_POOL_SIZE, 3)
- val engine4 = new EngineRef(conf, user, id)
+ val engine4 = newEngineRef(conf, user, id)
assert(engine4.subdomain.startsWith("engine-pool-"))
// unset subdomain and engine pool size > threshold
conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
conf.set(ENGINE_POOL_SIZE, 100)
- val engine5 = new EngineRef(conf, user, id)
+ val engine5 = newEngineRef(conf, user, id)
val engineNumber = Integer.parseInt(engine5.subdomain.substring(12))
val threshold = ENGINE_POOL_SIZE_THRESHOLD.defaultVal.get
assert(engineNumber <= threshold)
@@ -180,7 +187,7 @@ class EngineRefSuite extends KyuubiFunSuite {
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test")
conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
- val engine = new EngineRef(conf, user, id)
+ val engine = newEngineRef(conf, user, id)
var port1 = 0
var port2 = 0
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index d50b75a..2d511d5 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -28,6 +28,7 @@ import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_LOG_TIMEOUT
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAIN_RESOURCE
+import org.apache.kyuubi.engine.DefaultEngineConfProvider
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
import org.apache.kyuubi.service.ServiceUtils
@@ -35,8 +36,12 @@ import org.apache.kyuubi.service.ServiceUtils
class SparkProcessBuilderSuite extends KerberizedTestHelper {
private def conf = KyuubiConf().set("kyuubi.on", "off")
+ private def newSparkProcessBuilder(user: String, conf: KyuubiConf): SparkProcessBuilder = {
+ new SparkProcessBuilder(user, conf, new DefaultEngineConfProvider())
+ }
+
test("spark process builder") {
- val builder = new SparkProcessBuilder("kentyao", conf)
+ val builder = newSparkProcessBuilder("kentyao", conf)
val commands = builder.toString.split(' ')
assert(commands(2) === "org.apache.kyuubi.engine.spark.SparkSQLEngine")
assert(commands.contains("spark.kyuubi.on=off"))
@@ -52,7 +57,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
}
test("capture error from spark process builder") {
- val processBuilder = new SparkProcessBuilder("kentyao", conf.set("spark.ui.port", "abc"))
+ val processBuilder = newSparkProcessBuilder("kentyao", conf.set("spark.ui.port", "abc"))
processBuilder.start
eventually(timeout(90.seconds), interval(500.milliseconds)) {
val error = processBuilder.getError
@@ -62,7 +67,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
}
val processBuilder1 =
- new SparkProcessBuilder("kentyao", conf.set("spark.hive.metastore.uris", "thrift://dummy"))
+ newSparkProcessBuilder("kentyao", conf.set("spark.hive.metastore.uris", "thrift://dummy"))
processBuilder1.start
eventually(timeout(90.seconds), interval(500.milliseconds)) {
@@ -75,7 +80,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
test("engine log truncation") {
val msg = "org.apache.spark.sql.hive."
val pb =
- new SparkProcessBuilder("kentyao", conf.set("spark.hive.metastore.uris", "thrift://dummy"))
+ newSparkProcessBuilder("kentyao", conf.set("spark.hive.metastore.uris", "thrift://dummy"))
pb.start
eventually(timeout(90.seconds), interval(500.milliseconds)) {
val error1 = pb.getError
@@ -84,7 +89,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
assert(error1.getMessage.contains(msg))
}
- val pb2 = new SparkProcessBuilder(
+ val pb2 = newSparkProcessBuilder(
"kentyao",
conf.set("spark.hive.metastore.uris", "thrift://dummy")
.set(KyuubiConf.ENGINE_ERROR_MAX_SIZE, 200))
@@ -98,29 +103,29 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
}
test("proxy user or keytab") {
- val b1 = new SparkProcessBuilder("kentyao", conf)
+ val b1 = newSparkProcessBuilder("kentyao", conf)
assert(b1.toString.contains("--proxy-user kentyao"))
val conf1 = conf.set("spark.kerberos.principal", testPrincipal)
- val b2 = new SparkProcessBuilder("kentyao", conf1)
+ val b2 = newSparkProcessBuilder("kentyao", conf1)
assert(b2.toString.contains("--proxy-user kentyao"))
val conf2 = conf.set("spark.kerberos.keytab", testKeytab)
- val b3 = new SparkProcessBuilder("kentyao", conf2)
+ val b3 = newSparkProcessBuilder("kentyao", conf2)
assert(b3.toString.contains("--proxy-user kentyao"))
tryWithSecurityEnabled {
val conf3 = conf.set("spark.kerberos.principal", testPrincipal)
.set("spark.kerberos.keytab", "testKeytab")
- val b4 = new SparkProcessBuilder(Utils.currentUser, conf3)
+ val b4 = newSparkProcessBuilder(Utils.currentUser, conf3)
assert(b4.toString.contains(s"--proxy-user ${Utils.currentUser}"))
val conf4 = conf.set("spark.kerberos.principal", testPrincipal)
.set("spark.kerberos.keytab", testKeytab)
- val b5 = new SparkProcessBuilder("kentyao", conf4)
+ val b5 = newSparkProcessBuilder("kentyao", conf4)
assert(b5.toString.contains("--proxy-user kentyao"))
- val b6 = new SparkProcessBuilder(ServiceUtils.getShortName(testPrincipal), conf4)
+ val b6 = newSparkProcessBuilder(ServiceUtils.getShortName(testPrincipal), conf4)
assert(!b6.toString.contains("--proxy-user kentyao"))
}
}
@@ -218,13 +223,13 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
val conf = KyuubiConf()
conf.set(ENGINE_SPARK_MAIN_RESOURCE, hdfsPath)
- val b1 = new SparkProcessBuilder("test", conf)
+ val b1 = newSparkProcessBuilder("test", conf)
assert(b1.mainResource.get.startsWith("hdfs://"))
assert(b1.mainResource.get == hdfsPath)
// user specified jar not exist, get default jar and expect not equals
conf.set(ENGINE_SPARK_MAIN_RESOURCE, jarPath.toString)
- val b2 = new SparkProcessBuilder("test", conf)
+ val b2 = newSparkProcessBuilder("test", conf)
assert(b2.mainResource.getOrElse("") != jarPath.toString)
}
@@ -254,7 +259,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
conf.set("spark.vino", "yang")
conf.set("kent", "yao")
conf.set("hadoop.kent", "yao")
- val builder = new SparkProcessBuilder("", conf)
+ val builder = newSparkProcessBuilder("", conf)
val commands = builder.toString.split(' ')
assert(commands.contains("spark.kyuubi.kent=yao"))
assert(commands.contains("spark.vino=yang"))
@@ -268,13 +273,13 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
conf.set(HighAvailabilityConf.HA_ZK_AUTH_KEYTAB.key, testKeytab)
conf.set(HighAvailabilityConf.HA_ZK_AUTH_PRINCIPAL.key, testPrincipal)
- val b1 = new SparkProcessBuilder("test", conf)
+ val b1 = newSparkProcessBuilder("test", conf)
assert(b1.toString.contains(s"--conf spark.files=$testKeytab"))
}
}
class FakeSparkProcessBuilder(config: KyuubiConf)
- extends SparkProcessBuilder("fake", config) {
+ extends SparkProcessBuilder("fake", config, new DefaultEngineConfProvider()) {
override protected def commands: Array[String] = Array("ls")
}