You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/04/11 01:41:33 UTC

[incubator-kyuubi] branch branch-1.5 updated: [KYUUBI #2296] Fix operation log file handler leak

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/branch-1.5 by this push:
     new 809ea2a61 [KYUUBI #2296] Fix operation log file handler leak
809ea2a61 is described below

commit 809ea2a615ce6bbf8220c8475cab480395f36452
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Sat Apr 9 15:24:59 2022 +0800

    [KYUUBI #2296] Fix operation log file handler leak
    
    ### _Why are the changes needed?_
    
    If a session is empty which means that it does not contain any operation, then the OperationLog will never be closed.
    
    This bug will happen if we enable `LaunchEngine`
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #2296 from ulysses-you/fix-file-handler-leak.
    
    Closes #2296
    
    22d07e1a [ulysses-you] fix
    bb83b1a9 [ulysses-you] fix
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
    (cherry picked from commit e5834ae77d30289d1086d5c90cf42a0a08116d4c)
    Signed-off-by: ulysses-you <ul...@apache.org>
---
 .../apache/kyuubi/operation/log/OperationLog.scala | 22 +++++++++++++++++-----
 .../kyuubi/operation/log/OperationLogSuite.scala   | 11 ++++++++---
 .../apache/kyuubi/operation/ExecuteStatement.scala |  4 ----
 .../apache/kyuubi/operation/KyuubiOperation.scala  | 13 +++++++------
 4 files changed, 32 insertions(+), 18 deletions(-)

diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index 816f78f92..63cc0ded8 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -83,10 +83,10 @@ object OperationLog extends Logging {
 
 class OperationLog(path: Path) {
 
-  private val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
-  private val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
+  private lazy val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
+  private lazy val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
 
-  private val extraReaders: ListBuffer[BufferedReader] = ListBuffer()
+  private lazy val extraReaders: ListBuffer[BufferedReader] = ListBuffer()
 
   def addExtraLog(path: Path): Unit = synchronized {
     try {
@@ -151,11 +151,23 @@ class OperationLog(path: Path) {
   }
 
   def close(): Unit = synchronized {
-    try {
-      closeExtraReaders()
+    closeExtraReaders()
+
+    trySafely {
       reader.close()
+    }
+    trySafely {
       writer.close()
+    }
+
+    trySafely {
       Files.delete(path)
+    }
+  }
+
+  private def trySafely(f: => Unit): Unit = {
+    try {
+      f
     } catch {
       case e: IOException =>
         // Printing log here may cause a deadlock. The lock order of OperationLog.write
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index 1da77098e..a2a5d167f 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -57,7 +57,8 @@ class OperationLogSuite extends KyuubiFunSuite {
     val operationLog = OperationLog.createOperationLog(session, oHandle)
     val logFile =
       Paths.get(operationLogRoot, sHandle.identifier.toString, oHandle.identifier.toString)
-    assert(Files.exists(logFile))
+    // lazy create
+    assert(!Files.exists(logFile))
 
     OperationLog.setCurrentOperationLog(operationLog)
     assert(OperationLog.getCurrentOperationLog === operationLog)
@@ -66,6 +67,8 @@ class OperationLogSuite extends KyuubiFunSuite {
     assert(OperationLog.getCurrentOperationLog === null)
 
     operationLog.write(msg1 + "\n")
+    assert(Files.exists(logFile))
+
     val tRowSet1 = operationLog.read(1)
     assert(tRowSet1.getColumns.get(0).getStringVal.getValues.get(0) === msg1)
     val tRowSet2 = operationLog.read(1)
@@ -145,8 +148,10 @@ class OperationLogSuite extends KyuubiFunSuite {
     val oHandle = OperationHandle(
       OperationType.EXECUTE_STATEMENT,
       TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
-    val log = OperationLog.createOperationLog(session, oHandle)
-    assert(log === null)
+    intercept[Exception] {
+      val log = OperationLog.createOperationLog(session, oHandle)
+      log.read(1)
+    }
     logRoot.delete()
 
     OperationLog.createOperationLogRootDirectory(session)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 74b865610..48854484b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -176,8 +176,4 @@ class ExecuteStatement(
     super.setState(newState)
     EventLogging.onEvent(KyuubiOperationEvent(this))
   }
-
-  override def close(): Unit = {
-    super.close()
-  }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index f5f0538d5..88b3a9ab9 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -119,13 +119,14 @@ abstract class KyuubiOperation(opType: OperationType, session: Session)
     if (!isClosedOrCanceled) {
       setState(OperationState.CLOSED)
       MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opTypeName)))
+      try {
+        // For launch engine operation, we use OperationLog to pass engine submit log but
+        // at that time we do not have remoteOpHandle
+        getOperationLog.foreach(_.close())
+      } catch {
+        case e: IOException => error(e.getMessage, e)
+      }
       if (_remoteOpHandle != null) {
-        try {
-          getOperationLog.foreach(_.close())
-        } catch {
-          case e: IOException => error(e.getMessage, e)
-        }
-
         try {
           client.closeOperation(_remoteOpHandle)
         } catch {