You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/03/02 09:43:03 UTC

[kyuubi] branch master updated: [KYUUBI #4390] Allow user to provide batch id on submitting batch job

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new efbaaff6f [KYUUBI #4390] Allow user to provide batch id on submitting batch job
efbaaff6f is described below

commit efbaaff6fbe0a11a38e0cb13421175f92bb0bc45
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Thu Mar 2 17:42:52 2023 +0800

    [KYUUBI #4390] Allow user to provide batch id on submitting batch job
    
    ### _Why are the changes needed?_
    
    This PR proposes to allow the user to provide a batch id when submitting a batch job. If the batch id already exists in the metastore, Kyuubi ignores the submission and just returns the existing batch, w/ a marker in the response. This avoids duplicated batch job submissions.
    
    Talking about the implementation, the key things are
    
    How does the user set the custom batch id?
    
    - The user can optionally set `kyuubi.batch.id` in `conf: Map[String, String]`; the value must be a UUID. For Java users, it can be generated by `UUID.randomUUID().toString()`.
    
    How does the Kyuubi Server detect the duplication?
    
    - It's simple in the single Kyuubi Server instance case: Kyuubi just needs to look up the metastore before creating a batch job.
    - In HA mode, suppose the user requests to create batch jobs w/ the same batch id concurrently: multiple Kyuubi Servers may process the requests and try to insert into the metastore DB at the same time, but only the first insertion succeeds; the others fail w/ "duplicated key". Kyuubi Server needs to catch this error and return the existing batch job information instead of creating a new one.
    
    How does the user know if the returned batch job is newly created or duplicated?
    
    - A new field `batchInfo: Map[String, String]` is added to the response; for a duplicated batch job, it contains `"kyuubi.batch.duplicated": "true"`.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4390 from pan3793/batch-id.
    
    Closes #4390
    
    b6917babf [Cheng Pan] move constant to rest client
    79ef1b5d8 [Cheng Pan] flaky test
    f82228506 [Cheng Pan] it
    88bdfa50a [Cheng Pan] ut
    fd8bc222a [Cheng Pan] ut
    c820f5e43 [Cheng Pan] Support user provided batch id on batch job submission
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../test/spark/SparkOnKubernetesTestsSuite.scala   | 11 +++-
 .../scala/org/apache/kyuubi/util/JdbcUtils.scala   |  8 +++
 .../org/apache/kyuubi/client/api/v1/dto/Batch.java | 18 ++++-
 .../kyuubi/client/api/v1/dto/BatchRequest.java     |  6 +-
 .../org/apache/kyuubi/client/util/BatchUtils.java  |  9 +++
 .../apache/kyuubi/client/RestClientTestUtils.java  | 53 +++++++--------
 .../kyuubi/server/api/v1/BatchesResource.scala     | 76 ++++++++++++++++------
 .../kyuubi/server/metadata/MetadataManager.scala   | 14 ++--
 .../kyuubi/session/KyuubiBatchSessionImpl.scala    | 13 ++--
 .../org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 15 ++++-
 .../ServerJsonLoggingEventHandlerSuite.scala       | 24 ++++---
 .../server/api/v1/BatchesResourceSuite.scala       | 34 ++++++++--
 .../kyuubi/server/rest/client/BatchCliSuite.scala  |  8 ++-
 13 files changed, 200 insertions(+), 89 deletions(-)

diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 019de840d..14db8b408 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -17,13 +17,16 @@
 
 package org.apache.kyuubi.kubernetes.test.spark
 
+import java.util.UUID
+
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.net.NetUtils
 
-import org.apache.kyuubi.{BatchTestHelper, KyuubiException, Logging, Utils, WithKyuubiServer, WithSimpleDFSService}
+import org.apache.kyuubi._
+import org.apache.kyuubi.client.util.BatchUtils._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_HOST
 import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationOperation, KubernetesApplicationOperation}
@@ -134,7 +137,8 @@ class KyuubiOperationKubernetesClusterClientModeSuite
     server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
 
   test("Spark Client Mode On Kubernetes Kyuubi KubernetesApplicationOperation Suite") {
-    val batchRequest = newSparkBatchRequest(conf.getAll)
+    val batchRequest = newSparkBatchRequest(conf.getAll ++ Map(
+      KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))
 
     val sessionHandle = sessionManager.openBatchSession(
       "kyuubi",
@@ -193,7 +197,8 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
       "spark.kubernetes.driver.pod.name",
       driverPodNamePrefix + "-" + System.currentTimeMillis())
 
-    val batchRequest = newSparkBatchRequest(conf.getAll)
+    val batchRequest = newSparkBatchRequest(conf.getAll ++ Map(
+      KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))
 
     val sessionHandle = sessionManager.openBatchSession(
       "runner",
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/JdbcUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/JdbcUtils.scala
index df72ee339..b89580f4c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/JdbcUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/JdbcUtils.scala
@@ -104,4 +104,12 @@ object JdbcUtils extends Logging {
       case _ => "(empty)"
     }
   }
+
+  def isDuplicatedKeyDBErr(cause: Throwable): Boolean = {
+    val duplicatedKeyKeywords = Seq(
+      "duplicate key value in a unique or primary key constraint or unique index", // Derby
+      "Duplicate entry" // MySQL
+    )
+    duplicatedKeyKeywords.exists(cause.getMessage.contains)
+  }
 }
diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Batch.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Batch.java
index 43fbf10af..b318b709d 100644
--- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Batch.java
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Batch.java
@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.client.api.v1.dto;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
@@ -35,6 +37,7 @@ public class Batch {
   private String state;
   private long createTime;
   private long endTime;
+  private Map<String, String> batchInfo = Collections.emptyMap();
 
   public Batch() {}
 
@@ -51,7 +54,8 @@ public class Batch {
       String kyuubiInstance,
       String state,
       long createTime,
-      long endTime) {
+      long endTime,
+      Map<String, String> batchInfo) {
     this.id = id;
     this.user = user;
     this.batchType = batchType;
@@ -65,6 +69,7 @@ public class Batch {
     this.state = state;
     this.createTime = createTime;
     this.endTime = endTime;
+    this.batchInfo = batchInfo;
   }
 
   public String getId() {
@@ -171,6 +176,17 @@ public class Batch {
     this.endTime = endTime;
   }
 
+  public Map<String, String> getBatchInfo() {
+    if (batchInfo == null) {
+      return Collections.emptyMap();
+    }
+    return batchInfo;
+  }
+
+  public void setBatchInfo(Map<String, String> batchInfo) {
+    this.batchInfo = batchInfo;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java
index f10a8fdb5..f45821fc2 100644
--- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java
@@ -29,8 +29,8 @@ public class BatchRequest {
   private String resource;
   private String className;
   private String name;
-  private Map<String, String> conf;
-  private List<String> args;
+  private Map<String, String> conf = Collections.emptyMap();
+  private List<String> args = Collections.emptyList();
 
   public BatchRequest() {}
 
@@ -54,8 +54,6 @@ public class BatchRequest {
     this.resource = resource;
     this.className = className;
     this.name = name;
-    this.conf = Collections.emptyMap();
-    this.args = Collections.emptyList();
   }
 
   public String getBatchType() {
diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/util/BatchUtils.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/util/BatchUtils.java
index 59f5967a0..f7efaad9d 100644
--- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/util/BatchUtils.java
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/util/BatchUtils.java
@@ -20,6 +20,7 @@ package org.apache.kyuubi.client.util;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
+import org.apache.kyuubi.client.api.v1.dto.Batch;
 
 public final class BatchUtils {
   /** The batch has not been submitted to resource manager yet. */
@@ -40,6 +41,10 @@ public final class BatchUtils {
   public static List<String> terminalBatchStates =
       Arrays.asList(FINISHED_STATE, ERROR_STATE, CANCELED_STATE);
 
+  public static String KYUUBI_BATCH_ID_KEY = "kyuubi.batch.id";
+
+  public static String KYUUBI_BATCH_DUPLICATED_KEY = "kyuubi.batch.duplicated";
+
   public static boolean isPendingState(String state) {
     return PENDING_STATE.equalsIgnoreCase(state);
   }
@@ -55,4 +60,8 @@ public final class BatchUtils {
   public static boolean isTerminalState(String state) {
     return state != null && terminalBatchStates.contains(state.toUpperCase(Locale.ROOT));
   }
+
+  public static boolean isDuplicatedSubmission(Batch batch) {
+    return "true".equalsIgnoreCase(batch.getBatchInfo().get(KYUUBI_BATCH_DUPLICATED_KEY));
+  }
 }
diff --git a/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtils.java b/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtils.java
index 82413e2a4..1ac0278bf 100644
--- a/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtils.java
+++ b/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtils.java
@@ -45,35 +45,31 @@ public class RestClientTestUtils {
   }
 
   public static Batch generateTestBatch(String id) {
-    Batch batch =
-        new Batch(
-            id,
-            TEST_USERNAME,
-            "spark",
-            "batch_name",
-            0,
-            id,
-            null,
-            "RUNNING",
-            null,
-            "192.168.31.130:64573",
-            "RUNNING",
-            BATCH_CREATE_TIME,
-            0);
-
-    return batch;
+    return new Batch(
+        id,
+        TEST_USERNAME,
+        "spark",
+        "batch_name",
+        0,
+        id,
+        null,
+        "RUNNING",
+        null,
+        "192.168.31.130:64573",
+        "RUNNING",
+        BATCH_CREATE_TIME,
+        0,
+        Collections.emptyMap());
   }
 
   public static BatchRequest generateTestBatchRequest() {
-    BatchRequest batchRequest =
-        new BatchRequest(
-            "spark",
-            "/MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar",
-            "org.apache.kyuubi.engine.spark.SparkSQLEngine",
-            "test_batch",
-            Collections.singletonMap("spark.driver.memory", "16m"),
-            Collections.emptyList());
-    return batchRequest;
+    return new BatchRequest(
+        "spark",
+        "/MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar",
+        "org.apache.kyuubi.engine.spark.SparkSQLEngine",
+        "test_batch",
+        Collections.singletonMap("spark.driver.memory", "16m"),
+        Collections.emptyList());
   }
 
   public static GetBatchesResponse generateTestBatchesResponse() {
@@ -87,9 +83,8 @@ public class RestClientTestUtils {
   public static OperationLog generateTestOperationLog() {
     List<String> logs =
         Arrays.asList(
-            "13:15:13.523 INFO org.apache.curator.framework.state."
-                + "ConnectionStateManager: State change: CONNECTED",
-            "13:15:13.528 INFO org.apache.kyuubi." + "engine.EngineRef: Launching engine:");
+            "13:15:13.523 INFO ConnectionStateManager: State change: CONNECTED",
+            "13:15:13.528 INFO EngineRef: Launching engine:");
     return new OperationLog(logs, 2);
   }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 924e7b89c..edfc05616 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -19,13 +19,14 @@ package org.apache.kyuubi.server.api.v1
 
 import java.io.InputStream
 import java.util
-import java.util.{Collections, Locale}
+import java.util.{Collections, Locale, UUID}
 import java.util.concurrent.ConcurrentHashMap
 import javax.ws.rs._
 import javax.ws.rs.core.MediaType
 import javax.ws.rs.core.Response.Status
 
 import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
 import io.swagger.v3.oas.annotations.media.{Content, Schema}
@@ -36,6 +37,7 @@ import org.glassfish.jersey.media.multipart.{FormDataContentDisposition, FormDat
 import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.client.api.v1.dto._
 import org.apache.kyuubi.client.exception.KyuubiRestException
+import org.apache.kyuubi.client.util.BatchUtils._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiReservedKeys._
 import org.apache.kyuubi.engine.{ApplicationInfo, KyuubiApplicationManager}
@@ -45,6 +47,7 @@ import org.apache.kyuubi.server.api.v1.BatchesResource._
 import org.apache.kyuubi.server.metadata.MetadataManager
 import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager, SessionHandle}
+import org.apache.kyuubi.util.JdbcUtils
 
 @Tag(name = "Batch")
 @Produces(Array(MediaType.APPLICATION_JSON))
@@ -105,7 +108,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
       session.connectionUrl,
       batchOpStatus.state.toString,
       session.createTime,
-      batchOpStatus.completed)
+      batchOpStatus.completed,
+      Map.empty[String, String].asJava)
   }
 
   private def buildBatch(
@@ -142,7 +146,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
         metadata.kyuubiInstance,
         currentBatchState,
         metadata.createTime,
-        metadata.endTime)
+        metadata.endTime,
+        Map.empty[String, String].asJava)
     }.getOrElse(MetadataManager.buildBatch(metadata))
   }
 
@@ -210,22 +215,55 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
     }
     request.setBatchType(request.getBatchType.toUpperCase(Locale.ROOT))
 
-    val userName = fe.getSessionUser(request.getConf.asScala.toMap)
-    val ipAddress = fe.getIpAddress
-    request.setConf(
-      (request.getConf.asScala ++ Map(
-        KYUUBI_BATCH_RESOURCE_UPLOADED_KEY -> isResourceFromUpload.toString,
-        KYUUBI_CLIENT_IP_KEY -> ipAddress,
-        KYUUBI_SERVER_IP_KEY -> fe.host,
-        KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
-        KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).asJava)
-    val sessionHandle = sessionManager.openBatchSession(
-      userName,
-      "anonymous",
-      ipAddress,
-      request.getConf.asScala.toMap,
-      request)
-    buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
+    val userProvidedBatchId = request.getConf.asScala.get(KYUUBI_BATCH_ID_KEY)
+    userProvidedBatchId.foreach { batchId =>
+      try UUID.fromString(batchId)
+      catch {
+        case NonFatal(e) =>
+          throw new IllegalArgumentException(s"$KYUUBI_BATCH_ID_KEY=$batchId must be an UUID", e)
+      }
+    }
+
+    userProvidedBatchId.flatMap { batchId =>
+      Option(sessionManager.getBatchFromMetadataStore(batchId))
+    } match {
+      case Some(batch) =>
+        markDuplicated(batch)
+      case None =>
+        val userName = fe.getSessionUser(request.getConf.asScala.toMap)
+        val ipAddress = fe.getIpAddress
+        val batchId = userProvidedBatchId.getOrElse(UUID.randomUUID().toString)
+        request.setConf(
+          (request.getConf.asScala ++ Map(
+            KYUUBI_BATCH_ID_KEY -> batchId,
+            KYUUBI_BATCH_RESOURCE_UPLOADED_KEY -> isResourceFromUpload.toString,
+            KYUUBI_CLIENT_IP_KEY -> ipAddress,
+            KYUUBI_SERVER_IP_KEY -> fe.host,
+            KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
+            KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).asJava)
+
+        Try {
+          sessionManager.openBatchSession(
+            userName,
+            "anonymous",
+            ipAddress,
+            request.getConf.asScala.toMap,
+            request)
+        } match {
+          case Success(sessionHandle) =>
+            buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
+          case Failure(cause) if JdbcUtils.isDuplicatedKeyDBErr(cause) =>
+            val batch = sessionManager.getBatchFromMetadataStore(batchId)
+            assert(batch != null, s"can not find duplicated batch $batchId from metadata store")
+            markDuplicated(batch)
+        }
+    }
+  }
+
+  private def markDuplicated(batch: Batch): Batch = {
+    warn(s"duplicated submission: ${batch.getId}, ignore and return the existing batch.")
+    batch.setBatchInfo(Map(KYUUBI_BATCH_DUPLICATED_KEY -> "true").asJava)
+    batch
   }
 
   @ApiResponse(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
index 35abc1b30..88a7f4e4e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
@@ -20,6 +20,8 @@ package org.apache.kyuubi.server.metadata
 import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.JavaConverters._
+
 import org.apache.kyuubi.{KyuubiException, Logging}
 import org.apache.kyuubi.client.api.v1.dto.Batch
 import org.apache.kyuubi.config.KyuubiConf
@@ -29,7 +31,7 @@ import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
 import org.apache.kyuubi.service.AbstractService
 import org.apache.kyuubi.session.SessionType
-import org.apache.kyuubi.util.{ClassUtils, ThreadUtils}
+import org.apache.kyuubi.util.{ClassUtils, JdbcUtils, ThreadUtils}
 
 class MetadataManager extends AbstractService("MetadataManager") {
   import MetadataManager._
@@ -105,11 +107,8 @@ class MetadataManager extends AbstractService("MetadataManager") {
   }
 
   protected def unrecoverableDBErr(cause: Throwable): Boolean = {
-    val unrecoverableKeywords = Seq(
-      "duplicate key value in a unique or primary key constraint or unique index", // Derby
-      "Duplicate entry" // MySQL
-    )
-    unrecoverableKeywords.exists(cause.getMessage.contains)
+    // cover other cases in the future
+    JdbcUtils.isDuplicatedKeyDBErr(cause)
   }
 
   def insertMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = {
@@ -334,6 +333,7 @@ object MetadataManager extends Logging {
       batchMetadata.kyuubiInstance,
       batchState,
       batchMetadata.createTime,
-      batchMetadata.endTime)
+      batchMetadata.endTime,
+      Map.empty[String, String].asJava)
   }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 7864d61f3..228890a1e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -17,13 +17,12 @@
 
 package org.apache.kyuubi.session
 
-import java.util.UUID
-
 import scala.collection.JavaConverters._
 
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.client.api.v1.dto.BatchRequest
+import org.apache.kyuubi.client.util.BatchUtils._
 import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.engine.KyuubiApplicationManager
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
@@ -50,9 +49,10 @@ class KyuubiBatchSessionImpl(
     sessionManager) {
   override val sessionType: SessionType = SessionType.BATCH
 
-  override val handle: SessionHandle = recoveryMetadata.map { metadata =>
-    SessionHandle(UUID.fromString(metadata.identifier))
-  }.getOrElse(SessionHandle())
+  override val handle: SessionHandle = {
+    val batchId = recoveryMetadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY))
+    SessionHandle.fromUUID(batchId)
+  }
 
   override def createTime: Long = recoveryMetadata.map(_.createTime).getOrElse(super.createTime)
 
@@ -105,7 +105,7 @@ class KyuubiBatchSessionImpl(
   }
 
   private val sessionEvent = KyuubiSessionEvent(this)
-  recoveryMetadata.map(metadata => sessionEvent.engineId = metadata.engineId)
+  recoveryMetadata.foreach(metadata => sessionEvent.engineId = metadata.engineId)
   EventBus.post(sessionEvent)
 
   override def getSessionEvent: Option[KyuubiSessionEvent] = {
@@ -146,6 +146,7 @@ class KyuubiBatchSessionImpl(
         engineType = batchRequest.getBatchType,
         clusterManager = batchJobSubmissionOp.builder.clusterManager())
 
+      // there is a chance that operation failed w/ duplicated key error
       sessionManager.insertMetadata(metaData)
     }
 
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 27e769d29..3bc6bb1c5 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -17,9 +17,12 @@
 
 package org.apache.kyuubi
 
+import java.util.UUID
+
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 
+import org.apache.kyuubi.client.util.BatchUtils._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol
@@ -104,7 +107,10 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
 
   test("open batch session") {
     val batchRequest =
-      newSparkBatchRequest(Map("spark.master" -> "local", "spark.executor.instances" -> "1"))
+      newSparkBatchRequest(Map(
+        "spark.master" -> "local",
+        "spark.executor.instances" -> "1",
+        KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))
 
     val sessionHandle = sessionManager.openBatchSession(
       "kyuubi",
@@ -162,7 +168,9 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
   }
 
   test("prevent dead loop if the batch job submission process it not alive") {
-    val batchRequest = newSparkBatchRequest(Map("spark.submit.deployMode" -> "invalid"))
+    val batchRequest = newSparkBatchRequest(Map(
+      "spark.submit.deployMode" -> "invalid",
+      KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))
 
     val sessionHandle = sessionManager.openBatchSession(
       "kyuubi",
@@ -188,7 +196,8 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
       "spark.submit.deployMode" -> "cluster",
       "spark.sql.defaultCatalog=spark_catalog" -> "spark_catalog",
       "spark.sql.catalog.spark_catalog.type" -> "invalid_type",
-      "kyuubi.session.engine.initialize.timeout" -> "PT10m"))(Map.empty) {
+      "kyuubi.session.engine.initialize.timeout" -> "PT10M",
+      KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))(Map.empty) {
       val startTime = System.currentTimeMillis()
       val exception = intercept[Exception] {
         withJdbcStatement() { _ => }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
index df8ea1083..3bdc9cd38 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
@@ -28,8 +28,10 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hive.service.rpc.thrift.{TOpenSessionReq, TStatusCode}
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
 import org.apache.kyuubi._
+import org.apache.kyuubi.client.util.BatchUtils._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.operation.OperationState._
@@ -138,7 +140,7 @@ class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCT
       Utils.currentUser,
       "kyuubi",
       "127.0.0.1",
-      Map.empty,
+      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       batchRequest)
     withSessionConf()(Map.empty)(Map("spark.sql.shuffle.partitions" -> "2")) {
       withJdbcStatement() { statement =>
@@ -277,15 +279,17 @@ class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCT
       }
     }
 
-    val serverSessionEventPath =
-      Paths.get(serverLogRoot, "kyuubi_session", s"day=$currentDate")
-    withJdbcStatement() { statement =>
-      val res = statement.executeQuery(
-        s"SELECT * FROM `json`.`$serverSessionEventPath` " +
-          s"where sessionName = '$name' and exception is not null limit 1")
-      assert(res.next())
-      val exception = res.getObject("exception")
-      assert(exception.toString.contains("Invalid maximum heap size: -Xmxabc"))
+    eventually(timeout(2.minutes), interval(10.seconds)) {
+      val serverSessionEventPath =
+        Paths.get(serverLogRoot, "kyuubi_session", s"day=$currentDate")
+      withJdbcStatement() { statement =>
+        val res = statement.executeQuery(
+          s"SELECT * FROM `json`.`$serverSessionEventPath` " +
+            s"where sessionName = '$name' and exception is not null limit 1")
+        assert(res.next())
+        val exception = res.getObject("exception")
+        assert(exception.toString.contains("Invalid maximum heap size: -Xmxabc"))
+      }
     }
   }
 }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index ce05cbd6b..055496ff3 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -33,6 +33,8 @@ import org.glassfish.jersey.media.multipart.file.FileDataBodyPart
 
 import org.apache.kyuubi.{BatchTestHelper, KyuubiFunSuite, RestFrontendTestHelper}
 import org.apache.kyuubi.client.api.v1.dto._
+import org.apache.kyuubi.client.util.BatchUtils
+import org.apache.kyuubi.client.util.BatchUtils._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.{ApplicationInfo, KyuubiApplicationManager}
@@ -226,6 +228,30 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
     }
   }
 
+  test("open batch session w/ batch id") {
+    val batchId = UUID.randomUUID().toString
+    val reqObj = newSparkBatchRequest(Map(
+      "spark.master" -> "local",
+      KYUUBI_BATCH_ID_KEY -> batchId))
+
+    val resp1 = webTarget.path("api/v1/batches")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .post(Entity.entity(reqObj, MediaType.APPLICATION_JSON_TYPE))
+    assert(200 == resp1.getStatus)
+    val batch1 = resp1.readEntity(classOf[Batch])
+    assert(batch1.getId === batchId)
+
+    val resp2 = webTarget.path("api/v1/batches")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .post(Entity.entity(reqObj, MediaType.APPLICATION_JSON_TYPE))
+    assert(200 == resp2.getStatus)
+    val batch2 = resp2.readEntity(classOf[Batch])
+    assert(batch2.getId === batchId)
+
+    assert(batch1.getCreateTime === batch2.getCreateTime)
+    assert(BatchUtils.isDuplicatedSubmission(batch2))
+  }
+
   test("get batch session list") {
     val sessionManager = server.frontendServices.head
       .be.sessionManager.asInstanceOf[KyuubiSessionManager]
@@ -250,7 +276,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
       "kyuubi",
       "kyuubi",
       InetAddress.getLocalHost.getCanonicalHostName,
-      Map.empty,
+      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       newBatchRequest(
         "spark",
         sparkBatchTestResource.get,
@@ -272,7 +298,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
       "kyuubi",
       "kyuubi",
       InetAddress.getLocalHost.getCanonicalHostName,
-      Map.empty,
+      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       newBatchRequest(
         "spark",
         sparkBatchTestResource.get,
@@ -282,7 +308,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
       "kyuubi",
       "kyuubi",
       InetAddress.getLocalHost.getCanonicalHostName,
-      Map.empty,
+      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       newBatchRequest(
         "spark",
         sparkBatchTestResource.get,
@@ -672,7 +698,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
         "kyuubi",
         "kyuubi",
         InetAddress.getLocalHost.getCanonicalHostName,
-        Map.empty,
+        Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
         newSparkBatchRequest(Map("spark.jars" -> "disAllowPath")))
     }
     val sessionHandleRegex = "\\[[\\S]*\\]".r
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
index 9d0a9b15a..ff807ef02 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.net.InetAddress
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
+import java.util.UUID
 
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.shaded.com.nimbusds.jose.util.StandardCharset
@@ -28,6 +29,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
 import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
 import org.apache.kyuubi.{BatchTestHelper, RestClientTestHelper, Utils}
+import org.apache.kyuubi.client.util.BatchUtils._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ctl.{CtlConf, TestPrematureExit}
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
@@ -256,7 +258,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
       "kyuubi",
       "kyuubi",
       InetAddress.getLocalHost.getCanonicalHostName,
-      Map.empty,
+      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       newBatchRequest(
         "spark",
         "",
@@ -278,7 +280,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
       "kyuubi",
       "kyuubi",
       InetAddress.getLocalHost.getCanonicalHostName,
-      Map.empty,
+      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       newBatchRequest(
         "spark",
         "",
@@ -288,7 +290,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
       "kyuubi",
       "kyuubi",
       InetAddress.getLocalHost.getCanonicalHostName,
-      Map.empty,
+      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       newBatchRequest(
         "spark",
         "",