You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/02/03 08:17:21 UTC

[kyuubi] branch master updated: [KYUUBI #4144][FOLLOWUP] Do not cleanup upload dir because of batch recovery and fix temp file leak

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new eef6947cc [KYUUBI #4144][FOLLOWUP] Do not cleanup upload dir because of batch recovery and fix temp file leak
eef6947cc is described below

commit eef6947ccab41570676c7fe0a3d8cd71ac8816fb
Author: fwang12 <fw...@ebay.com>
AuthorDate: Fri Feb 3 16:17:11 2023 +0800

    [KYUUBI #4144][FOLLOWUP] Do not cleanup upload dir because of batch recovery and fix temp file leak
    
    ### _Why are the changes needed?_
    
    Address comments: https://github.com/apache/kyuubi/pull/4144#issuecomment-1412078077
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4231 from turboFei/uploading_resource_followup.
    
    Closes #4144
    
    93ccd9534 [fwang12] comments
    81a224dce [fwang12] ut
    6b226969a [fwang12] prevent temp file leak
    17a4e394c [fwang12] Do not remove upload dir
    
    Authored-by: fwang12 <fw...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../src/main/scala/org/apache/kyuubi/Utils.scala      | 14 --------------
 .../kyuubi/engine/KyuubiApplicationManager.scala      | 14 +-------------
 .../apache/kyuubi/operation/BatchJobSubmission.scala  | 19 +++++++++++++++----
 .../apache/kyuubi/server/api/v1/BatchesResource.scala |  2 +-
 .../kyuubi/server/api/v1/BatchesResourceSuite.scala   |  8 +++++++-
 5 files changed, 24 insertions(+), 33 deletions(-)

diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 7283ea040..7ab312fa1 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -143,20 +143,6 @@ object Utils extends Logging {
     f.delete()
   }
 
-  /**
-   * delete file in path with logging
-   * @param filePath path to file for deletion
-   * @param errorMessage message as prefix logging with error exception
-   */
-  def deleteFile(filePath: String, errorMessage: String): Unit = {
-    try {
-      Files.delete(Paths.get(filePath))
-    } catch {
-      case e: Exception =>
-        error(s"$errorMessage: $filePath ", e)
-    }
-  }
-
   /**
    * Create a temporary directory inside the given parent directory. The directory will be
    * automatically deleted when the VM shuts down.
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index b76f08833..70c130012 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -59,7 +59,6 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
         case NonFatal(e) => warn(s"Error stopping ${op.getClass.getSimpleName}: ${e.getMessage}")
       }
     }
-    deleteTempDirForUpload()
     super.stop()
   }
 
@@ -92,17 +91,6 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
       case None => None
     }
   }
-
-  private def deleteTempDirForUpload(): Unit = {
-    try {
-      Utils.deleteDirectoryRecursively(KyuubiApplicationManager.tempDirForUpload.toFile)
-    } catch {
-      case e: Exception => error(
-          "Failed to delete temporary folder for uploading " +
-            s"${KyuubiApplicationManager.tempDirForUpload}",
-          e)
-    }
-  }
 }
 
 object KyuubiApplicationManager {
@@ -122,7 +110,7 @@ object KyuubiApplicationManager {
     conf.set(FlinkProcessBuilder.TAG_KEY, newTag)
   }
 
-  lazy val tempDirForUpload: Path = {
+  val uploadWorkDir: Path = {
     val path = Utils.getAbsolutePathFromWork("upload")
     val pathFile = path.toFile
     if (!pathFile.exists()) {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 4436926db..f061d977d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.operation
 
 import java.io.IOException
+import java.nio.file.{Files, Paths}
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
@@ -25,7 +26,7 @@ import com.codahale.metrics.MetricRegistry
 import com.google.common.annotations.VisibleForTesting
 import org.apache.hive.service.rpc.thrift._
 
-import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Utils}
+import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState, KillResponse, ProcBuilder}
 import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
@@ -267,9 +268,7 @@ class BatchJobSubmission(
       }
     } finally {
       builder.close()
-      if (session.isResourceUploaded) {
-        Utils.deleteFile(resource, "Failed to delete temporary uploaded resource file")
-      }
+      cleanupUploadedResourceIfNeeded()
     }
   }
 
@@ -327,12 +326,14 @@ class BatchJobSubmission(
       if (isTerminalState(state)) {
         killMessage = (false, s"batch $batchId is already terminal so can not kill it.")
         builder.close()
+        cleanupUploadedResourceIfNeeded()
         return
       }
 
       try {
         killMessage = killBatchApplication()
         builder.close()
+        cleanupUploadedResourceIfNeeded()
       } finally {
         if (state == OperationState.INITIALIZED) {
           // if state is INITIALIZED, it means that the batch submission has not started to run, set
@@ -363,6 +364,16 @@ class BatchJobSubmission(
   override def isTimedOut: Boolean = false
 
   override protected def eventEnabled: Boolean = true
+
+  private def cleanupUploadedResourceIfNeeded(): Unit = {
+    if (session.isResourceUploaded) {
+      try {
+        Files.deleteIfExists(Paths.get(resource))
+      } catch {
+        case e: Throwable => error(s"Error deleting the uploaded resource: $resource", e)
+      }
+    }
+  }
 }
 
 object BatchJobSubmission {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 24f99a24c..969362f7d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -184,7 +184,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
         " of batchRequest is application/json")
     val tempFile = Utils.writeToTempFile(
       resourceFileInputStream,
-      KyuubiApplicationManager.tempDirForUpload,
+      KyuubiApplicationManager.uploadWorkDir,
       resourceFileMetadata.getFileName)
     batchRequest.setResource(tempFile.getPath)
     openBatchSessionInternal(batchRequest, isResourceFromUpload = true)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index 7c06063a6..c77d364f3 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -35,7 +35,7 @@ import org.apache.kyuubi.{BatchTestHelper, KyuubiFunSuite, RestFrontendTestHelpe
 import org.apache.kyuubi.client.api.v1.dto._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.ApplicationInfo
+import org.apache.kyuubi.engine.{ApplicationInfo, KyuubiApplicationManager}
 import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
@@ -219,6 +219,12 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
     assert(batch.getName === sparkBatchTestAppName)
     assert(batch.getCreateTime > 0)
     assert(batch.getEndTime === 0)
+
+    webTarget.path(s"api/v1/batches/${batch.getId()}").request(
+      MediaType.APPLICATION_JSON_TYPE).delete()
+    eventually(timeout(3.seconds)) {
+      assert(KyuubiApplicationManager.uploadWorkDir.toFile.listFiles().isEmpty)
+    }
   }
 
   test("get batch session list") {