You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/04/12 13:59:46 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2324] [SUB-TASK][KPIP-4] Implement SparkBatchProcessBuilder to submit spark batch job

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new a63e811ea [KYUUBI #2324] [SUB-TASK][KPIP-4] Implement SparkBatchProcessBuilder to submit spark batch job
a63e811ea is described below

commit a63e811ea49a5a70b8e8b895c29aea740976c6ce
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Tue Apr 12 21:59:37 2022 +0800

    [KYUUBI #2324] [SUB-TASK][KPIP-4] Implement SparkBatchProcessBuilder to submit spark batch job
    
    ### _Why are the changes needed?_
    
    To close #2305
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2324 from turboFei/KPIP_4_sparkprocessbuilder.
    
    Closes #2324
    
    0f2be871 [Fei Wang] add comments
    62be333e [Fei Wang] tag
    248963e3 [Fei Wang] refactor
    61efcbf6 [Fei Wang] use yarn config
    aa437b64 [Fei Wang] use class loader
    f6e05c20 [Fei Wang] refactor tag
    e9043e32 [Fei Wang] use yarn tags
    ee8117de [Fei Wang] add ut
    01e8a469 [Fei Wang] add SparkBatchProcessBuilder
    
    Authored-by: Fei Wang <fw...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 .../engine/spark/SparkBatchProcessBuilder.scala    | 88 ++++++++++++++++++++++
 .../kyuubi/engine/spark/SparkProcessBuilder.scala  | 17 +++--
 .../org/apache/kyuubi/server/api/v1/dto.scala      | 24 ++++++
 .../spark/SparkBatchProcessBuilderSuite.scala      | 81 ++++++++++++++++++++
 .../org/apache/kyuubi/server/MiniYarnService.scala |  8 +-
 5 files changed, 209 insertions(+), 9 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
new file mode 100644
index 000000000..92486acf7
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.server.api.v1.BatchRequest
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+
+/** Process builder for the batch job described by `batchRequest`, tagged with `batchId`. */
+class SparkBatchProcessBuilder(
+    override val proxyUser: String,
+    override val conf: KyuubiConf,
+    batchId: String,
+    batchRequest: BatchRequest,
+    override val extraEngineLog: Option[OperationLog] = None)
+  extends SparkProcessBuilder(proxyUser, conf, extraEngineLog) {
+  import SparkProcessBuilder._
+
+  // `conf` is optional in BatchRequest, so a missing (null) map is treated as empty.
+  private def batchConf: Map[String, String] = Option(batchRequest.conf).getOrElse(Map.empty)
+
+  override def mainClass: String = batchRequest.className
+
+  override def mainResource: Option[String] = Option(batchRequest.resource)
+
+  /** Command: executable, main class, `--conf` pairs, proxy user, main resource, job args. */
+  override protected def commands: Array[String] = {
+    val buffer = ArrayBuffer[String](executable, CLASS, mainClass)
+    // Append batchId to any caller-supplied tags so the application can be found by tag later.
+    val batchJobTag = batchConf.get(TAG_KEY).map(_ + ",").getOrElse("") + batchId
+    val allConf = batchConf ++ Map(TAG_KEY -> batchJobTag)
+    allConf.foreach { case (k, v) => buffer ++= Seq(CONF, s"$k=$v") }
+
+    buffer ++= Seq(PROXY_USER, proxyUser)
+    mainResource.foreach { r => buffer += r }
+    // `args` is optional in BatchRequest; skip it when absent instead of failing on null.
+    Option(batchRequest.args).foreach { args => buffer ++= args.asScala }
+    buffer.toArray
+  }
+
+  override protected def module: String = "kyuubi-spark-batch-submit"
+
+  /**
+   * Looks up the application tagged with `batchId`; only a "yarn" `spark.master` is supported.
+   *
+   * @return the application id and tracking URL of the first match, or None if there is none
+   */
+  private[kyuubi] def getApplicationIdAndUrl(): Option[(String, String)] = {
+    batchConf.get("spark.master") match {
+      case Some("yarn") =>
+        val yarnClient = getYarnClient
+        try {
+          // init/start sit inside the try so the client is stopped even when they throw.
+          yarnClient.init(new YarnConfiguration(KyuubiHadoopUtils.newHadoopConf(conf)))
+          yarnClient.start()
+          val apps = yarnClient.getApplications(null, null, Set(batchId).asJava)
+          apps.asScala.headOption.map { applicationReport =>
+            applicationReport.getApplicationId.toString -> applicationReport.getTrackingUrl
+          }
+        } finally {
+          yarnClient.stop()
+        }
+
+      case _ => None // TODO: Support other resource manager
+    }
+  }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 656c47a7a..7b5b70d72 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -27,6 +27,7 @@ import scala.util.matching.Regex
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
@@ -173,8 +174,8 @@ class SparkProcessBuilder(
     YARN_APP_NAME_REGEX.findFirstIn(line) match {
       case Some(appId) =>
         try {
-          val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
-          yarnClient.init(hadoopConf)
+          val yarnConf = new YarnConfiguration(KyuubiHadoopUtils.newHadoopConf(conf))
+          yarnClient.init(yarnConf)
           yarnClient.start()
           val applicationId = ApplicationId.fromString(appId)
           yarnClient.killApplication(applicationId)
@@ -197,12 +198,12 @@ object SparkProcessBuilder {
   final val APP_KEY = "spark.app.name"
   final val TAG_KEY = "spark.yarn.tags"
 
-  final private val CONF = "--conf"
-  final private val CLASS = "--class"
-  final private val PROXY_USER = "--proxy-user"
-  final private val SPARK_FILES = "spark.files"
-  final private val PRINCIPAL = "spark.kerberos.principal"
-  final private val KEYTAB = "spark.kerberos.keytab"
+  final private[spark] val CONF = "--conf"
+  final private[spark] val CLASS = "--class"
+  final private[spark] val PROXY_USER = "--proxy-user"
+  final private[spark] val SPARK_FILES = "spark.files"
+  final private[spark] val PRINCIPAL = "spark.kerberos.principal"
+  final private[spark] val KEYTAB = "spark.kerberos.keytab"
   // Get the appropriate spark-submit file
   final private val SPARK_SUBMIT_FILE = if (Utils.isWindows) "spark-submit.cmd" else "spark-submit"
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index 7158469bb..ec62b84fe 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -103,3 +103,27 @@ case class ResultRowSet(rows: Seq[Row], rowCount: Int)
 case class Row(fields: Seq[Field])
 
 case class Field(dataType: String, value: Any)
+
+/**
+ * The request body for batch job submission.
+ *
+ * @param batchType the batch job type, such as spark, flink, etc.
+ * @param resource the main resource jar, required.
+ * @param proxyUser the proxy user, optional.
+ * @param className the main class name, required.
+ * @param jars list of jars to include, optional.
+ * @param files list of files to include, optional.
+ * @param name a name of your batch job, optional.
+ * @param conf arbitrary configuration properties, optional.
+ * @param args list of batch job arguments, one per element, optional.
+ */
+case class BatchRequest(
+    batchType: String,
+    resource: String,
+    proxyUser: String,
+    className: String,
+    jars: java.util.List[String],
+    files: java.util.List[String],
+    name: String,
+    conf: Map[String, String],
+    args: java.util.List[String])
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
new file mode 100644
index 000000000..de0f617e0
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.server.MiniYarnService
+import org.apache.kyuubi.server.api.v1.BatchRequest
+
+/** Runs a batch job against a MiniYarnService and checks its application id and URL show up. */
+class SparkBatchProcessBuilderSuite extends KyuubiFunSuite {
+  private val conf = KyuubiConf().set("kyuubi.on", "off")
+  private var miniYarnService: MiniYarnService = _
+
+  override def beforeAll(): Unit = {
+    miniYarnService = new MiniYarnService()
+    miniYarnService.initialize(new KyuubiConf(false))
+    miniYarnService.start()
+    conf.set(
+      s"${KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX}.HADOOP_CONF_DIR",
+      miniYarnService.getHadoopConfDir)
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    if (miniYarnService != null) {
+      miniYarnService.stop()
+      miniYarnService = null
+    }
+    super.afterAll()
+  }
+
+  test("spark batch process builder") {
+    val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
+    val batchRequest = BatchRequest(
+      "spark",
+      sparkProcessBuilder.mainResource.get,
+      "kyuubi",
+      sparkProcessBuilder.mainClass,
+      List.empty[String].asJava,
+      List.empty[String].asJava,
+      "spark-batch-submission",
+      Map("spark.master" -> "yarn"),
+      List.empty[String].asJava)
+
+    val batchId = UUID.randomUUID().toString
+    val builder = new SparkBatchProcessBuilder(batchRequest.proxyUser, conf, batchId, batchRequest)
+    val proc = builder.start
+    try {
+      eventually(timeout(3.minutes), interval(500.milliseconds)) {
+        val applicationIdAndUrl = builder.getApplicationIdAndUrl()
+        assert(applicationIdAndUrl.isDefined)
+        assert(applicationIdAndUrl.exists(_._1.startsWith("application_")))
+        assert(applicationIdAndUrl.exists(_._2.nonEmpty))
+      }
+    } finally {
+      // Kill the submit process even when the assertions above time out.
+      proc.destroyForcibly()
+    }
+  }
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
index cc710e493..356c135a7 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.server
 
 import java.io.{File, FileWriter}
-import java.net.InetAddress
+import java.net.{InetAddress, URLClassLoader}
 
 import scala.collection.JavaConverters._
 
@@ -40,6 +40,8 @@ class MiniYarnService(name: String) extends AbstractService(name) {
   private var yarnConf: YarnConfiguration = _
   private var yarnCluster: MiniYARNCluster = _
 
+  private val classLoader = Thread.currentThread().getContextClassLoader
+
   private def newYarnConfig(): YarnConfiguration = {
     val yarnConfig = new YarnConfiguration()
     // Disable the disk utilization check to avoid the test hanging when people's disks are
@@ -93,12 +95,16 @@ class MiniYarnService(name: String) extends AbstractService(name) {
     info(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
     saveHadoopConf()
     super.start()
+
+    val hadoopConfClassLoader = new URLClassLoader(Array(hadoopConfDir.toURI.toURL), classLoader)
+    Thread.currentThread().setContextClassLoader(hadoopConfClassLoader)
   }
 
   override def stop(): Unit = {
     if (yarnCluster != null) yarnCluster.stop()
     if (hadoopConfDir != null) hadoopConfDir.delete()
     super.stop()
+    Thread.currentThread().setContextClassLoader(classLoader) // undo the swap made in start()
   }
 
   private def saveHadoopConf(): Unit = {