You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by bo...@apache.org on 2023/01/21 00:38:05 UTC
[kyuubi] branch master updated: [KYUUBI #4106] Introduce resource file uploading in batch creation via REST API
This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 609071d11 [KYUUBI #4106] Introduce resource file uploading in batch creation via REST API
609071d11 is described below
commit 609071d110a7e54efc48b2273a3a4ebaf7097c4a
Author: liangbowen <li...@gf.com.cn>
AuthorDate: Sat Jan 21 08:37:46 2023 +0800
[KYUUBI #4106] Introduce resource file uploading in batch creation via REST API
### _Why are the changes needed?_
to close #4106 .
1. add `POST /batches`API in `BatchesResource` of REST API, which consumes `multipart/form-data` media type to support uploading `resourcefile`. And a `batchRequest` form data part in the JSON format string of `BatchRequest`, as in the required request body of `POST /batches`
2. the uploaded `resourceFile` is saved to a temp local file which will be cleaned up after job execution at the end of `submitAndMonitorBatchJob`
3. the local temp copy of `resourceFile` will be used as `resource` in `BatchJobSubmission`, eg. as <application-jar> for spark-submit
Todos in follow-up:
1. add a related description in Rest API doc
4. add `multipart` media type support to `RestClient` and implement `createBatchWithUploadingResource` in `BatchRestApi`
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #4144 from bowenliang123/rest-batch-upload.
Closes #4106
f6723a02 [Bowen Liang] Merge branch 'master' into rest-batch-upload
0dd67245 [liangbowen] correct dependencyList
6365a0cd [liangbowen] introducing resource file upload support in batch creation
Lead-authored-by: liangbowen <li...@gf.com.cn>
Co-authored-by: Bowen Liang <bo...@apache.org>
Signed-off-by: liangbowen <li...@gf.com.cn>
---
dev/dependencyList | 2 +
.../src/main/scala/org/apache/kyuubi/Utils.scala | 70 +++++++++++++++++++++-
.../apache/kyuubi/config/KyuubiReservedKeys.scala | 1 +
kyuubi-server/pom.xml | 5 ++
.../kyuubi/engine/KyuubiApplicationManager.scala | 22 +++++++
.../kyuubi/operation/BatchJobSubmission.scala | 5 +-
.../apache/kyuubi/server/api/OpenAPIConfig.scala | 2 +
.../kyuubi/server/api/v1/BatchesResource.scala | 38 +++++++++++-
.../kyuubi/session/KyuubiBatchSessionImpl.scala | 9 ++-
.../org/apache/kyuubi/RestFrontendTestHelper.scala | 3 +
.../server/api/v1/BatchesResourceSuite.scala | 22 +++++++
pom.xml | 13 ++++
12 files changed, 186 insertions(+), 6 deletions(-)
diff --git a/dev/dependencyList b/dev/dependencyList
index 6d7387b55..cce193b6f 100644
--- a/dev/dependencyList
+++ b/dev/dependencyList
@@ -86,6 +86,7 @@ jersey-container-servlet-core/2.38//jersey-container-servlet-core-2.38.jar
jersey-entity-filtering/2.38//jersey-entity-filtering-2.38.jar
jersey-hk2/2.38//jersey-hk2-2.38.jar
jersey-media-json-jackson/2.38//jersey-media-json-jackson-2.38.jar
+jersey-media-multipart/2.38//jersey-media-multipart-2.38.jar
jersey-server/2.38//jersey-server-2.38.jar
jetcd-api/0.7.3//jetcd-api-0.7.3.jar
jetcd-common/0.7.3//jetcd-common-0.7.3.jar
@@ -132,6 +133,7 @@ metrics-core/4.2.8//metrics-core-4.2.8.jar
metrics-jmx/4.2.8//metrics-jmx-4.2.8.jar
metrics-json/4.2.8//metrics-json-4.2.8.jar
metrics-jvm/4.2.8//metrics-jvm-4.2.8.jar
+mimepull/1.9.15//mimepull-1.9.15.jar
netty-all/4.1.87.Final//netty-all-4.1.87.Final.jar
netty-buffer/4.1.87.Final//netty-buffer-4.1.87.Final.jar
netty-codec-dns/4.1.87.Final//netty-codec-dns-4.1.87.Final.jar
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 33a4e116e..7283ea040 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -20,8 +20,10 @@ package org.apache.kyuubi
import java.io._
import java.net.{Inet4Address, InetAddress, NetworkInterface}
import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Path, Paths}
-import java.util.{Properties, TimeZone, UUID}
+import java.nio.file.{Files, Path, Paths, StandardCopyOption}
+import java.text.SimpleDateFormat
+import java.util.{Date, Properties, TimeZone, UUID}
+import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import scala.sys.process._
@@ -40,6 +42,12 @@ object Utils extends Logging {
import org.apache.kyuubi.config.KyuubiConf._
+ /**
+ * An atomic counter used in writeToTempFile method
+ * avoiding duplication in temporary file name generation
+ */
+ private lazy val tempFileIdCounter: AtomicLong = new AtomicLong(0)
+
def strToSeq(s: String, sp: String = ","): Seq[String] = {
require(s != null)
s.split(sp).map(_.trim).filter(_.nonEmpty)
@@ -135,6 +143,20 @@ object Utils extends Logging {
f.delete()
}
+ /**
+ * delete file in path with logging
+ * @param filePath path to file for deletion
+ * @param errorMessage message as prefix logging with error exception
+ */
+ def deleteFile(filePath: String, errorMessage: String): Unit = {
+ try {
+ Files.delete(Paths.get(filePath))
+ } catch {
+ case e: Exception =>
+ error(s"$errorMessage: $filePath ", e)
+ }
+ }
+
/**
* Create a temporary directory inside the given parent directory. The directory will be
* automatically deleted when the VM shuts down.
@@ -147,6 +169,50 @@ object Utils extends Logging {
dir
}
+ /**
+ * Copies bytes from an InputStream source to a newly created temporary file
+ * created in the directory destination. The temporary file will be created
+ * with new name by adding random identifiers before original file name's suffix,
+ * and the file will be deleted on JVM exit. The directories up to destination
+ * will be created if they don't already exist. destination will be overwritten
+ * if it already exists. The source stream is closed.
+ * @param source the InputStream to copy bytes from, must not be null, will be closed
+ * @param dir the directory path for temp file creation
+ * @param fileName original file name with suffix
+ * @return the created temp file in dir
+ */
+ def writeToTempFile(source: InputStream, dir: Path, fileName: String): File = {
+ try {
+ if (source == null) {
+ throw new IOException("the source inputstream is null")
+ }
+ if (!dir.toFile.exists()) {
+ dir.toFile.mkdirs()
+ }
+ val (prefix, suffix) = fileName.lastIndexOf(".") match {
+ case i if i > 0 => (fileName.substring(0, i), fileName.substring(i))
+ case _ => (fileName, "")
+ }
+ val currentTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())
+ val identifier = s"$currentTime-${tempFileIdCounter.incrementAndGet()}"
+ val filePath = Paths.get(dir.toString, s"$prefix-$identifier$suffix")
+ try {
+ Files.copy(source, filePath, StandardCopyOption.REPLACE_EXISTING)
+ } finally {
+ source.close()
+ }
+ val file = filePath.toFile
+ file.deleteOnExit()
+ file
+ } catch {
+ case e: Exception =>
+ error(
+ s"failed to write to temp file in path $dir, original file name: $fileName",
+ e)
+ throw e
+ }
+ }
+
def currentUser: String = UserGroupInformation.getCurrentUser.getShortUserName
private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
index 50dae6275..6036af855 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
@@ -24,6 +24,7 @@ object KyuubiReservedKeys {
final val KYUUBI_SESSION_SIGN_PUBLICKEY = "kyuubi.session.sign.publickey"
final val KYUUBI_SESSION_USER_SIGN = "kyuubi.session.user.sign"
final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user"
+ final val KYUUBI_SESSION_BATCH_RESOURCE_UPLOADED_KEY = "kyuubi.session.batch.resource.uploaded"
final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url"
final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
final val KYUUBI_ENGINE_ID = "kyuubi.engine.id"
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 748d8d028..7a7c609a2 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -216,6 +216,11 @@
<artifactId>jersey-media-json-jackson</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-multipart</artifactId>
+ </dependency>
+
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 481d7a2f1..b76f08833 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.engine
import java.io.File
import java.net.{URI, URISyntaxException}
+import java.nio.file.{Files, Path}
import java.util.{Locale, ServiceLoader}
import scala.collection.JavaConverters._
@@ -58,6 +59,7 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
case NonFatal(e) => warn(s"Error stopping ${op.getClass.getSimpleName}: ${e.getMessage}")
}
}
+ deleteTempDirForUpload()
super.stop()
}
@@ -90,6 +92,17 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
case None => None
}
}
+
+ private def deleteTempDirForUpload(): Unit = {
+ try {
+ Utils.deleteDirectoryRecursively(KyuubiApplicationManager.tempDirForUpload.toFile)
+ } catch {
+ case e: Exception => error(
+ "Failed to delete temporary folder for uploading " +
+ s"${KyuubiApplicationManager.tempDirForUpload}",
+ e)
+ }
+ }
}
object KyuubiApplicationManager {
@@ -109,6 +122,15 @@ object KyuubiApplicationManager {
conf.set(FlinkProcessBuilder.TAG_KEY, newTag)
}
+ lazy val tempDirForUpload: Path = {
+ val path = Utils.getAbsolutePathFromWork("upload")
+ val pathFile = path.toFile
+ if (!pathFile.exists()) {
+ Files.createDirectories(path)
+ }
+ path
+ }
+
private[kyuubi] def checkApplicationAccessPath(path: String, conf: KyuubiConf): Unit = {
val localDirAllowList = conf.get(KyuubiConf.SESSION_LOCAL_DIR_ALLOW_LIST)
if (localDirAllowList.nonEmpty) {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index c1bcf6cec..4436926db 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -25,7 +25,7 @@ import com.codahale.metrics.MetricRegistry
import com.google.common.annotations.VisibleForTesting
import org.apache.hive.service.rpc.thrift._
-import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
+import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState, KillResponse, ProcBuilder}
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
@@ -267,6 +267,9 @@ class BatchJobSubmission(
}
} finally {
builder.close()
+ if (session.isResourceUploaded) {
+ Utils.deleteFile(resource, "Failed to delete temporary uploaded resource file")
+ }
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/OpenAPIConfig.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/OpenAPIConfig.scala
index c4733a0b0..d8b489656 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/OpenAPIConfig.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/OpenAPIConfig.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.server.api
+import org.glassfish.jersey.media.multipart.MultiPartFeature
import org.glassfish.jersey.server.ResourceConfig
import org.apache.kyuubi.server.api.v1.KyuubiOpenApiResource
@@ -26,4 +27,5 @@ class OpenAPIConfig extends ResourceConfig {
register(classOf[KyuubiOpenApiResource])
register(classOf[KyuubiScalaObjectMapper])
register(classOf[RestExceptionMapper])
+ register(classOf[MultiPartFeature])
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index c00fb95f6..2c9c7535d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.server.api.v1
+import java.io.InputStream
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import javax.ws.rs._
@@ -29,13 +30,14 @@ import scala.util.control.NonFatal
import io.swagger.v3.oas.annotations.media.{Content, Schema}
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
+import org.glassfish.jersey.media.multipart.{FormDataContentDisposition, FormDataParam}
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.client.exception.KyuubiRestException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiReservedKeys._
-import org.apache.kyuubi.engine.ApplicationInfo
+import org.apache.kyuubi.engine.{ApplicationInfo, KyuubiApplicationManager}
import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, OperationState}
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource._
@@ -161,6 +163,39 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
@POST
@Consumes(Array(MediaType.APPLICATION_JSON))
def openBatchSession(request: BatchRequest): Batch = {
+ openBatchSessionInternal(request)
+ }
+
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.APPLICATION_JSON,
+ schema = new Schema(implementation = classOf[Batch]))),
+ description = "create and open a batch session with uploading resource file")
+ @POST
+ @Consumes(Array(MediaType.MULTIPART_FORM_DATA))
+ def openBatchSessionWithUpload(
+ @FormDataParam("batchRequest") batchRequest: BatchRequest,
+ @FormDataParam("resourceFile") resourceFileInputStream: InputStream,
+ @FormDataParam("resourceFile") resourceFileMetadata: FormDataContentDisposition): Batch = {
+ val tempFile = Utils.writeToTempFile(
+ resourceFileInputStream,
+ KyuubiApplicationManager.tempDirForUpload,
+ resourceFileMetadata.getFileName)
+ batchRequest.setResource(tempFile.getPath)
+ openBatchSessionInternal(batchRequest, isResourceFromUpload = true)
+ }
+
+ /**
+ * open new batch session with request
+ *
+ * @param request instance of BatchRequest
+ * @param isResourceFromUpload whether to clean up temporary uploaded resource file
+ * in local path after execution
+ */
+ private def openBatchSessionInternal(
+ request: BatchRequest,
+ isResourceFromUpload: Boolean = false): Batch = {
require(
supportedBatchType(request.getBatchType),
s"${request.getBatchType} is not in the supported list: $SUPPORTED_BATCH_TYPES}")
@@ -177,6 +212,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
KYUUBI_CLIENT_IP_KEY -> ipAddress,
KYUUBI_SERVER_IP_KEY -> fe.host,
KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
+ KYUUBI_SESSION_BATCH_RESOURCE_UPLOADED_KEY -> isResourceFromUpload.toString,
KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).asJava)
val sessionHandle = sessionManager.openBatchSession(
userName,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 967397c95..2e718f6bf 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -25,7 +25,7 @@ import com.codahale.metrics.MetricRegistry
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.client.api.v1.dto.BatchRequest
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
@@ -80,6 +80,10 @@ class KyuubiBatchSessionImpl(
override lazy val name: Option[String] = Option(batchRequest.getName).orElse(
normalizedConf.get(KyuubiConf.SESSION_NAME.key))
+ // whether the resource file is from uploading
+ private[kyuubi] val isResourceUploaded: Boolean = batchRequest.getConf
+ .getOrDefault(KyuubiReservedKeys.KYUUBI_SESSION_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean
+
private[kyuubi] lazy val batchJobSubmissionOp = sessionManager.operationManager
.newBatchJobSubmissionOperation(
this,
@@ -116,7 +120,8 @@ class KyuubiBatchSessionImpl(
batchRequest.getBatchType,
normalizedConf,
sessionManager.getConf)
- if (batchRequest.getResource != SparkProcessBuilder.INTERNAL_RESOURCE) {
+ if (batchRequest.getResource != SparkProcessBuilder.INTERNAL_RESOURCE
+ && !isResourceUploaded) {
KyuubiApplicationManager.checkApplicationAccessPath(
batchRequest.getResource,
sessionManager.getConf)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
index c081185d8..b22783771 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
@@ -22,6 +22,7 @@ import javax.ws.rs.client.WebTarget
import javax.ws.rs.core.{Application, Response, UriBuilder}
import org.glassfish.jersey.client.ClientConfig
+import org.glassfish.jersey.media.multipart.MultiPartFeature
import org.glassfish.jersey.server.ResourceConfig
import org.glassfish.jersey.test.JerseyTest
import org.glassfish.jersey.test.jetty.JettyTestContainerFactory
@@ -39,9 +40,11 @@ object RestFrontendTestHelper {
private class RestApiBaseSuite extends JerseyTest {
override def configure: Application = new ResourceConfig(getClass)
+ .register(classOf[MultiPartFeature])
override def configureClient(config: ClientConfig): Unit = {
config.register(classOf[KyuubiScalaObjectMapper])
+ .register(classOf[MultiPartFeature])
}
override def getTestContainerFactory: TestContainerFactory = new JettyTestContainerFactory
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index 83c60878a..7c06063a6 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -28,6 +28,8 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.DurationInt
import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.glassfish.jersey.media.multipart.FormDataMultiPart
+import org.glassfish.jersey.media.multipart.file.FileDataBodyPart
import org.apache.kyuubi.{BatchTestHelper, KyuubiFunSuite, RestFrontendTestHelper}
import org.apache.kyuubi.client.api.v1.dto._
@@ -199,6 +201,26 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
assert(!deleteBatchResponse.readEntity(classOf[CloseBatchResponse]).isSuccess)
}
+ test("open batch session with uploading resource") {
+ val requestObj = newSparkBatchRequest(Map("spark.master" -> "local"))
+ val exampleJarFile = Paths.get(sparkBatchTestResource.get).toFile
+ val multipart = new FormDataMultiPart()
+ .field("batchRequest", requestObj, MediaType.APPLICATION_JSON_TYPE)
+ .bodyPart(new FileDataBodyPart("resourceFile", exampleJarFile))
+ .asInstanceOf[FormDataMultiPart]
+
+ val response = webTarget.path("api/v1/batches")
+ .request(MediaType.APPLICATION_JSON)
+ .post(Entity.entity(multipart, MediaType.MULTIPART_FORM_DATA))
+ assert(200 == response.getStatus)
+ val batch = response.readEntity(classOf[Batch])
+ assert(batch.getKyuubiInstance === fe.connectionUrl)
+ assert(batch.getBatchType === "SPARK")
+ assert(batch.getName === sparkBatchTestAppName)
+ assert(batch.getCreateTime > 0)
+ assert(batch.getEndTime === 0)
+ }
+
test("get batch session list") {
val sessionManager = server.frontendServices.head
.be.sessionManager.asInstanceOf[KyuubiSessionManager]
diff --git a/pom.xml b/pom.xml
index af19af136..cac5f4f1d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1465,6 +1465,19 @@
<version>${jersey.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-multipart</artifactId>
+ <version>${jersey.version}</version>
+ <exclusions>
+ <!--todo: remove this exclusion when Jersey 2.39 released with scope fix for Junit dependency-->
+ <exclusion>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>org.glassfish.jersey.test-framework</groupId>
<artifactId>jersey-test-framework-core</artifactId>