Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/11/24 18:40:06 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3836] [SPARK] Support pyspark batch job by restful api

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 00d2d2eb6 [KYUUBI #3836] [SPARK] Support pyspark batch job by restful api
00d2d2eb6 is described below

commit 00d2d2eb67e1fa30aeda68f8fbfadb4e3b5719fe
Author: Xuedong Luan <lu...@gmail.com>
AuthorDate: Fri Nov 25 02:39:54 2022 +0800

    [KYUUBI #3836] [SPARK] Support pyspark batch job by restful api
    
    ### _Why are the changes needed?_
    
    Support submitting PySpark batch jobs via the RESTful API.
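    
    For context, a minimal sketch of how such a submission might look against the batches REST endpoint, assuming a Kyuubi server with the REST frontend on the default port 10099 and no authentication; the script path, batch name, and Spark conf values below are illustrative, not part of this commit:
    
    ```scala
    import java.net.URI
    import java.net.http.{HttpClient, HttpRequest, HttpResponse}
    
    object SubmitPySparkBatch {
      def main(args: Array[String]): Unit = {
        // Assumed server address; adjust to your deployment.
        val kyuubiRestBase = "http://localhost:10099/api/v1"
    
        // With this change, batchType PYSPARK is accepted, className may be
        // omitted for non-SPARK batches, and resource points at the .py script.
        val payload =
          """{
            |  "batchType": "PYSPARK",
            |  "name": "pyspark-pi-example",
            |  "resource": "/path/to/pi.py",
            |  "conf": {
            |    "spark.master": "yarn",
            |    "spark.submit.deployMode": "cluster"
            |  }
            |}""".stripMargin
    
        val request = HttpRequest.newBuilder()
          .uri(URI.create(s"$kyuubiRestBase/batches"))
          .header("Content-Type", "application/json")
          .POST(HttpRequest.BodyPublishers.ofString(payload))
          .build()
    
        // The response carries the created batch metadata (id, state, ...).
        val response = HttpClient.newHttpClient()
          .send(request, HttpResponse.BodyHandlers.ofString())
        println(s"status=${response.statusCode()} body=${response.body()}")
      }
    }
    ```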
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3836 from leoluan2009/pyspark-1.
    
    Closes #3836
    
    550021ac [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
    357691c2 [Xuedong Luan] fix comment
    7dfdbe24 [Xuedong Luan] fix comment
    31bda178 [Xuedong Luan] [WIP] Support pyspark batch job by restful api
    
    Lead-authored-by: Xuedong Luan <lu...@gmail.com>
    Co-authored-by: Cheng Pan <pa...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala        | 6 +++++-
 .../scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala   | 2 +-
 .../scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala  | 1 +
 .../main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala | 2 +-
 .../scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala     | 6 ++++--
 5 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 33a209de1..ddf831834 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -124,7 +124,11 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
 
   /** Get all batch conf as map */
   def getBatchConf(batchType: String): Map[String, String] = {
-    getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.${batchType.toLowerCase(Locale.ROOT)}", "")
+    val normalizedBatchType = batchType.toLowerCase(Locale.ROOT) match {
+      case "pyspark" => "spark"
+      case other => other.toLowerCase(Locale.ROOT)
+    }
+    getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.$normalizedBatchType", "")
   }
 
   /**
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index b772afc04..481d7a2f1 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -178,7 +178,7 @@ object KyuubiApplicationManager {
       appConf: Map[String, String],
       kyuubiConf: KyuubiConf): Unit = {
     applicationType.toUpperCase(Locale.ROOT) match {
-      case appType if appType.startsWith("SPARK") => checkSparkAccessPaths(appConf, kyuubiConf)
+      case appType if appType.contains("SPARK") => checkSparkAccessPaths(appConf, kyuubiConf)
       case appType if appType.startsWith("FLINK") => // TODO: check flink app access local paths
       case _ =>
     }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 410f141e0..41fd9b2ad 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -206,6 +206,7 @@ object SparkProcessBuilder {
     "spark.yarn.jars",
     "spark.yarn.dist.files",
     "spark.yarn.dist.pyFiles",
+    "spark.submit.pyFiles",
     "spark.yarn.dist.jars",
     "spark.yarn.dist.archives",
     "spark.kerberos.keytab",
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 1dfef22b0..d981b80cb 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -77,7 +77,7 @@ class BatchJobSubmission(
   @VisibleForTesting
   private[kyuubi] val builder: ProcBuilder = {
     Option(batchType).map(_.toUpperCase(Locale.ROOT)) match {
-      case Some("SPARK") =>
+      case Some("SPARK") | Some("PYSPARK") =>
         new SparkBatchProcessBuilder(
           session.user,
           session.sessionConf,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index f5c8c8fc9..5c8d691f9 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -163,7 +163,9 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
       supportedBatchType(request.getBatchType),
       s"${request.getBatchType} is not in the supported list: $SUPPORTED_BATCH_TYPES}")
     require(request.getResource != null, "resource is a required parameter")
-    require(request.getClassName != null, "classname is a required parameter")
+    if (request.getBatchType.equalsIgnoreCase("SPARK")) {
+      require(request.getClassName != null, "classname is a required parameter for SPARK")
+    }
     request.setBatchType(request.getBatchType.toUpperCase(Locale.ROOT))
 
     val userName = fe.getSessionUser(request.getConf.asScala.toMap)
@@ -366,7 +368,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
 }
 
 object BatchesResource {
-  val SUPPORTED_BATCH_TYPES = Seq("SPARK")
+  val SUPPORTED_BATCH_TYPES = Seq("SPARK", "PYSPARK")
   val VALID_BATCH_STATES = Seq(
     OperationState.PENDING,
     OperationState.RUNNING,