You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/11/03 00:18:12 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #3745] Closing existing seekable reader when adding extra log
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new fd2d0ccd8 [KYUUBI #3745] Closing existing seekable reader when adding extra log
fd2d0ccd8 is described below
commit fd2d0ccd86ce565fd2aa3e1ac1ce3b0e9bf3d241
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Thu Nov 3 08:18:04 2022 +0800
[KYUUBI #3745] Closing existing seekable reader when adding extra log
### _Why are the changes needed?_
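When an extra log is added, close the existing seekable reader and reset it to `null`, so that a reader opened before the extra log was registered is not reused by later reads.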
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3745 from turboFei/fix_op_log_bug.
Closes #3745
5faf9a99 [Fei Wang] save
3bf78f3e [Fei Wang] recreate seek reader
Authored-by: Fei Wang <fw...@ebay.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
---
.../apache/kyuubi/operation/log/OperationLog.scala | 2 ++
.../kyuubi/operation/log/OperationLogSuite.scala | 36 ++++++++++++++++++++++
2 files changed, 38 insertions(+)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index 996ca4bb5..84c4ed55c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -99,6 +99,8 @@ class OperationLog(path: Path) {
try {
extraReaders += Files.newBufferedReader(path, StandardCharsets.UTF_8)
extraPaths += path
+ Option(seekableReader).foreach(_.close)
+ seekableReader = null
} catch {
case _: IOException =>
}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index 02acee4d0..758eeeeaf 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -261,4 +261,40 @@ class OperationLogSuite extends KyuubiFunSuite {
val msg = log1.read(1).getColumns.get(0).getStringVal.getValues.asScala.head
assert(msg == msg1)
}
+
+ test("closing existing seekable reader when adding extra log") {
+ val file = Utils.createTempDir().resolve("f")
+ val writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8)
+ val extraFile = Utils.createTempDir().resolve("e")
+ val extraWriter = Files.newBufferedWriter(extraFile, StandardCharsets.UTF_8)
+
+ try {
+ writer.write(s"log")
+ writer.flush()
+ writer.close()
+
+ extraWriter.write("extra_log")
+ extraWriter.flush()
+ extraWriter.close()
+
+ def compareResult(rows: TRowSet, expected: Seq[String]): Unit = {
+ val res = rows.getColumns.get(0).getStringVal.getValues.asScala
+ assert(res.size == expected.size)
+ res.zip(expected).foreach { case (l, r) =>
+ assert(l == r)
+ }
+ }
+
+ val log = new OperationLog(file)
+ // The operation log file is created externally and should be initialized actively.
+ log.initOperationLogIfNecessary()
+
+ compareResult(log.read(0, 1), Seq("log"))
+ log.addExtraLog(extraFile)
+ compareResult(log.read(1, 1), Seq("extra_log"))
+ } finally {
+ Utils.deleteDirectoryRecursively(file.toFile)
+ Utils.deleteDirectoryRecursively(extraFile.toFile)
+ }
+ }
}