You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/04/15 08:43:00 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2353] [SUB-TASK][KPIP-4] Implement BatchJobSubmission operation and basic KyuubiBatchSessionImpl
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new b6d5c64cb [KYUUBI #2353] [SUB-TASK][KPIP-4] Implement BatchJobSubmission operation and basic KyuubiBatchSessionImpl
b6d5c64cb is described below
commit b6d5c64cb4e73319ece6a6cdc0a750d9197784a3
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Fri Apr 15 16:42:36 2022 +0800
[KYUUBI #2353] [SUB-TASK][KPIP-4] Implement BatchJobSubmission operation and basic KyuubiBatchSessionImpl
### _Why are the changes needed?_
To close #2306 and close #2307
In this PR, I implement BatchJobSubmission operation and introduce basic `KyuubiBatchSessionImpl`.
TODO:
- Normalize/validate the batch request
- batch request fields
- merge with server pre-defined batch conf
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2353 from turboFei/KPIP_4_batch_submission_op.
Closes #2353
9bc6050c [Fei Wang] remove unused conf
ef8e962c [Fei Wang] check application in current thread
8738660b [Fei Wang] dedup code
27f22008 [Fei Wang] use static secret id instead of conf
6794ff7a [Fei Wang] Use Seq instead of java.util.List
2f4f9b15 [Fei Wang] Remove BatchType enumeration
7d380800 [Fei Wang] remove dead code
a94a9e6a [Fei Wang] remove jars, files fields on BatchRequest
6021a1e7 [Fei Wang] add ut for result set
07a939c9 [Fei Wang] refactor long line
a918a496 [Fei Wang] address comments
73229e70 [Fei Wang] set engine max life time
bbe3f1f4 [Fei Wang] unique the application checker thread pool
9643e42c [Fei Wang] refactor
11dd71f7 [Fei Wang] add KyuubiBatchYarnClusterSuite
12169910 [Fei Wang] add ut for batch session
47da8c1a [Fei Wang] add open batch session api
6dcf60d9 [Fei Wang] add ut for static batch secret id
a212e62b [Fei Wang] [SUB-TASK][KPIP-4] Implement BatchJobSubmission operation and basic KyuubiBatchSessionImpl
Authored-by: Fei Wang <fw...@ebay.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
---
docs/deployment/settings.md | 7 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 7 +
.../engine/spark/SparkBatchProcessBuilder.scala | 13 +-
.../kyuubi/engine/spark/SparkProcessBuilder.scala | 14 +-
.../kyuubi/operation/BatchJobSubmission.scala | 183 +++++++++++++++++++++
.../kyuubi/operation/KyuubiOperationManager.scala | 11 +-
.../org/apache/kyuubi/server/api/v1/dto.scala | 6 +-
.../kyuubi/session/KyuubiBatchSessionImpl.scala | 74 +++++++++
.../kyuubi/session/KyuubiSessionManager.scala | 61 +++++++
.../spark/SparkBatchProcessBuilderSuite.scala | 5 +-
.../operation/KyuubiBatchYarnClusterSuite.scala | 83 ++++++++++
11 files changed, 450 insertions(+), 14 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 7c5d63d97..6962dc0bb 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -154,6 +154,13 @@ Key | Default | Meaning | Type | Since
<code>kyuubi.backend.server.exec.pool.wait.queue.size</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>100</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Size of the wait queue for the operation execution thread pool of Kyuubi server</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
+### Batch
+
+Key | Default | Meaning | Type | Since
+--- | --- | --- | --- | ---
+<code>kyuubi.batch.application.check.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT5S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The interval to check batch job application information.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
+
+
### Credentials
Key | Default | Meaning | Type | Since
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 4e8fb9b51..81bc3259b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -806,6 +806,13 @@ object KyuubiConf {
.booleanConf
.createWithDefault(true)
+ // Polling interval used by BatchJobSubmission between lookups of the submitted batch
+ // application's id and tracking URL. The value is handed to Thread.sleep, so it is consumed
+ // as milliseconds; the default "PT5S" is an ISO-8601 duration of five seconds.
+ val BATCH_APPLICATION_CHECK_INTERVAL: ConfigEntry[Long] =
+ buildConf("kyuubi.batch.application.check.interval")
+ .doc("The interval to check batch job application information.")
+ .version("1.6.0")
+ .timeConf
+ .createWithDefaultString("PT5S")
+
val SERVER_EXEC_POOL_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.backend.server.exec.pool.size")
.doc("Number of threads in the operation execution thread pool of Kyuubi server")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
index 92486acf7..108f13f64 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
@@ -47,7 +47,8 @@ class SparkBatchProcessBuilder(
buffer += mainClass
val batchJobTag = batchRequest.conf.get(TAG_KEY).map(_ + ",").getOrElse("") + batchId
- val allConf = batchRequest.conf ++ Map(TAG_KEY -> batchJobTag)
+
+ val allConf = batchRequest.conf ++ Map(TAG_KEY -> batchJobTag) ++ sparkAppNameConf()
allConf.foreach { case (k, v) =>
buffer += CONF
@@ -59,15 +60,21 @@ class SparkBatchProcessBuilder(
mainResource.foreach { r => buffer += r }
- batchRequest.args.asScala.foreach { arg => buffer += arg }
+ batchRequest.args.foreach { arg => buffer += arg }
buffer.toArray
}
+  /**
+   * Turns the batch request's name into a `spark.app.name` setting.
+   *
+   * @return a single-entry map keyed by `APP_KEY` when the name is non-null and non-empty,
+   *         otherwise an empty map (so no app name is forced on spark-submit)
+   */
+  private def sparkAppNameConf(): Map[String, String] = {
+    Option(batchRequest.name) match {
+      case Some(appName) if appName.nonEmpty => Map(APP_KEY -> appName)
+      case _ => Map.empty
+    }
+  }
+
override protected def module: String = "kyuubi-spark-batch-submit"
private[kyuubi] def getApplicationIdAndUrl(): Option[(String, String)] = {
- batchRequest.conf.get("spark.master") match {
+ batchRequest.conf.get(MASTER_KEY).orElse(getSparkDefaultsConf().get(MASTER_KEY)) match {
case Some("yarn") =>
val yarnClient = getYarnClient
val yarnConf = new YarnConfiguration(KyuubiHadoopUtils.newHadoopConf(conf))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 18870a611..12170c75b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.spark
-import java.io.IOException
+import java.io.{File, IOException}
import java.nio.file.Paths
import scala.collection.mutable.ArrayBuffer
@@ -164,11 +164,20 @@ class SparkProcessBuilder(
}
override protected def shortName: String = "spark"
+
+  /**
+   * Reads the `spark-defaults.conf` file that spark-submit would pick up.
+   *
+   * The directory is `$SPARK_CONF_DIR` when that variable is set, otherwise `$SPARK_HOME/conf`.
+   * When neither variable is set or the file does not exist, `None` is handed to
+   * `Utils.getPropertiesFromFile`.
+   */
+  protected def getSparkDefaultsConf(): Map[String, String] = {
+    val confDir: Option[String] = env.get(SPARK_CONF_DIR) match {
+      case explicitDir @ Some(_) => explicitDir
+      case None => env.get(SPARK_HOME).map(home => home + File.separator + "conf")
+    }
+    val defaultsFile: Option[File] = confDir
+      .map(dir => new File(dir + File.separator + SPARK_CONF_FILE_NAME))
+      .filter(_.exists())
+    Utils.getPropertiesFromFile(defaultsFile)
+  }
}
object SparkProcessBuilder {
final val APP_KEY = "spark.app.name"
final val TAG_KEY = "spark.yarn.tags"
+ final val MASTER_KEY = "spark.master"
final private[spark] val CONF = "--conf"
final private[spark] val CLASS = "--class"
@@ -178,4 +187,7 @@ object SparkProcessBuilder {
final private[spark] val KEYTAB = "spark.kerberos.keytab"
// Get the appropriate spark-submit file
final private val SPARK_SUBMIT_FILE = if (Utils.isWindows) "spark-submit.cmd" else "spark-submit"
+ final private val SPARK_HOME = "SPARK_HOME"
+ final private val SPARK_CONF_DIR = "SPARK_CONF_DIR"
+ final private val SPARK_CONF_FILE_NAME = "spark-defaults.conf"
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
new file mode 100644
index 000000000..8303238c2
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import java.nio.ByteBuffer
+import java.util.{ArrayList => JArrayList, Locale}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hive.service.rpc.thrift._
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.ProcBuilder
+import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
+import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation}
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.server.api.v1.BatchRequest
+import org.apache.kyuubi.session.KyuubiBatchSessionImpl
+import org.apache.kyuubi.util.ThriftUtils
+
+/**
+ * Operation that launches a batch job described by a [[BatchRequest]] as an external process.
+ *
+ * While the submitter process runs, it polls for the launched application's id and tracking URL.
+ * It then waits for the process to exit. The result set has a single row with columns
+ * `ApplicationId` and `URL`.
+ */
+class BatchJobSubmission(session: KyuubiBatchSessionImpl, batchRequest: BatchRequest)
+  extends KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {
+
+  override def statement: String = "BATCH_JOB_SUBMISSION"
+
+  override def shouldRunAsync: Boolean = true
+
+  private lazy val _operationLog = OperationLog.createOperationLog(session, getHandle)
+
+  private var builder: ProcBuilder = _
+
+  /** (applicationId, trackingUrl) of the launched application, once it has been discovered. */
+  @volatile
+  private[kyuubi] var appIdAndUrl: Option[(String, String)] = None
+
+  private var resultFetched: Boolean = _
+
+  // Passed to Thread.sleep between application lookups, i.e. milliseconds.
+  private val applicationCheckInterval =
+    session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL)
+
+  override def getOperationLog: Option[OperationLog] = Option(_operationLog)
+
+  override protected def beforeRun(): Unit = {
+    OperationLog.setCurrentOperationLog(_operationLog)
+    setHasResultSet(false)
+    setState(OperationState.PENDING)
+  }
+
+  override protected def afterRun(): Unit = {
+    OperationLog.removeCurrentOperationLog()
+  }
+
+  override protected def runInternal(): Unit = {
+    val asyncOperation: Runnable = () => {
+      setState(OperationState.RUNNING)
+      try {
+        submitBatchJob()
+        setState(OperationState.FINISHED)
+      } catch onError()
+    }
+    try {
+      val opHandle = session.sessionManager.submitBackgroundOperation(asyncOperation)
+      setBackgroundHandle(opHandle)
+    } catch onError("submitting batch job submission operation in background, request rejected")
+  }
+
+  /**
+   * Starts the batch process and polls for its application info while the process is alive.
+   * It then waits for the process to exit. The builder is always closed afterwards.
+   *
+   * @throws UnsupportedOperationException if the batch type is not SPARK (case-insensitive)
+   * @throws KyuubiException if the submitter process exits with a non-zero value
+   */
+  private def submitBatchJob(): Unit = {
+    builder = Option(batchRequest.batchType).map(_.toUpperCase(Locale.ROOT)) match {
+      case Some("SPARK") =>
+        new SparkBatchProcessBuilder(
+          session.user,
+          session.sessionConf,
+          session.batchId,
+          batchRequest,
+          getOperationLog)
+
+      case _ =>
+        throw new UnsupportedOperationException(s"Batch type ${batchRequest.batchType} unsupported")
+    }
+
+    try {
+      info(s"Submitting ${batchRequest.batchType} batch job: $builder")
+      val process = builder.start
+      // Stop polling once the process has exited. Otherwise a submitter that fails early, a
+      // non-YARN master, or a builder that cannot report an application would loop forever.
+      // Liveness is sampled *before* each lookup, so the last lookup always happens after the
+      // process exited, and an application submitted just before exit is not missed.
+      var processAlive = true
+      while (appIdAndUrl.isEmpty && processAlive) {
+        processAlive = process.isAlive
+        updateApplicationInfo()
+        if (appIdAndUrl.isEmpty && processAlive) {
+          Thread.sleep(applicationCheckInterval)
+        }
+      }
+      process.waitFor()
+      if (process.exitValue() != 0) {
+        throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
+      }
+    } finally {
+      builder.close()
+    }
+  }
+
+  /**
+   * Asks the builder for the application id/URL and records it when found. Lookup failures
+   * are logged and swallowed so that a transient error does not abort the batch job.
+   */
+  private def updateApplicationInfo(): Unit = {
+    try {
+      builder match {
+        case sparkBatchProcessBuilder: SparkBatchProcessBuilder =>
+          sparkBatchProcessBuilder.getApplicationIdAndUrl().foreach { appInfo =>
+            appIdAndUrl = Some(appInfo)
+          }
+
+        case _ =>
+      }
+    } catch {
+      case e: Exception => error(s"Failed to check batch application", e)
+    }
+  }
+
+  override def getResultSetSchema: TTableSchema = {
+    val schema = new TTableSchema()
+    Seq("ApplicationId", "URL").zipWithIndex.foreach { case (colName, position) =>
+      val tColumnDesc = new TColumnDesc()
+      tColumnDesc.setColumnName(colName)
+      val tTypeDesc = new TTypeDesc()
+      tTypeDesc.addToTypes(TTypeEntry.primitiveEntry(new TPrimitiveTypeEntry(TTypeId.STRING_TYPE)))
+      tColumnDesc.setTypeDesc(tTypeDesc)
+      tColumnDesc.setPosition(position)
+      schema.addToColumns(tColumnDesc)
+    }
+    schema
+  }
+
+  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
+    validateDefaultFetchOrientation(order)
+    assertState(OperationState.FINISHED)
+    setHasResultSet(true)
+    order match {
+      case FETCH_NEXT => fetchNext()
+      case FETCH_PRIOR => resultSet
+      case FETCH_FIRST => resultSet
+    }
+  }
+
+  // Column-based row set holding zero or one (appId, url) row, depending on appIdAndUrl.
+  private lazy val resultSet: TRowSet = {
+    val tRow = new TRowSet(0, new JArrayList[TRow](1))
+    val (appId, url) = appIdAndUrl.toSeq.unzip
+
+    val tAppIdColumn = TColumn.stringVal(new TStringColumn(
+      appId.asJava,
+      ByteBuffer.allocate(0)))
+
+    val tUrlColumn = TColumn.stringVal(new TStringColumn(
+      url.asJava,
+      ByteBuffer.allocate(0)))
+
+    tRow.addToColumns(tAppIdColumn)
+    tRow.addToColumns(tUrlColumn)
+    tRow
+  }
+
+  // The first FETCH_NEXT returns the result; later ones return an empty row set.
+  private def fetchNext(): TRowSet = {
+    if (!resultFetched) {
+      resultFetched = true
+      resultSet
+    } else {
+      ThriftUtils.EMPTY_ROW_SET
+    }
+  }
+
+  override def close(): Unit = {
+    if (!isClosedOrCanceled) {
+      if (builder != null) {
+        builder.close()
+      }
+    }
+    super.close()
+  }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index db2898b3e..e7a7d3e6c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -26,7 +26,8 @@ import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
-import org.apache.kyuubi.session.{KyuubiSessionImpl, Session}
+import org.apache.kyuubi.server.api.v1.BatchRequest
+import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionImpl, Session}
import org.apache.kyuubi.util.ThriftUtils
class KyuubiOperationManager private (name: String) extends OperationManager(name) {
@@ -62,6 +63,14 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
addOperation(operation)
}
+  /**
+   * Creates the batch job submission operation for `session` and registers it with this
+   * operation manager.
+   *
+   * @return the newly registered operation, typed as [[BatchJobSubmission]]
+   */
+  def newBatchJobSubmissionOperation(
+      session: KyuubiBatchSessionImpl,
+      batchRequest: BatchRequest): BatchJobSubmission = {
+    val submission = new BatchJobSubmission(session, batchRequest)
+    addOperation(submission)
+    submission
+  }
+
override def newGetTypeInfoOperation(session: Session): Operation = {
val operation = new GetTypeInfo(session)
addOperation(operation)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index ec62b84fe..395ab3eb7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -111,8 +111,6 @@ case class Field(dataType: String, value: Any)
* @param resource the main resource jar, required.
* @param proxyUser the proxy user, optional.
* @param className the main class name, required.
- * @param jars comma-separated list of jars to include, optional.
- * @param files comma-separated list of files to include, optional.
* @param name a name of your batch job, optional.
* @param conf arbitrary configuration properties, optional.
* @param args comma-separated list of batch job arguments, optional.
@@ -122,8 +120,6 @@ case class BatchRequest(
resource: String,
proxyUser: String,
className: String,
- jars: java.util.List[String],
- files: java.util.List[String],
name: String,
conf: Map[String, String],
- args: java.util.List[String])
+ args: Seq[String])
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
new file mode 100644
index 000000000..57beeb4eb
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.session
+
+import com.codahale.metrics.MetricRegistry
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.events.{EventBus, KyuubiEvent, KyuubiSessionEvent}
+import org.apache.kyuubi.metrics.MetricsConstants.{CONN_OPEN, CONN_TOTAL}
+import org.apache.kyuubi.metrics.MetricsSystem
+import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.server.api.v1.BatchRequest
+
+/**
+ * Server-side session that represents a single batch job.
+ *
+ * Opening the session runs a BatchJobSubmission operation built from `batchRequest`.
+ *
+ * @param sessionConf the Kyuubi conf the batch job submission reads its settings from
+ * @param batchRequest the batch job request to submit when the session opens
+ */
+class KyuubiBatchSessionImpl(
+ protocol: TProtocolVersion,
+ user: String,
+ password: String,
+ ipAddress: String,
+ conf: Map[String, String],
+ sessionManager: KyuubiSessionManager,
+ val sessionConf: KyuubiConf,
+ batchRequest: BatchRequest)
+ extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
+ // Batch session handles are issued by the session manager's dedicated factory.
+ override val handle: SessionHandle = sessionManager.newBatchSessionHandle(protocol)
+ // The batch id is the handle identifier rendered as a string.
+ val batchId: String = handle.identifier.toString
+
+ // Created (and registered with the operation manager) on first access.
+ // NOTE(review): close() also forces this lazy val, so closing a session whose open() failed
+ // before runOperation creates the operation only to close it again — confirm this is intended.
+ private[kyuubi] lazy val batchJobSubmissionOp = sessionManager.operationManager
+ .newBatchJobSubmissionOperation(this, batchRequest)
+
+ // The session event is posted once at construction and again (with endTime) on close().
+ private val sessionEvent = KyuubiSessionEvent(this)
+ EventBus.post(sessionEvent)
+
+ override def getSessionEvent: Option[KyuubiEvent] = {
+ Option(sessionEvent)
+ }
+
+ /** Records connection metrics, opens the base session, then runs the batch submission. */
+ override def open(): Unit = {
+ MetricsSystem.tracing { ms =>
+ ms.incCount(CONN_TOTAL)
+ ms.incCount(MetricRegistry.name(CONN_OPEN, user))
+ }
+
+ // we should call super.open before running batch job submission operation
+ super.open()
+
+ runOperation(batchJobSubmissionOp)
+ }
+
+ /**
+ * Closes the batch operation if it has not reached a terminal state, then closes the base
+ * session, posts the finished session event and decrements the per-user open-connection gauge.
+ */
+ override def close(): Unit = {
+ if (!OperationState.isTerminal(batchJobSubmissionOp.getStatus.state)) {
+ closeOperation(batchJobSubmissionOp.getHandle)
+ }
+ super.close()
+ sessionEvent.endTime = System.currentTimeMillis()
+ EventBus.post(sessionEvent)
+ MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
+ }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 4fb3dd9d8..982c14ca3 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -17,10 +17,13 @@
package org.apache.kyuubi.session
+import java.util.UUID
+
import com.codahale.metrics.MetricRegistry
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.{KyuubiSQLException, Utils}
+import org.apache.kyuubi.cli.HandleIdentifier
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.credentials.HadoopCredentialsManager
@@ -28,8 +31,10 @@ import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.KyuubiOperationManager
import org.apache.kyuubi.plugin.{PluginLoader, SessionConfAdvisor}
+import org.apache.kyuubi.server.api.v1.BatchRequest
class KyuubiSessionManager private (name: String) extends SessionManager(name) {
+ import KyuubiSessionManager._
def this() = this(classOf[KyuubiSessionManager].getSimpleName)
@@ -84,6 +89,53 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
}
}
+  /**
+   * Opens a batch session for `batchRequest` and starts its batch job submission.
+   *
+   * The session is registered with this manager only after it opens successfully. On failure
+   * it is closed (best effort), failure metrics are recorded, and the error is rethrown.
+   *
+   * @param user the client user; a null or empty value falls back to "anonymous"
+   * @param conf session configuration passed to the batch session
+   * @param batchRequest the batch job to submit
+   * @return the handle of the opened batch session
+   * @throws KyuubiSQLException if the session cannot be opened
+   */
+  def openBatchSession(
+      protocol: TProtocolVersion,
+      user: String,
+      password: String,
+      ipAddress: String,
+      conf: Map[String, String],
+      batchRequest: BatchRequest): SessionHandle = {
+    // Use the normalized name everywhere: it becomes the session user, which is in turn the
+    // proxy user of the submitted job and the per-user metric name.
+    val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
+    val batchSession = new KyuubiBatchSessionImpl(
+      protocol,
+      username,
+      password,
+      ipAddress,
+      conf,
+      this,
+      // TODO: user defaults conf for batch session
+      this.getConf.getUserDefaults(username),
+      batchRequest)
+    try {
+      val handle = batchSession.handle
+      batchSession.open()
+      setSession(handle, batchSession)
+      info(s"$username's batch session with $handle is opened, current opening sessions" +
+        s" $getOpenSessionCount")
+      handle
+    } catch {
+      case e: Exception =>
+        try {
+          batchSession.close()
+        } catch {
+          case t: Throwable =>
+            warn(s"Error closing batch session for $username client ip: $ipAddress", t)
+        }
+        MetricsSystem.tracing { ms =>
+          ms.incCount(CONN_FAIL)
+          ms.incCount(MetricRegistry.name(CONN_FAIL, username))
+        }
+        throw KyuubiSQLException(
+          s"Error opening batch session for $username client ip $ipAddress, due to ${e.getMessage}",
+          e)
+    }
+  }
+
+  /**
+   * Creates a handle for a new batch session: a random public id paired with the static batch
+   * secret id `STATIC_BATCH_SECRET_UUID`.
+   */
+  def newBatchSessionHandle(protocol: TProtocolVersion): SessionHandle = {
+    val identifier = HandleIdentifier(UUID.randomUUID(), STATIC_BATCH_SECRET_UUID)
+    SessionHandle(identifier, protocol)
+  }
+
override def start(): Unit = synchronized {
MetricsSystem.tracing { ms =>
ms.registerGauge(CONN_OPEN, getOpenSessionCount, 0)
@@ -95,3 +147,12 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
override protected def isServer: Boolean = true
}
+
+object KyuubiSessionManager {
+
+  // Canonical textual form of the batch secret; kept separate so the literal is easy to audit.
+  private val StaticBatchSecret = "c2ee5b97-3ea0-41fc-ac16-9bd708ed8f38"
+
+  /**
+   * Secret UUID shared by every batch session handle.
+   * It must never change, because existing batch handles depend on it for compatibility.
+   */
+  val STATIC_BATCH_SECRET_UUID: UUID = UUID.fromString(StaticBatchSecret)
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
index de0f617e0..2dd8a0e5e 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.engine.spark
import java.util.UUID
-import scala.collection.JavaConverters._
import scala.concurrent.duration._
import org.apache.kyuubi.KyuubiFunSuite
@@ -57,11 +56,9 @@ class SparkBatchProcessBuilderSuite extends KyuubiFunSuite {
sparkProcessBuilder.mainResource.get,
"kyuubi",
sparkProcessBuilder.mainClass,
- List.empty[String].asJava,
- List.empty[String].asJava,
"spark-batch-submission",
Map("spark.master" -> "yarn"),
- List.empty[String].asJava)
+ Seq.empty[String])
val builder = new SparkBatchProcessBuilder(
batchRequest.proxyUser,
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
new file mode 100644
index 000000000..7f8835277
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.WithKyuubiServerOnYarn
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.server.api.v1.BatchRequest
+import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
+
+/** End-to-end checks of batch sessions that submit Spark jobs to a mini YARN cluster. */
+class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
+  override protected val connectionConf: Map[String, String] = Map.empty
+
+  override protected val kyuubiServerConf: KyuubiConf = KyuubiConf()
+
+  // Pure accessor, so it is declared without parentheses (callers use `sessionManager`).
+  private def sessionManager: KyuubiSessionManager =
+    server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
+
+  test("open batch session") {
+    val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
+
+    val batchRequest = BatchRequest(
+      "spark",
+      sparkProcessBuilder.mainResource.get,
+      "kyuubi",
+      sparkProcessBuilder.mainClass,
+      "spark-batch-submission",
+      Map(
+        "spark.master" -> "yarn",
+        s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "5000",
+        s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000"),
+      Seq.empty[String])
+
+    val sessionHandle = sessionManager.openBatchSession(
+      TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
+      batchRequest.proxyUser,
+      "passwd",
+      "localhost",
+      batchRequest.conf,
+      batchRequest)
+
+    try {
+      assert(sessionHandle.identifier.secretId === KyuubiSessionManager.STATIC_BATCH_SECRET_UUID)
+      val session = sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+      val batchJobSubmissionOp = session.batchJobSubmissionOp
+
+      // Only the wait is retried. FETCH_NEXT consumes the single result batch, so fetching
+      // inside a retried block would see an empty row set on any retry.
+      eventually(timeout(3.minutes), interval(500.milliseconds)) {
+        assert(batchJobSubmissionOp.getStatus.state === OperationState.FINISHED)
+      }
+
+      val applicationIdAndUrl = batchJobSubmissionOp.appIdAndUrl
+      assert(applicationIdAndUrl.isDefined)
+      assert(applicationIdAndUrl.exists(_._1.startsWith("application_")))
+      assert(applicationIdAndUrl.exists(_._2.nonEmpty))
+      val (expectedAppId, expectedUrl) = applicationIdAndUrl.get
+
+      val resultColumns = batchJobSubmissionOp.getNextRowSet(FetchOrientation.FETCH_NEXT, 1)
+        .getColumns.asScala
+      val appId = resultColumns.apply(0).getStringVal.getValues.asScala.apply(0)
+      val url = resultColumns.apply(1).getStringVal.getValues.asScala.apply(0)
+      assert(appId === expectedAppId)
+      assert(url === expectedUrl)
+    } finally {
+      // Close even when an assertion fails, so the batch job does not leak into later tests.
+      sessionManager.closeSession(sessionHandle)
+    }
+  }
+}