You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/11/03 00:18:21 UTC

[incubator-kyuubi] branch branch-1.6 updated: [KYUUBI #3745] Closing existing seekable reader when adding extra log

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new 7cf6428b2 [KYUUBI #3745] Closing existing seekable reader when adding extra log
7cf6428b2 is described below

commit 7cf6428b2ba8417f048c6e2e95aec6376f80a482
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Thu Nov 3 08:18:04 2022 +0800

    [KYUUBI #3745] Closing existing seekable reader when adding extra log
    
    ### _Why are the changes needed?_
    
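    Close the existing seekable reader when adding an extra log.
    `addExtraLog` now closes `seekableReader` and resets it to `null`; the new
    test checks that a read issued after `addExtraLog` returns the extra log's content.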
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3745 from turboFei/fix_op_log_bug.
    
    Closes #3745
    
    5faf9a99 [Fei Wang] save
    3bf78f3e [Fei Wang] recreate seek reader
    
    Authored-by: Fei Wang <fw...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
    (cherry picked from commit fd2d0ccd86ce565fd2aa3e1ac1ce3b0e9bf3d241)
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 .../apache/kyuubi/operation/log/OperationLog.scala |  2 ++
 .../kyuubi/operation/log/OperationLogSuite.scala   | 36 ++++++++++++++++++++++
 2 files changed, 38 insertions(+)

diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index 996ca4bb5..84c4ed55c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -99,6 +99,8 @@ class OperationLog(path: Path) {
     try {
       extraReaders += Files.newBufferedReader(path, StandardCharsets.UTF_8)
       extraPaths += path
+      Option(seekableReader).foreach(_.close)
+      seekableReader = null
     } catch {
       case _: IOException =>
     }
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index 02acee4d0..758eeeeaf 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -261,4 +261,40 @@ class OperationLogSuite extends KyuubiFunSuite {
     val msg = log1.read(1).getColumns.get(0).getStringVal.getValues.asScala.head
     assert(msg == msg1)
   }
+
+  test("closing existing seekable reader when adding extra log") {
+    val file = Utils.createTempDir().resolve("f")
+    val writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8)
+    val extraFile = Utils.createTempDir().resolve("e")
+    val extraWriter = Files.newBufferedWriter(extraFile, StandardCharsets.UTF_8)
+
+    try {
+      writer.write(s"log")
+      writer.flush()
+      writer.close()
+
+      extraWriter.write("extra_log")
+      extraWriter.flush()
+      extraWriter.close()
+
+      def compareResult(rows: TRowSet, expected: Seq[String]): Unit = {
+        val res = rows.getColumns.get(0).getStringVal.getValues.asScala
+        assert(res.size == expected.size)
+        res.zip(expected).foreach { case (l, r) =>
+          assert(l == r)
+        }
+      }
+
+      val log = new OperationLog(file)
+      // The operation log file is created externally and should be initialized actively.
+      log.initOperationLogIfNecessary()
+
+      compareResult(log.read(0, 1), Seq("log"))
+      log.addExtraLog(extraFile)
+      compareResult(log.read(1, 1), Seq("extra_log"))
+    } finally {
+      Utils.deleteDirectoryRecursively(file.toFile)
+      Utils.deleteDirectoryRecursively(extraFile.toFile)
+    }
+  }
 }