You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/04/12 13:59:46 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2324] [SUB-TASK][KPIP-4] Implement SparkBatchProcessBuilder to submit spark batch job
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new a63e811ea [KYUUBI #2324] [SUB-TASK][KPIP-4] Implement SparkBatchProcessBuilder to submit spark batch job
a63e811ea is described below
commit a63e811ea49a5a70b8e8b895c29aea740976c6ce
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Tue Apr 12 21:59:37 2022 +0800
[KYUUBI #2324] [SUB-TASK][KPIP-4] Implement SparkBatchProcessBuilder to submit spark batch job
### _Why are the changes needed?_
To close #2305
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2324 from turboFei/KPIP_4_sparkprocessbuilder.
Closes #2324
0f2be871 [Fei Wang] add comments
62be333e [Fei Wang] tag
248963e3 [Fei Wang] refactor
61efcbf6 [Fei Wang] use yarn config
aa437b64 [Fei Wang] use class loader
f6e05c20 [Fei Wang] refactor tag
e9043e32 [Fei Wang] use yarn tags
ee8117de [Fei Wang] add ut
01e8a469 [Fei Wang] add SparkBatchProcessBuilder
Authored-by: Fei Wang <fw...@ebay.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
---
.../engine/spark/SparkBatchProcessBuilder.scala | 88 ++++++++++++++++++++++
.../kyuubi/engine/spark/SparkProcessBuilder.scala | 17 +++--
.../org/apache/kyuubi/server/api/v1/dto.scala | 24 ++++++
.../spark/SparkBatchProcessBuilderSuite.scala | 81 ++++++++++++++++++++
.../org/apache/kyuubi/server/MiniYarnService.scala | 8 +-
5 files changed, 209 insertions(+), 9 deletions(-)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
new file mode 100644
index 000000000..92486acf7
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.server.api.v1.BatchRequest
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+
+/**
+ * Builds the `spark-submit` command line for a batch job described by a [[BatchRequest]].
+ *
+ * The batch id is appended to the `spark.yarn.tags` configuration so that the submitted
+ * application can later be looked up on YARN by that tag, see [[getApplicationIdAndUrl]].
+ *
+ * @param proxyUser the user passed to `--proxy-user`
+ * @param conf the Kyuubi configuration, also used to derive the Hadoop/YARN configuration
+ * @param batchId identifier of this batch, used as an application tag
+ * @param batchRequest the batch request providing main class, resource, conf and args
+ * @param extraEngineLog optional operation log forwarded to [[SparkProcessBuilder]]
+ */
+class SparkBatchProcessBuilder(
+    override val proxyUser: String,
+    override val conf: KyuubiConf,
+    batchId: String,
+    batchRequest: BatchRequest,
+    override val extraEngineLog: Option[OperationLog] = None)
+  extends SparkProcessBuilder(proxyUser, conf, extraEngineLog) {
+  import SparkProcessBuilder._
+
+  // `conf` is documented as optional on BatchRequest, so guard against a null map.
+  private def requestConf: Map[String, String] =
+    Option(batchRequest.conf).getOrElse(Map.empty[String, String])
+
+  override def mainClass: String = batchRequest.className
+
+  override def mainResource: Option[String] = Option(batchRequest.resource)
+
+  /**
+   * Assembles the command as: executable, `--class`, the main class, one `--conf k=v` pair
+   * per configuration entry, `--proxy-user`, the main resource and finally the job arguments.
+   */
+  override protected def commands: Array[String] = {
+    val buffer = new ArrayBuffer[String]()
+    buffer += executable
+    buffer += CLASS
+    buffer += mainClass
+
+    // Keep any tags supplied by the request and append the batch id as an extra tag.
+    val batchJobTag = requestConf.get(TAG_KEY).map(_ + ",").getOrElse("") + batchId
+    val allConf = requestConf + (TAG_KEY -> batchJobTag)
+
+    allConf.foreach { case (k, v) =>
+      buffer += CONF
+      buffer += s"$k=$v"
+    }
+
+    buffer += PROXY_USER
+    buffer += proxyUser
+
+    mainResource.foreach { r => buffer += r }
+
+    // `args` is documented as optional on BatchRequest, so a null list means "no arguments".
+    Option(batchRequest.args).foreach { args =>
+      args.asScala.foreach { arg => buffer += arg }
+    }
+
+    buffer.toArray
+  }
+
+  override protected def module: String = "kyuubi-spark-batch-submit"
+
+  /**
+   * Looks up the application submitted for this batch.
+   *
+   * Only handled when the request sets `spark.master` to `yarn`: YARN is queried for
+   * applications tagged with the batch id and the first match, if any, is returned.
+   *
+   * @return the application id and its tracking URL, or `None` when no application is
+   *         found or the master is not `yarn`
+   */
+  private[kyuubi] def getApplicationIdAndUrl(): Option[(String, String)] = {
+    requestConf.get("spark.master") match {
+      case Some("yarn") =>
+        val yarnClient = getYarnClient
+        // init/start live inside the try so the client is stopped even if they fail.
+        try {
+          val yarnConf = new YarnConfiguration(KyuubiHadoopUtils.newHadoopConf(conf))
+          yarnClient.init(yarnConf)
+          yarnClient.start()
+          val apps = yarnClient.getApplications(null, null, Set(batchId).asJava)
+          apps.asScala.headOption.map { applicationReport =>
+            applicationReport.getApplicationId.toString -> applicationReport.getTrackingUrl
+          }
+        } finally {
+          yarnClient.stop()
+        }
+
+      case _ => None // TODO: Support other resource managers
+    }
+  }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 656c47a7a..7b5b70d72 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -27,6 +27,7 @@ import scala.util.matching.Regex
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
@@ -173,8 +174,8 @@ class SparkProcessBuilder(
YARN_APP_NAME_REGEX.findFirstIn(line) match {
case Some(appId) =>
try {
- val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
- yarnClient.init(hadoopConf)
+ val yarnConf = new YarnConfiguration(KyuubiHadoopUtils.newHadoopConf(conf))
+ yarnClient.init(yarnConf)
yarnClient.start()
val applicationId = ApplicationId.fromString(appId)
yarnClient.killApplication(applicationId)
@@ -197,12 +198,12 @@ object SparkProcessBuilder {
final val APP_KEY = "spark.app.name"
final val TAG_KEY = "spark.yarn.tags"
- final private val CONF = "--conf"
- final private val CLASS = "--class"
- final private val PROXY_USER = "--proxy-user"
- final private val SPARK_FILES = "spark.files"
- final private val PRINCIPAL = "spark.kerberos.principal"
- final private val KEYTAB = "spark.kerberos.keytab"
+ final private[spark] val CONF = "--conf"
+ final private[spark] val CLASS = "--class"
+ final private[spark] val PROXY_USER = "--proxy-user"
+ final private[spark] val SPARK_FILES = "spark.files"
+ final private[spark] val PRINCIPAL = "spark.kerberos.principal"
+ final private[spark] val KEYTAB = "spark.kerberos.keytab"
// Get the appropriate spark-submit file
final private val SPARK_SUBMIT_FILE = if (Utils.isWindows) "spark-submit.cmd" else "spark-submit"
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index 7158469bb..ec62b84fe 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -103,3 +103,27 @@ case class ResultRowSet(rows: Seq[Row], rowCount: Int)
case class Row(fields: Seq[Field])
case class Field(dataType: String, value: Any)
+
+/**
+ * The request body for batch job submission.
+ *
+ * @param batchType the batch job type, such as spark, flink, etc.
+ * @param resource the main resource jar, required.
+ * @param proxyUser the proxy user, optional.
+ * @param className the main class name, required.
+ * @param jars list of jars to include, optional.
+ * @param files list of files to include, optional.
+ * @param name a name of your batch job, optional.
+ * @param conf arbitrary configuration properties, optional.
+ * @param args list of batch job arguments, one element per argument, optional.
+ */
+case class BatchRequest(
+ batchType: String,
+ resource: String,
+ proxyUser: String,
+ className: String,
+ jars: java.util.List[String],
+ files: java.util.List[String],
+ name: String,
+ conf: Map[String, String],
+ args: java.util.List[String])
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
new file mode 100644
index 000000000..de0f617e0
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.server.MiniYarnService
+import org.apache.kyuubi.server.api.v1.BatchRequest
+
+/**
+ * Submits a Spark batch job through [[SparkBatchProcessBuilder]] against a
+ * [[MiniYarnService]] and checks that the application can be found by its batch id.
+ */
+class SparkBatchProcessBuilderSuite extends KyuubiFunSuite {
+  private val conf = KyuubiConf().set("kyuubi.on", "off")
+  private var miniYarnService: MiniYarnService = _
+
+  override def beforeAll(): Unit = {
+    miniYarnService = new MiniYarnService()
+    miniYarnService.initialize(new KyuubiConf(false))
+    miniYarnService.start()
+    // Point the launched engine at the Hadoop configuration written by the mini cluster.
+    conf.set(
+      s"${KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX}.HADOOP_CONF_DIR",
+      miniYarnService.getHadoopConfDir)
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    if (miniYarnService != null) {
+      miniYarnService.stop()
+      miniYarnService = null
+    }
+    super.afterAll()
+  }
+
+  test("spark batch process builder") {
+    // Borrow the main resource and main class from the regular engine builder.
+    val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
+
+    val batchRequest = BatchRequest(
+      "spark",
+      sparkProcessBuilder.mainResource.get,
+      "kyuubi",
+      sparkProcessBuilder.mainClass,
+      List.empty[String].asJava,
+      List.empty[String].asJava,
+      "spark-batch-submission",
+      Map("spark.master" -> "yarn"),
+      List.empty[String].asJava)
+
+    val builder = new SparkBatchProcessBuilder(
+      batchRequest.proxyUser,
+      conf,
+      UUID.randomUUID().toString,
+      batchRequest)
+    val proc = builder.start
+
+    try {
+      eventually(timeout(3.minutes), interval(500.milliseconds)) {
+        val applicationIdAndUrl = builder.getApplicationIdAndUrl()
+        assert(applicationIdAndUrl.isDefined)
+        assert(applicationIdAndUrl.exists(_._1.startsWith("application_")))
+        assert(applicationIdAndUrl.exists(_._2.nonEmpty))
+      }
+    } finally {
+      // Always terminate the submit process, even when the assertions above fail or time out.
+      proc.destroyForcibly()
+    }
+  }
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
index cc710e493..356c135a7 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
@@ -18,7 +18,7 @@
package org.apache.kyuubi.server
import java.io.{File, FileWriter}
-import java.net.InetAddress
+import java.net.{InetAddress, URLClassLoader}
import scala.collection.JavaConverters._
@@ -40,6 +40,8 @@ class MiniYarnService(name: String) extends AbstractService(name) {
private var yarnConf: YarnConfiguration = _
private var yarnCluster: MiniYARNCluster = _
+ private val classLoader = Thread.currentThread().getContextClassLoader
+
private def newYarnConfig(): YarnConfiguration = {
val yarnConfig = new YarnConfiguration()
// Disable the disk utilization check to avoid the test hanging when people's disks are
@@ -93,12 +95,16 @@ class MiniYarnService(name: String) extends AbstractService(name) {
info(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
saveHadoopConf()
super.start()
+
+ val hadoopConfClassLoader = new URLClassLoader(Array(hadoopConfDir.toURI.toURL), classLoader)
+ Thread.currentThread().setContextClassLoader(hadoopConfClassLoader)
}
override def stop(): Unit = {
if (yarnCluster != null) yarnCluster.stop()
if (hadoopConfDir != null) hadoopConfDir.delete()
super.stop()
+ Thread.currentThread().setContextClassLoader(classLoader)
}
private def saveHadoopConf(): Unit = {