You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/21 14:06:49 UTC

[incubator-kyuubi] branch branch-1.6 updated: [KYUUBI #3510] Reading an uninitialized log should return empty rowSet

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new aa658a940 [KYUUBI #3510] Reading an uninitialized log should return empty rowSet
aa658a940 is described below

commit aa658a940232d89c0e78a1802d522b7a1d456926
Author: yikf <yi...@gmail.com>
AuthorDate: Wed Sep 21 22:01:44 2022 +0800

    [KYUUBI #3510] Reading an uninitialized log should return empty rowSet
    
    ### _Why are the changes needed?_
    
    Fix https://github.com/apache/incubator-kyuubi/issues/3510
    
    Reading an uninitialized log should return empty rowSet , So we can set EmptyRowSet for TFetchResultsResp to avoid client side NPE if FetchResults fail on server side.
    
    For example: Configure log4j for `ERROR` level, causing the engine side no log file output, so server side throw `java.nio.file. NoSuchFileException`, at this time the client get engine log, throws the NPE.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #3511 from Yikf/issue-3510.
    
    Closes #3510
    
    3fee6591 [yikf] fix
    
    Authored-by: yikf <yi...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
    (cherry picked from commit ef68aa0fa99803c045dce5faa13c95570ddafbd3)
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../apache/kyuubi/operation/log/OperationLog.scala | 10 ++++++
 .../kyuubi/operation/log/OperationLogSuite.scala   | 38 +++++++++++++++++++---
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index 84ee117f3..996ca4bb5 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -31,6 +31,7 @@ import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.operation.OperationHandle
 import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.ThriftUtils
 
 object OperationLog extends Logging {
   final private val OPERATION_LOG: InheritableThreadLocal[OperationLog] = {
@@ -87,6 +88,8 @@ class OperationLog(path: Path) {
   private lazy val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
   private lazy val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
 
+  @volatile private var initialized: Boolean = false
+
   private lazy val extraPaths: ListBuffer[Path] = ListBuffer()
   private lazy val extraReaders: ListBuffer[BufferedReader] = ListBuffer()
   private var lastSeekReadPos = 0
@@ -108,11 +111,16 @@ class OperationLog(path: Path) {
     try {
       writer.write(msg)
       writer.flush()
+      initOperationLogIfNecessary()
     } catch {
       case _: IOException => // TODO: better do nothing?
     }
   }
 
+  private[log] def initOperationLogIfNecessary(): Unit = {
+    if (!initialized) initialized = true
+  }
+
   private def readLogs(
       reader: BufferedReader,
       lastRows: Int,
@@ -148,6 +156,7 @@ class OperationLog(path: Path) {
    * @param maxRows maximum result number can reach
    */
   def read(maxRows: Int): TRowSet = synchronized {
+    if (!initialized) return ThriftUtils.newEmptyRowSet
     val (logs, lines) = readLogs(reader, maxRows, maxRows)
     var lastRows = maxRows - lines
     for (extraReader <- extraReaders if lastRows > 0 || maxRows <= 0) {
@@ -160,6 +169,7 @@ class OperationLog(path: Path) {
   }
 
   def read(from: Int, size: Int): TRowSet = synchronized {
+    if (!initialized) return ThriftUtils.newEmptyRowSet
     var pos = from
     if (pos < 0) {
       // just fetch forward
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index 577e8e55f..02acee4d0 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -29,6 +29,7 @@ import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.operation.OperationHandle
 import org.apache.kyuubi.session.NoopSessionManager
+import org.apache.kyuubi.util.ThriftUtils
 
 class OperationLogSuite extends KyuubiFunSuite {
 
@@ -146,10 +147,11 @@ class OperationLogSuite extends KyuubiFunSuite {
     OperationLog.createOperationLogRootDirectory(session)
     assert(logRoot.isFile)
     val oHandle = OperationHandle()
-    intercept[Exception] {
-      val log = OperationLog.createOperationLog(session, oHandle)
-      log.read(1)
-    }
+
+    val log = OperationLog.createOperationLog(session, oHandle)
+    val tRowSet = log.read(1)
+    assert(tRowSet == ThriftUtils.newEmptyRowSet)
+
     logRoot.delete()
 
     OperationLog.createOperationLogRootDirectory(session)
@@ -222,6 +224,9 @@ class OperationLogSuite extends KyuubiFunSuite {
       }
 
       val log = new OperationLog(file)
+      // The operation log file is created externally and should be initialized actively.
+      log.initOperationLogIfNecessary()
+
       compareResult(log.read(-1, 1), Seq("0"))
       compareResult(log.read(-1, 1), Seq("1"))
       compareResult(log.read(0, 1), Seq("0"))
@@ -231,4 +236,29 @@ class OperationLogSuite extends KyuubiFunSuite {
       Utils.deleteDirectoryRecursively(file.toFile)
     }
   }
+
+  test("[KYUUBI #3511] Reading an uninitialized log should return empty rowSet") {
+    val sessionManager = new NoopSessionManager
+    sessionManager.initialize(KyuubiConf())
+    val sHandle = sessionManager.openSession(
+      TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
+      "kyuubi",
+      "passwd",
+      "localhost",
+      Map.empty)
+    val session = sessionManager.getSession(sHandle)
+    val oHandle = OperationHandle()
+
+    val log = OperationLog.createOperationLog(session, oHandle)
+    // It has not been initialized, and returns empty `TRowSet` directly.
+    val tRowSet = log.read(1)
+    assert(tRowSet == ThriftUtils.newEmptyRowSet)
+
+    OperationLog.createOperationLogRootDirectory(session)
+    val log1 = OperationLog.createOperationLog(session, oHandle)
+    // write means initialized operationLog, we can read log directly later
+    log1.write(msg1)
+    val msg = log1.read(1).getColumns.get(0).getStringVal.getValues.asScala.head
+    assert(msg == msg1)
+  }
 }