You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by bo...@apache.org on 2023/01/21 00:38:05 UTC

[kyuubi] branch master updated: [KYUUBI #4106] Introduce resource file uploading in batch creation via REST API

This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 609071d11 [KYUUBI #4106] Introduce resource file uploading in batch creation via REST API
609071d11 is described below

commit 609071d110a7e54efc48b2273a3a4ebaf7097c4a
Author: liangbowen <li...@gf.com.cn>
AuthorDate: Sat Jan 21 08:37:46 2023 +0800

    [KYUUBI #4106] Introduce resource file uploading in batch creation via REST API
    
    ### _Why are the changes needed?_
    
    to close #4106 .
    
    1. add a `POST /batches` API in `BatchesResource` of the REST API, which consumes the `multipart/form-data` media type to support uploading a `resourceFile`. The request also carries a `batchRequest` form-data part holding the JSON string of a `BatchRequest`, i.e. the same content that is required as the request body of the JSON `POST /batches`
    2. the uploaded `resourceFile` is saved to a temporary local file, which is cleaned up after job execution at the end of `submitAndMonitorBatchJob`
    3. the local temporary copy of `resourceFile` is used as the `resource` in `BatchJobSubmission`, e.g. as the `<application-jar>` for spark-submit
    
    Todos in follow-up:
    1. add a related description in Rest API doc
    2. add `multipart` media type support to `RestClient` and implement `createBatchWithUploadingResource` in `BatchRestApi`
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4144 from bowenliang123/rest-batch-upload.
    
    Closes #4106
    
    f6723a02 [Bowen Liang] Merge branch 'master' into rest-batch-upload
    0dd67245 [liangbowen] correct dependencyList
    6365a0cd [liangbowen] introducing resource file upload support in batch creation
    
    Lead-authored-by: liangbowen <li...@gf.com.cn>
    Co-authored-by: Bowen Liang <bo...@apache.org>
    Signed-off-by: liangbowen <li...@gf.com.cn>
---
 dev/dependencyList                                 |  2 +
 .../src/main/scala/org/apache/kyuubi/Utils.scala   | 70 +++++++++++++++++++++-
 .../apache/kyuubi/config/KyuubiReservedKeys.scala  |  1 +
 kyuubi-server/pom.xml                              |  5 ++
 .../kyuubi/engine/KyuubiApplicationManager.scala   | 22 +++++++
 .../kyuubi/operation/BatchJobSubmission.scala      |  5 +-
 .../apache/kyuubi/server/api/OpenAPIConfig.scala   |  2 +
 .../kyuubi/server/api/v1/BatchesResource.scala     | 38 +++++++++++-
 .../kyuubi/session/KyuubiBatchSessionImpl.scala    |  9 ++-
 .../org/apache/kyuubi/RestFrontendTestHelper.scala |  3 +
 .../server/api/v1/BatchesResourceSuite.scala       | 22 +++++++
 pom.xml                                            | 13 ++++
 12 files changed, 186 insertions(+), 6 deletions(-)

diff --git a/dev/dependencyList b/dev/dependencyList
index 6d7387b55..cce193b6f 100644
--- a/dev/dependencyList
+++ b/dev/dependencyList
@@ -86,6 +86,7 @@ jersey-container-servlet-core/2.38//jersey-container-servlet-core-2.38.jar
 jersey-entity-filtering/2.38//jersey-entity-filtering-2.38.jar
 jersey-hk2/2.38//jersey-hk2-2.38.jar
 jersey-media-json-jackson/2.38//jersey-media-json-jackson-2.38.jar
+jersey-media-multipart/2.38//jersey-media-multipart-2.38.jar
 jersey-server/2.38//jersey-server-2.38.jar
 jetcd-api/0.7.3//jetcd-api-0.7.3.jar
 jetcd-common/0.7.3//jetcd-common-0.7.3.jar
@@ -132,6 +133,7 @@ metrics-core/4.2.8//metrics-core-4.2.8.jar
 metrics-jmx/4.2.8//metrics-jmx-4.2.8.jar
 metrics-json/4.2.8//metrics-json-4.2.8.jar
 metrics-jvm/4.2.8//metrics-jvm-4.2.8.jar
+mimepull/1.9.15//mimepull-1.9.15.jar
 netty-all/4.1.87.Final//netty-all-4.1.87.Final.jar
 netty-buffer/4.1.87.Final//netty-buffer-4.1.87.Final.jar
 netty-codec-dns/4.1.87.Final//netty-codec-dns-4.1.87.Final.jar
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 33a4e116e..7283ea040 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -20,8 +20,10 @@ package org.apache.kyuubi
 import java.io._
 import java.net.{Inet4Address, InetAddress, NetworkInterface}
 import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Path, Paths}
-import java.util.{Properties, TimeZone, UUID}
+import java.nio.file.{Files, Path, Paths, StandardCopyOption}
+import java.text.SimpleDateFormat
+import java.util.{Date, Properties, TimeZone, UUID}
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 import scala.sys.process._
@@ -40,6 +42,12 @@ object Utils extends Logging {
 
   import org.apache.kyuubi.config.KyuubiConf._
 
+  /**
+   * An atomic counter used in writeToTempFile method
+   * avoiding duplication in temporary file name generation
+   */
+  private lazy val tempFileIdCounter: AtomicLong = new AtomicLong(0)
+
   def strToSeq(s: String, sp: String = ","): Seq[String] = {
     require(s != null)
     s.split(sp).map(_.trim).filter(_.nonEmpty)
@@ -135,6 +143,20 @@ object Utils extends Logging {
     f.delete()
   }
 
+  /**
+   * delete file in path with logging
+   * @param filePath path to file for deletion
+   * @param errorMessage message as prefix logging with error exception
+   */
+  def deleteFile(filePath: String, errorMessage: String): Unit = {
+    try {
+      Files.delete(Paths.get(filePath))
+    } catch {
+      case e: Exception =>
+        error(s"$errorMessage: $filePath ", e)
+    }
+  }
+
   /**
    * Create a temporary directory inside the given parent directory. The directory will be
    * automatically deleted when the VM shuts down.
@@ -147,6 +169,50 @@ object Utils extends Logging {
     dir
   }
 
+  /**
+   * Copies bytes from an InputStream source to a newly created temporary file
+   * created in the directory destination. The temporary file will be created
+   * with new name by adding random identifiers before original file name's suffix,
+   * and the file will be deleted on JVM exit. The directories up to destination
+   * will be created if they don't already exist. destination will be overwritten
+   * if it already exists. The source stream is closed.
+   * @param source the InputStream to copy bytes from, must not be null, will be closed
+   * @param dir the directory path for temp file creation
+   * @param fileName original file name with suffix
+   * @return the created temp file in dir
+   */
+  def writeToTempFile(source: InputStream, dir: Path, fileName: String): File = {
+    try {
+      if (source == null) {
+        throw new IOException("the source inputstream is null")
+      }
+      if (!dir.toFile.exists()) {
+        dir.toFile.mkdirs()
+      }
+      val (prefix, suffix) = fileName.lastIndexOf(".") match {
+        case i if i > 0 => (fileName.substring(0, i), fileName.substring(i))
+        case _ => (fileName, "")
+      }
+      val currentTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())
+      val identifier = s"$currentTime-${tempFileIdCounter.incrementAndGet()}"
+      val filePath = Paths.get(dir.toString, s"$prefix-$identifier$suffix")
+      try {
+        Files.copy(source, filePath, StandardCopyOption.REPLACE_EXISTING)
+      } finally {
+        source.close()
+      }
+      val file = filePath.toFile
+      file.deleteOnExit()
+      file
+    } catch {
+      case e: Exception =>
+        error(
+          s"failed to write to temp file in path $dir, original file name: $fileName",
+          e)
+        throw e
+    }
+  }
+
   def currentUser: String = UserGroupInformation.getCurrentUser.getShortUserName
 
   private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
index 50dae6275..6036af855 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
@@ -24,6 +24,7 @@ object KyuubiReservedKeys {
   final val KYUUBI_SESSION_SIGN_PUBLICKEY = "kyuubi.session.sign.publickey"
   final val KYUUBI_SESSION_USER_SIGN = "kyuubi.session.user.sign"
   final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user"
+  final val KYUUBI_SESSION_BATCH_RESOURCE_UPLOADED_KEY = "kyuubi.session.batch.resource.uploaded"
   final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url"
   final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
   final val KYUUBI_ENGINE_ID = "kyuubi.engine.id"
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 748d8d028..7a7c609a2 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -216,6 +216,11 @@
             <artifactId>jersey-media-json-jackson</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.glassfish.jersey.media</groupId>
+            <artifactId>jersey-media-multipart</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>com.zaxxer</groupId>
             <artifactId>HikariCP</artifactId>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 481d7a2f1..b76f08833 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.engine
 
 import java.io.File
 import java.net.{URI, URISyntaxException}
+import java.nio.file.{Files, Path}
 import java.util.{Locale, ServiceLoader}
 
 import scala.collection.JavaConverters._
@@ -58,6 +59,7 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
         case NonFatal(e) => warn(s"Error stopping ${op.getClass.getSimpleName}: ${e.getMessage}")
       }
     }
+    deleteTempDirForUpload()
     super.stop()
   }
 
@@ -90,6 +92,17 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
       case None => None
     }
   }
+
+  private def deleteTempDirForUpload(): Unit = {
+    try {
+      Utils.deleteDirectoryRecursively(KyuubiApplicationManager.tempDirForUpload.toFile)
+    } catch {
+      case e: Exception => error(
+          "Failed to delete temporary folder for uploading " +
+            s"${KyuubiApplicationManager.tempDirForUpload}",
+          e)
+    }
+  }
 }
 
 object KyuubiApplicationManager {
@@ -109,6 +122,15 @@ object KyuubiApplicationManager {
     conf.set(FlinkProcessBuilder.TAG_KEY, newTag)
   }
 
+  lazy val tempDirForUpload: Path = {
+    val path = Utils.getAbsolutePathFromWork("upload")
+    val pathFile = path.toFile
+    if (!pathFile.exists()) {
+      Files.createDirectories(path)
+    }
+    path
+  }
+
   private[kyuubi] def checkApplicationAccessPath(path: String, conf: KyuubiConf): Unit = {
     val localDirAllowList = conf.get(KyuubiConf.SESSION_LOCAL_DIR_ALLOW_LIST)
     if (localDirAllowList.nonEmpty) {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index c1bcf6cec..4436926db 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -25,7 +25,7 @@ import com.codahale.metrics.MetricRegistry
 import com.google.common.annotations.VisibleForTesting
 import org.apache.hive.service.rpc.thrift._
 
-import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
+import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState, KillResponse, ProcBuilder}
 import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
@@ -267,6 +267,9 @@ class BatchJobSubmission(
       }
     } finally {
       builder.close()
+      if (session.isResourceUploaded) {
+        Utils.deleteFile(resource, "Failed to delete temporary uploaded resource file")
+      }
     }
   }
 
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/OpenAPIConfig.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/OpenAPIConfig.scala
index c4733a0b0..d8b489656 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/OpenAPIConfig.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/OpenAPIConfig.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.server.api
 
+import org.glassfish.jersey.media.multipart.MultiPartFeature
 import org.glassfish.jersey.server.ResourceConfig
 
 import org.apache.kyuubi.server.api.v1.KyuubiOpenApiResource
@@ -26,4 +27,5 @@ class OpenAPIConfig extends ResourceConfig {
   register(classOf[KyuubiOpenApiResource])
   register(classOf[KyuubiScalaObjectMapper])
   register(classOf[RestExceptionMapper])
+  register(classOf[MultiPartFeature])
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index c00fb95f6..2c9c7535d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.server.api.v1
 
+import java.io.InputStream
 import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap
 import javax.ws.rs._
@@ -29,13 +30,14 @@ import scala.util.control.NonFatal
 import io.swagger.v3.oas.annotations.media.{Content, Schema}
 import io.swagger.v3.oas.annotations.responses.ApiResponse
 import io.swagger.v3.oas.annotations.tags.Tag
+import org.glassfish.jersey.media.multipart.{FormDataContentDisposition, FormDataParam}
 
 import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.client.api.v1.dto._
 import org.apache.kyuubi.client.exception.KyuubiRestException
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiReservedKeys._
-import org.apache.kyuubi.engine.ApplicationInfo
+import org.apache.kyuubi.engine.{ApplicationInfo, KyuubiApplicationManager}
 import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, OperationState}
 import org.apache.kyuubi.server.api.ApiRequestContext
 import org.apache.kyuubi.server.api.v1.BatchesResource._
@@ -161,6 +163,39 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
   @POST
   @Consumes(Array(MediaType.APPLICATION_JSON))
   def openBatchSession(request: BatchRequest): Batch = {
+    openBatchSessionInternal(request)
+  }
+
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON,
+      schema = new Schema(implementation = classOf[Batch]))),
+    description = "create and open a batch session with uploading resource file")
+  @POST
+  @Consumes(Array(MediaType.MULTIPART_FORM_DATA))
+  def openBatchSessionWithUpload(
+      @FormDataParam("batchRequest") batchRequest: BatchRequest,
+      @FormDataParam("resourceFile") resourceFileInputStream: InputStream,
+      @FormDataParam("resourceFile") resourceFileMetadata: FormDataContentDisposition): Batch = {
+    val tempFile = Utils.writeToTempFile(
+      resourceFileInputStream,
+      KyuubiApplicationManager.tempDirForUpload,
+      resourceFileMetadata.getFileName)
+    batchRequest.setResource(tempFile.getPath)
+    openBatchSessionInternal(batchRequest, isResourceFromUpload = true)
+  }
+
+  /**
+   * open new batch session with request
+   *
+   * @param request              instance of BatchRequest
+   * @param isResourceFromUpload whether to clean up temporary uploaded resource file
+   *                             in local path after execution
+   */
+  private def openBatchSessionInternal(
+      request: BatchRequest,
+      isResourceFromUpload: Boolean = false): Batch = {
     require(
       supportedBatchType(request.getBatchType),
       s"${request.getBatchType} is not in the supported list: $SUPPORTED_BATCH_TYPES}")
@@ -177,6 +212,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
         KYUUBI_CLIENT_IP_KEY -> ipAddress,
         KYUUBI_SERVER_IP_KEY -> fe.host,
         KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
+        KYUUBI_SESSION_BATCH_RESOURCE_UPLOADED_KEY -> isResourceFromUpload.toString,
         KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).asJava)
     val sessionHandle = sessionManager.openBatchSession(
       userName,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 967397c95..2e718f6bf 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -25,7 +25,7 @@ import com.codahale.metrics.MetricRegistry
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.client.api.v1.dto.BatchRequest
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.engine.KyuubiApplicationManager
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
 import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
@@ -80,6 +80,10 @@ class KyuubiBatchSessionImpl(
   override lazy val name: Option[String] = Option(batchRequest.getName).orElse(
     normalizedConf.get(KyuubiConf.SESSION_NAME.key))
 
+  // whether the resource file is from uploading
+  private[kyuubi] val isResourceUploaded: Boolean = batchRequest.getConf
+    .getOrDefault(KyuubiReservedKeys.KYUUBI_SESSION_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean
+
   private[kyuubi] lazy val batchJobSubmissionOp = sessionManager.operationManager
     .newBatchJobSubmissionOperation(
       this,
@@ -116,7 +120,8 @@ class KyuubiBatchSessionImpl(
       batchRequest.getBatchType,
       normalizedConf,
       sessionManager.getConf)
-    if (batchRequest.getResource != SparkProcessBuilder.INTERNAL_RESOURCE) {
+    if (batchRequest.getResource != SparkProcessBuilder.INTERNAL_RESOURCE
+      && !isResourceUploaded) {
       KyuubiApplicationManager.checkApplicationAccessPath(
         batchRequest.getResource,
         sessionManager.getConf)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
index c081185d8..b22783771 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
@@ -22,6 +22,7 @@ import javax.ws.rs.client.WebTarget
 import javax.ws.rs.core.{Application, Response, UriBuilder}
 
 import org.glassfish.jersey.client.ClientConfig
+import org.glassfish.jersey.media.multipart.MultiPartFeature
 import org.glassfish.jersey.server.ResourceConfig
 import org.glassfish.jersey.test.JerseyTest
 import org.glassfish.jersey.test.jetty.JettyTestContainerFactory
@@ -39,9 +40,11 @@ object RestFrontendTestHelper {
   private class RestApiBaseSuite extends JerseyTest {
 
     override def configure: Application = new ResourceConfig(getClass)
+      .register(classOf[MultiPartFeature])
 
     override def configureClient(config: ClientConfig): Unit = {
       config.register(classOf[KyuubiScalaObjectMapper])
+        .register(classOf[MultiPartFeature])
     }
 
     override def getTestContainerFactory: TestContainerFactory = new JettyTestContainerFactory
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index 83c60878a..7c06063a6 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -28,6 +28,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration.DurationInt
 
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.glassfish.jersey.media.multipart.FormDataMultiPart
+import org.glassfish.jersey.media.multipart.file.FileDataBodyPart
 
 import org.apache.kyuubi.{BatchTestHelper, KyuubiFunSuite, RestFrontendTestHelper}
 import org.apache.kyuubi.client.api.v1.dto._
@@ -199,6 +201,26 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
     assert(!deleteBatchResponse.readEntity(classOf[CloseBatchResponse]).isSuccess)
   }
 
+  test("open batch session with uploading resource") {
+    val requestObj = newSparkBatchRequest(Map("spark.master" -> "local"))
+    val exampleJarFile = Paths.get(sparkBatchTestResource.get).toFile
+    val multipart = new FormDataMultiPart()
+      .field("batchRequest", requestObj, MediaType.APPLICATION_JSON_TYPE)
+      .bodyPart(new FileDataBodyPart("resourceFile", exampleJarFile))
+      .asInstanceOf[FormDataMultiPart]
+
+    val response = webTarget.path("api/v1/batches")
+      .request(MediaType.APPLICATION_JSON)
+      .post(Entity.entity(multipart, MediaType.MULTIPART_FORM_DATA))
+    assert(200 == response.getStatus)
+    val batch = response.readEntity(classOf[Batch])
+    assert(batch.getKyuubiInstance === fe.connectionUrl)
+    assert(batch.getBatchType === "SPARK")
+    assert(batch.getName === sparkBatchTestAppName)
+    assert(batch.getCreateTime > 0)
+    assert(batch.getEndTime === 0)
+  }
+
   test("get batch session list") {
     val sessionManager = server.frontendServices.head
       .be.sessionManager.asInstanceOf[KyuubiSessionManager]
diff --git a/pom.xml b/pom.xml
index af19af136..cac5f4f1d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1465,6 +1465,19 @@
                 <version>${jersey.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.glassfish.jersey.media</groupId>
+                <artifactId>jersey-media-multipart</artifactId>
+                <version>${jersey.version}</version>
+                <exclusions>
+                    <!--todo: remove this exclusion when Jersey 2.39 released with scope fix for Junit dependency-->
+                    <exclusion>
+                        <groupId>org.junit.jupiter</groupId>
+                        <artifactId>junit-jupiter</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
             <dependency>
                 <groupId>org.glassfish.jersey.test-framework</groupId>
                 <artifactId>jersey-test-framework-core</artifactId>