You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/04/29 10:14:57 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2309][SUB-TASK][KPIP-4] Implement BatchesResource POST /batches

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a36db65a [KYUUBI #2309][SUB-TASK][KPIP-4] Implement BatchesResource POST /batches
5a36db65a is described below

commit 5a36db65a3038783d097ef1a48d124bcd3c5c99a
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Fri Apr 29 18:14:50 2022 +0800

    [KYUUBI #2309][SUB-TASK][KPIP-4] Implement BatchesResource POST /batches
    
    ### _Why are the changes needed?_
    
    To close #2309
    
    The response
    
    | Name    | Description                        | Type                                                         |
    | :------ | :--------------------------------- | :----------------------------------------------------------- |
    | id      | The session id                   | string                                                       |
    | batchType| The batch type |       string|
    | batchInfo | The detailed application info      | Map of key=val, such as id, url, error. |
    |kyuubiInstance| The kyuubi instance connection url | string|
    | state   | The batch state                    | string                                                       |
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #2491 from turboFei/kpip_4_post_batches_2309.
    
    Closes #2309
    
    d0dbf7cf [Fei Wang] comment for proxy user ut
    1906bfee [Fei Wang] comment
    e79dec17 [Fei Wang] merge proxy user into conf
    f8469710 [Fei Wang] comment 2
    692ec768 [Fei Wang] use anonymous as password
    416d9ded [Fei Wang] fix empty user
    5b560fc8 [Fei Wang] return kyuubi instance outof batch info
    0557f93d [Fei Wang] [SUB-TASK][KPIP-4] Implement BatchesResource POST /batches #2491
    
    Authored-by: Fei Wang <fw...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 .../kyuubi/operation/BatchJobSubmission.scala      | 34 ++++++-----
 .../kyuubi/server/KyuubiRestFrontendService.scala  | 26 ++++----
 .../kyuubi/server/api/v1/ApiRootResource.scala     |  3 +
 .../kyuubi/server/api/v1/BatchesResource.scala     | 70 ++++++++++++++++++++++
 .../kyuubi/server/api/v1/SessionsResource.scala    |  2 +-
 .../org/apache/kyuubi/server/api/v1/dto.scala      |  9 ++-
 .../kyuubi/session/KyuubiSessionManager.scala      |  1 -
 .../operation/KyuubiBatchYarnClusterSuite.scala    |  3 +-
 .../server/api/v1/BatchesResourceSuite.scala       | 59 ++++++++++++++++++
 9 files changed, 172 insertions(+), 35 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 9dc2c7a80..30023710d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -46,9 +46,25 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl, batchRequest: BatchReq
   private val applicationManager =
     session.sessionManager.asInstanceOf[KyuubiSessionManager].applicationManager
 
-  private var builder: ProcBuilder = _
+  private[kyuubi] val batchId: String = session.handle.identifier.toString
 
-  private val batchId: String = session.handle.identifier.toString
+  private[kyuubi] val batchType: String = batchRequest.batchType
+
+  private val builder: ProcBuilder = {
+    Option(batchType).map(_.toUpperCase(Locale.ROOT)) match {
+      case Some("SPARK") =>
+        val batchSparkConf = session.sessionConf.getBatchConf("spark")
+        new SparkBatchProcessBuilder(
+          session.user,
+          session.sessionConf,
+          batchId,
+          batchRequest.copy(conf = batchSparkConf ++ batchRequest.conf),
+          getOperationLog)
+
+      case _ =>
+        throw new UnsupportedOperationException(s"Batch type ${batchRequest.batchType} unsupported")
+    }
+  }
 
   private[kyuubi] def currentApplicationState: Option[Map[String, String]] = {
     applicationManager.getApplicationInfo(builder.clusterManager(), batchId)
@@ -84,20 +100,6 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl, batchRequest: BatchReq
   }
 
   private def submitBatchJob(): Unit = {
-    builder = Option(batchRequest.batchType).map(_.toUpperCase(Locale.ROOT)) match {
-      case Some("SPARK") =>
-        val batchSparkConf = session.sessionConf.getBatchConf("spark")
-        new SparkBatchProcessBuilder(
-          session.user,
-          session.sessionConf,
-          batchId,
-          batchRequest.copy(conf = batchSparkConf ++ batchRequest.conf),
-          getOperationLog)
-
-      case _ =>
-        throw new UnsupportedOperationException(s"Batch type ${batchRequest.batchType} unsupported")
-    }
-
     try {
       info(s"Submitting ${batchRequest.batchType} batch job: $builder")
       val process = builder.start
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 8fc79ad2c..0417f3556 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -27,7 +27,7 @@ import org.eclipse.jetty.servlet.FilterHolder
 import org.apache.kyuubi.{KyuubiException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_REST_BIND_HOST, FRONTEND_REST_BIND_PORT}
-import org.apache.kyuubi.server.api.v1.{ApiRootResource, SessionOpenRequest}
+import org.apache.kyuubi.server.api.v1.ApiRootResource
 import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, KyuubiHttpAuthenticationFactory}
 import org.apache.kyuubi.server.ui.JettyServer
 import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service, ServiceUtils}
@@ -93,24 +93,24 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
     super.stop()
   }
 
-  def getUserName(req: SessionOpenRequest): String = {
-    val realUser: String =
-      ServiceUtils.getShortName(Option(AuthenticationFilter.getUserName).getOrElse(req.user))
-    if (req.configs == null) {
-      realUser
-    } else {
-      getProxyUser(req.configs, Option(AuthenticationFilter.getUserIpAddress).orNull, realUser)
-    }
+  def getUserName(sessionConf: Map[String, String]): String = {
+    val realUser: String = ServiceUtils.getShortName(
+      Option(AuthenticationFilter.getUserName).filter(_.nonEmpty).getOrElse("anonymous"))
+    getProxyUser(sessionConf, Option(AuthenticationFilter.getUserIpAddress).orNull, realUser)
   }
 
   private def getProxyUser(
       sessionConf: Map[String, String],
       ipAddress: String,
       realUser: String): String = {
-    sessionConf.get(KyuubiAuthenticationFactory.HS2_PROXY_USER).map { proxyUser =>
-      KyuubiAuthenticationFactory.verifyProxyAccess(realUser, proxyUser, ipAddress, hadoopConf)
-      proxyUser
-    }.getOrElse(realUser)
+    if (sessionConf == null) {
+      realUser
+    } else {
+      sessionConf.get(KyuubiAuthenticationFactory.HS2_PROXY_USER).map { proxyUser =>
+        KyuubiAuthenticationFactory.verifyProxyAccess(realUser, proxyUser, ipAddress, hadoopConf)
+        proxyUser
+      }.getOrElse(realUser)
+    }
   }
 
   override val discoveryService: Option[Service] = None
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/ApiRootResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/ApiRootResource.scala
index 5bad5105b..c555f647e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/ApiRootResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/ApiRootResource.scala
@@ -55,6 +55,9 @@ private[v1] class ApiRootResource extends ApiRequestContext {
   @Path("operations")
   def operations: Class[OperationsResource] = classOf[OperationsResource]
 
+  @Path("batches")
+  def batches: Class[BatchesResource] = classOf[BatchesResource]
+
   @GET
   @Path("exception")
   @Produces(Array(MediaType.TEXT_PLAIN))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
new file mode 100644
index 000000000..bf30bc88a
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.api.v1
+
+import javax.ws.rs.{Consumes, POST, Produces}
+import javax.ws.rs.core.MediaType
+
+import io.swagger.v3.oas.annotations.media.Content
+import io.swagger.v3.oas.annotations.responses.ApiResponse
+import io.swagger.v3.oas.annotations.tags.Tag
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.server.api.ApiRequestContext
+import org.apache.kyuubi.server.api.v1.BatchesResource.REST_BATCH_PROTOCOL
+import org.apache.kyuubi.server.http.authentication.AuthenticationFilter
+import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
+
+@Tag(name = "Batch")
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class BatchesResource extends ApiRequestContext with Logging {
+
+  private def sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
+
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON)),
+    description = "create and open a batch session")
+  @POST
+  @Consumes(Array(MediaType.APPLICATION_JSON))
+  def openBatchSession(request: BatchRequest): Batch = {
+    val userName = fe.getUserName(request.conf)
+    val ipAddress = AuthenticationFilter.getUserIpAddress
+    val sessionHandle = sessionManager.openBatchSession(
+      REST_BATCH_PROTOCOL,
+      userName,
+      "anonymous",
+      ipAddress,
+      Option(request.conf).getOrElse(Map()),
+      request)
+    val batchOp = sessionManager.getSession(sessionHandle).asInstanceOf[
+      KyuubiBatchSessionImpl].batchJobSubmissionOp
+    Batch(
+      batchOp.batchId,
+      batchOp.batchType,
+      batchOp.currentApplicationState.getOrElse(Map.empty),
+      fe.connectionUrl,
+      batchOp.getStatus.state.toString)
+  }
+}
+
+object BatchesResource {
+  val REST_BATCH_PROTOCOL = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
index a8acffc1b..d9f37dc86 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
@@ -132,7 +132,7 @@ private[v1] class SessionsResource extends ApiRequestContext with Logging {
   @POST
   @Consumes(Array(MediaType.APPLICATION_JSON))
   def openSession(request: SessionOpenRequest): SessionHandle = {
-    val userName = fe.getUserName(request)
+    val userName = fe.getUserName(request.configs)
     val ipAddress = AuthenticationFilter.getUserIpAddress
     fe.be.openSession(
       TProtocolVersion.findByValue(request.protocolVersion),
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index 395ab3eb7..1f964c957 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -109,7 +109,6 @@ case class Field(dataType: String, value: Any)
  *
  * @param batchType the batch job type, such as spark, flink, etc.
  * @param resource the main resource jar, required.
- * @param proxyUser the proxy user, optional.
  * @param className the main class name, required.
  * @param name a name of your batch job, optional.
  * @param conf arbitrary configuration properties, optional.
@@ -118,8 +117,14 @@ case class Field(dataType: String, value: Any)
 case class BatchRequest(
     batchType: String,
     resource: String,
-    proxyUser: String,
     className: String,
     name: String,
     conf: Map[String, String],
     args: Seq[String])
+
+case class Batch(
+    id: String,
+    batchType: String,
+    batchInfo: Map[String, String],
+    kyuubiInstance: String,
+    state: String)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 851492cf1..4d1b4d185 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -120,7 +120,6 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
       ipAddress,
       conf,
       this,
-      // TODO: user defaults conf for batch session
       this.getConf.getUserDefaults(user),
       batchRequest)
     try {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
index dd9507882..df47de86b 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
@@ -46,7 +46,6 @@ class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
     val batchRequest = BatchRequest(
       "spark",
       sparkProcessBuilder.mainResource.get,
-      "kyuubi",
       sparkProcessBuilder.mainClass,
       "spark-batch-submission",
       Map(
@@ -57,7 +56,7 @@ class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
 
     val sessionHandle = sessionManager().openBatchSession(
       TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
-      batchRequest.proxyUser,
+      "kyuubi",
       "passwd",
       "localhost",
       batchRequest.conf,
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
new file mode 100644
index 000000000..cabf3e1bb
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.api.v1
+
+import javax.ws.rs.client.Entity
+import javax.ws.rs.core.MediaType
+
+import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SPARK_MAX_LIFETIME}
+import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
+
+class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
+  test("open batch session") {
+    val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
+    val requestObj = BatchRequest(
+      "spark",
+      sparkProcessBuilder.mainResource.get,
+      sparkProcessBuilder.mainClass,
+      "spark-batch-submission",
+      Map(
+        "spark.master" -> "local",
+        s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "5000",
+        s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000"),
+      Seq.empty[String])
+
+    val response = webTarget.path("api/v1/batches")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+
+    assert(200 == response.getStatus)
+
+    val batch = response.readEntity(classOf[Batch])
+    assert(batch.kyuubiInstance === fe.connectionUrl)
+
+    val requestObj2 = requestObj.copy(conf = requestObj.conf ++
+      Map(KyuubiAuthenticationFactory.HS2_PROXY_USER -> "root"))
+    val response2 = webTarget.path("api/v1/batches")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .post(Entity.entity(requestObj2, MediaType.APPLICATION_JSON_TYPE))
+
+    assert(500 == response2.getStatus)
+  }
+}