You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/04/11 01:41:33 UTC
[incubator-kyuubi] branch branch-1.5 updated: [KYUUBI #2296] Fix operation log file handler leak
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/branch-1.5 by this push:
new 809ea2a61 [KYUUBI #2296] Fix operation log file handler leak
809ea2a61 is described below
commit 809ea2a615ce6bbf8220c8475cab480395f36452
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Sat Apr 9 15:24:59 2022 +0800
[KYUUBI #2296] Fix operation log file handler leak
### _Why are the changes needed?_
If a session is empty, meaning that it does not contain any operation, then the OperationLog will never be closed.
This bug happens when `LaunchEngine` is enabled.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2296 from ulysses-you/fix-file-handler-leak.
Closes #2296
22d07e1a [ulysses-you] fix
bb83b1a9 [ulysses-you] fix
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
(cherry picked from commit e5834ae77d30289d1086d5c90cf42a0a08116d4c)
Signed-off-by: ulysses-you <ul...@apache.org>
---
.../apache/kyuubi/operation/log/OperationLog.scala | 22 +++++++++++++++++-----
.../kyuubi/operation/log/OperationLogSuite.scala | 11 ++++++++---
.../apache/kyuubi/operation/ExecuteStatement.scala | 4 ----
.../apache/kyuubi/operation/KyuubiOperation.scala | 13 +++++++------
4 files changed, 32 insertions(+), 18 deletions(-)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index 816f78f92..63cc0ded8 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -83,10 +83,10 @@ object OperationLog extends Logging {
class OperationLog(path: Path) {
- private val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
- private val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
+ private lazy val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
+ private lazy val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
- private val extraReaders: ListBuffer[BufferedReader] = ListBuffer()
+ private lazy val extraReaders: ListBuffer[BufferedReader] = ListBuffer()
def addExtraLog(path: Path): Unit = synchronized {
try {
@@ -151,11 +151,23 @@ class OperationLog(path: Path) {
}
def close(): Unit = synchronized {
- try {
- closeExtraReaders()
+ closeExtraReaders()
+
+ trySafely {
reader.close()
+ }
+ trySafely {
writer.close()
+ }
+
+ trySafely {
Files.delete(path)
+ }
+ }
+
+ private def trySafely(f: => Unit): Unit = {
+ try {
+ f
} catch {
case e: IOException =>
// Printing log here may cause a deadlock. The lock order of OperationLog.write
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index 1da77098e..a2a5d167f 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -57,7 +57,8 @@ class OperationLogSuite extends KyuubiFunSuite {
val operationLog = OperationLog.createOperationLog(session, oHandle)
val logFile =
Paths.get(operationLogRoot, sHandle.identifier.toString, oHandle.identifier.toString)
- assert(Files.exists(logFile))
+ // lazy create
+ assert(!Files.exists(logFile))
OperationLog.setCurrentOperationLog(operationLog)
assert(OperationLog.getCurrentOperationLog === operationLog)
@@ -66,6 +67,8 @@ class OperationLogSuite extends KyuubiFunSuite {
assert(OperationLog.getCurrentOperationLog === null)
operationLog.write(msg1 + "\n")
+ assert(Files.exists(logFile))
+
val tRowSet1 = operationLog.read(1)
assert(tRowSet1.getColumns.get(0).getStringVal.getValues.get(0) === msg1)
val tRowSet2 = operationLog.read(1)
@@ -145,8 +148,10 @@ class OperationLogSuite extends KyuubiFunSuite {
val oHandle = OperationHandle(
OperationType.EXECUTE_STATEMENT,
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
- val log = OperationLog.createOperationLog(session, oHandle)
- assert(log === null)
+ intercept[Exception] {
+ val log = OperationLog.createOperationLog(session, oHandle)
+ log.read(1)
+ }
logRoot.delete()
OperationLog.createOperationLogRootDirectory(session)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 74b865610..48854484b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -176,8 +176,4 @@ class ExecuteStatement(
super.setState(newState)
EventLogging.onEvent(KyuubiOperationEvent(this))
}
-
- override def close(): Unit = {
- super.close()
- }
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index f5f0538d5..88b3a9ab9 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -119,13 +119,14 @@ abstract class KyuubiOperation(opType: OperationType, session: Session)
if (!isClosedOrCanceled) {
setState(OperationState.CLOSED)
MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opTypeName)))
+ try {
+ // For launch engine operation, we use OperationLog to pass engine submit log but
+ // at that time we do not have remoteOpHandle
+ getOperationLog.foreach(_.close())
+ } catch {
+ case e: IOException => error(e.getMessage, e)
+ }
if (_remoteOpHandle != null) {
- try {
- getOperationLog.foreach(_.close())
- } catch {
- case e: IOException => error(e.getMessage, e)
- }
-
try {
client.closeOperation(_remoteOpHandle)
} catch {