You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/04/29 10:14:57 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2309][SUB-TASK][KPIP-4] Implement BatchesResource POST /batches
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 5a36db65a [KYUUBI #2309][SUB-TASK][KPIP-4] Implement BatchesResource POST /batches
5a36db65a is described below
commit 5a36db65a3038783d097ef1a48d124bcd3c5c99a
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Fri Apr 29 18:14:50 2022 +0800
[KYUUBI #2309][SUB-TASK][KPIP-4] Implement BatchesResource POST /batches
### _Why are the changes needed?_
To close #2309
The response
| Name | Description | Type |
| :------ | :--------------------------------- | :----------------------------------------------------------- |
| id | The session id | string |
| batchType| The batch type | string|
| batchInfo | The detailed application info | Map of key=val, such as id, url, error. |
|kyuubiInstance| The kyuubi instance connection url | string|
| state | The batch state | string |
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #2491 from turboFei/kpip_4_post_batches_2309.
Closes #2309
d0dbf7cf [Fei Wang] comment for proxy user ut
1906bfee [Fei Wang] comment
e79dec17 [Fei Wang] merge proxy user into conf
f8469710 [Fei Wang] comment 2
692ec768 [Fei Wang] use anonymous as password
416d9ded [Fei Wang] fix empty user
5b560fc8 [Fei Wang] return kyuubi instance outof batch info
0557f93d [Fei Wang] [SUB-TASK][KPIP-4] Implement BatchesResource POST /batches #2491
Authored-by: Fei Wang <fw...@ebay.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
---
.../kyuubi/operation/BatchJobSubmission.scala | 34 ++++++-----
.../kyuubi/server/KyuubiRestFrontendService.scala | 26 ++++----
.../kyuubi/server/api/v1/ApiRootResource.scala | 3 +
.../kyuubi/server/api/v1/BatchesResource.scala | 70 ++++++++++++++++++++++
.../kyuubi/server/api/v1/SessionsResource.scala | 2 +-
.../org/apache/kyuubi/server/api/v1/dto.scala | 9 ++-
.../kyuubi/session/KyuubiSessionManager.scala | 1 -
.../operation/KyuubiBatchYarnClusterSuite.scala | 3 +-
.../server/api/v1/BatchesResourceSuite.scala | 59 ++++++++++++++++++
9 files changed, 172 insertions(+), 35 deletions(-)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 9dc2c7a80..30023710d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -46,9 +46,25 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl, batchRequest: BatchReq
private val applicationManager =
session.sessionManager.asInstanceOf[KyuubiSessionManager].applicationManager
- private var builder: ProcBuilder = _
+ private[kyuubi] val batchId: String = session.handle.identifier.toString
- private val batchId: String = session.handle.identifier.toString
+ private[kyuubi] val batchType: String = batchRequest.batchType
+
+ private val builder: ProcBuilder = {
+ Option(batchType).map(_.toUpperCase(Locale.ROOT)) match {
+ case Some("SPARK") =>
+ val batchSparkConf = session.sessionConf.getBatchConf("spark")
+ new SparkBatchProcessBuilder(
+ session.user,
+ session.sessionConf,
+ batchId,
+ batchRequest.copy(conf = batchSparkConf ++ batchRequest.conf),
+ getOperationLog)
+
+ case _ =>
+ throw new UnsupportedOperationException(s"Batch type ${batchRequest.batchType} unsupported")
+ }
+ }
private[kyuubi] def currentApplicationState: Option[Map[String, String]] = {
applicationManager.getApplicationInfo(builder.clusterManager(), batchId)
@@ -84,20 +100,6 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl, batchRequest: BatchReq
}
private def submitBatchJob(): Unit = {
- builder = Option(batchRequest.batchType).map(_.toUpperCase(Locale.ROOT)) match {
- case Some("SPARK") =>
- val batchSparkConf = session.sessionConf.getBatchConf("spark")
- new SparkBatchProcessBuilder(
- session.user,
- session.sessionConf,
- batchId,
- batchRequest.copy(conf = batchSparkConf ++ batchRequest.conf),
- getOperationLog)
-
- case _ =>
- throw new UnsupportedOperationException(s"Batch type ${batchRequest.batchType} unsupported")
- }
-
try {
info(s"Submitting ${batchRequest.batchType} batch job: $builder")
val process = builder.start
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 8fc79ad2c..0417f3556 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -27,7 +27,7 @@ import org.eclipse.jetty.servlet.FilterHolder
import org.apache.kyuubi.{KyuubiException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_REST_BIND_HOST, FRONTEND_REST_BIND_PORT}
-import org.apache.kyuubi.server.api.v1.{ApiRootResource, SessionOpenRequest}
+import org.apache.kyuubi.server.api.v1.ApiRootResource
import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, KyuubiHttpAuthenticationFactory}
import org.apache.kyuubi.server.ui.JettyServer
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service, ServiceUtils}
@@ -93,24 +93,24 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
super.stop()
}
- def getUserName(req: SessionOpenRequest): String = {
- val realUser: String =
- ServiceUtils.getShortName(Option(AuthenticationFilter.getUserName).getOrElse(req.user))
- if (req.configs == null) {
- realUser
- } else {
- getProxyUser(req.configs, Option(AuthenticationFilter.getUserIpAddress).orNull, realUser)
- }
+ def getUserName(sessionConf: Map[String, String]): String = {
+ val realUser: String = ServiceUtils.getShortName(
+ Option(AuthenticationFilter.getUserName).filter(_.nonEmpty).getOrElse("anonymous"))
+ getProxyUser(sessionConf, Option(AuthenticationFilter.getUserIpAddress).orNull, realUser)
}
private def getProxyUser(
sessionConf: Map[String, String],
ipAddress: String,
realUser: String): String = {
- sessionConf.get(KyuubiAuthenticationFactory.HS2_PROXY_USER).map { proxyUser =>
- KyuubiAuthenticationFactory.verifyProxyAccess(realUser, proxyUser, ipAddress, hadoopConf)
- proxyUser
- }.getOrElse(realUser)
+ if (sessionConf == null) {
+ realUser
+ } else {
+ sessionConf.get(KyuubiAuthenticationFactory.HS2_PROXY_USER).map { proxyUser =>
+ KyuubiAuthenticationFactory.verifyProxyAccess(realUser, proxyUser, ipAddress, hadoopConf)
+ proxyUser
+ }.getOrElse(realUser)
+ }
}
override val discoveryService: Option[Service] = None
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/ApiRootResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/ApiRootResource.scala
index 5bad5105b..c555f647e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/ApiRootResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/ApiRootResource.scala
@@ -55,6 +55,9 @@ private[v1] class ApiRootResource extends ApiRequestContext {
@Path("operations")
def operations: Class[OperationsResource] = classOf[OperationsResource]
+ @Path("batches")
+ def batches: Class[BatchesResource] = classOf[BatchesResource]
+
@GET
@Path("exception")
@Produces(Array(MediaType.TEXT_PLAIN))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
new file mode 100644
index 000000000..bf30bc88a
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.api.v1
+
+import javax.ws.rs.{Consumes, POST, Produces}
+import javax.ws.rs.core.MediaType
+
+import io.swagger.v3.oas.annotations.media.Content
+import io.swagger.v3.oas.annotations.responses.ApiResponse
+import io.swagger.v3.oas.annotations.tags.Tag
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.server.api.ApiRequestContext
+import org.apache.kyuubi.server.api.v1.BatchesResource.REST_BATCH_PROTOCOL
+import org.apache.kyuubi.server.http.authentication.AuthenticationFilter
+import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
+
+@Tag(name = "Batch")
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class BatchesResource extends ApiRequestContext with Logging {
+
+ private def sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
+
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.APPLICATION_JSON)),
+ description = "create and open a batch session")
+ @POST
+ @Consumes(Array(MediaType.APPLICATION_JSON))
+ def openBatchSession(request: BatchRequest): Batch = {
+ val userName = fe.getUserName(request.conf)
+ val ipAddress = AuthenticationFilter.getUserIpAddress
+ val sessionHandle = sessionManager.openBatchSession(
+ REST_BATCH_PROTOCOL,
+ userName,
+ "anonymous",
+ ipAddress,
+ Option(request.conf).getOrElse(Map()),
+ request)
+ val batchOp = sessionManager.getSession(sessionHandle).asInstanceOf[
+ KyuubiBatchSessionImpl].batchJobSubmissionOp
+ Batch(
+ batchOp.batchId,
+ batchOp.batchType,
+ batchOp.currentApplicationState.getOrElse(Map.empty),
+ fe.connectionUrl,
+ batchOp.getStatus.state.toString)
+ }
+}
+
+object BatchesResource {
+ val REST_BATCH_PROTOCOL = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
index a8acffc1b..d9f37dc86 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
@@ -132,7 +132,7 @@ private[v1] class SessionsResource extends ApiRequestContext with Logging {
@POST
@Consumes(Array(MediaType.APPLICATION_JSON))
def openSession(request: SessionOpenRequest): SessionHandle = {
- val userName = fe.getUserName(request)
+ val userName = fe.getUserName(request.configs)
val ipAddress = AuthenticationFilter.getUserIpAddress
fe.be.openSession(
TProtocolVersion.findByValue(request.protocolVersion),
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index 395ab3eb7..1f964c957 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -109,7 +109,6 @@ case class Field(dataType: String, value: Any)
*
* @param batchType the batch job type, such as spark, flink, etc.
* @param resource the main resource jar, required.
- * @param proxyUser the proxy user, optional.
* @param className the main class name, required.
* @param name a name of your batch job, optional.
* @param conf arbitrary configuration properties, optional.
@@ -118,8 +117,14 @@ case class Field(dataType: String, value: Any)
case class BatchRequest(
batchType: String,
resource: String,
- proxyUser: String,
className: String,
name: String,
conf: Map[String, String],
args: Seq[String])
+
+case class Batch(
+ id: String,
+ batchType: String,
+ batchInfo: Map[String, String],
+ kyuubiInstance: String,
+ state: String)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 851492cf1..4d1b4d185 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -120,7 +120,6 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
ipAddress,
conf,
this,
- // TODO: user defaults conf for batch session
this.getConf.getUserDefaults(user),
batchRequest)
try {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
index dd9507882..df47de86b 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
@@ -46,7 +46,6 @@ class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
val batchRequest = BatchRequest(
"spark",
sparkProcessBuilder.mainResource.get,
- "kyuubi",
sparkProcessBuilder.mainClass,
"spark-batch-submission",
Map(
@@ -57,7 +56,7 @@ class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
val sessionHandle = sessionManager().openBatchSession(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
- batchRequest.proxyUser,
+ "kyuubi",
"passwd",
"localhost",
batchRequest.conf,
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
new file mode 100644
index 000000000..cabf3e1bb
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.api.v1
+
+import javax.ws.rs.client.Entity
+import javax.ws.rs.core.MediaType
+
+import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SPARK_MAX_LIFETIME}
+import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
+
+class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
+ test("open batch session") {
+ val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
+ val requestObj = BatchRequest(
+ "spark",
+ sparkProcessBuilder.mainResource.get,
+ sparkProcessBuilder.mainClass,
+ "spark-batch-submission",
+ Map(
+ "spark.master" -> "local",
+ s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "5000",
+ s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000"),
+ Seq.empty[String])
+
+ val response = webTarget.path("api/v1/batches")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+
+ assert(200 == response.getStatus)
+
+ val batch = response.readEntity(classOf[Batch])
+ assert(batch.kyuubiInstance === fe.connectionUrl)
+
+ val requestObj2 = requestObj.copy(conf = requestObj.conf ++
+ Map(KyuubiAuthenticationFactory.HS2_PROXY_USER -> "root"))
+ val response2 = webTarget.path("api/v1/batches")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(requestObj2, MediaType.APPLICATION_JSON_TYPE))
+
+ assert(500 == response2.getStatus)
+ }
+}