You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/21 14:06:49 UTC
[incubator-kyuubi] branch branch-1.6 updated: [KYUUBI #3510] Reading an uninitialized log should return empty rowSet
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/branch-1.6 by this push:
new aa658a940 [KYUUBI #3510] Reading an uninitialized log should return empty rowSet
aa658a940 is described below
commit aa658a940232d89c0e78a1802d522b7a1d456926
Author: yikf <yi...@gmail.com>
AuthorDate: Wed Sep 21 22:01:44 2022 +0800
[KYUUBI #3510] Reading an uninitialized log should return empty rowSet
### _Why are the changes needed?_
Fix https://github.com/apache/incubator-kyuubi/issues/3510
Reading an uninitialized log should return an empty rowSet, so we can set EmptyRowSet for TFetchResultsResp to avoid a client-side NPE if FetchResults fails on the server side.
For example: when log4j is configured at the `ERROR` level, the engine side produces no log file output, so the server side throws `java.nio.file.NoSuchFileException`; when the client then fetches the engine log, it throws the NPE.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3511 from Yikf/issue-3510.
Closes #3510
3fee6591 [yikf] fix
Authored-by: yikf <yi...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
(cherry picked from commit ef68aa0fa99803c045dce5faa13c95570ddafbd3)
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../apache/kyuubi/operation/log/OperationLog.scala | 10 ++++++
.../kyuubi/operation/log/OperationLogSuite.scala | 38 +++++++++++++++++++---
2 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index 84ee117f3..996ca4bb5 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -31,6 +31,7 @@ import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.operation.OperationHandle
import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.ThriftUtils
object OperationLog extends Logging {
final private val OPERATION_LOG: InheritableThreadLocal[OperationLog] = {
@@ -87,6 +88,8 @@ class OperationLog(path: Path) {
private lazy val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
private lazy val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
+ @volatile private var initialized: Boolean = false
+
private lazy val extraPaths: ListBuffer[Path] = ListBuffer()
private lazy val extraReaders: ListBuffer[BufferedReader] = ListBuffer()
private var lastSeekReadPos = 0
@@ -108,11 +111,16 @@ class OperationLog(path: Path) {
try {
writer.write(msg)
writer.flush()
+ initOperationLogIfNecessary()
} catch {
case _: IOException => // TODO: better do nothing?
}
}
+ private[log] def initOperationLogIfNecessary(): Unit = {
+ if (!initialized) initialized = true
+ }
+
private def readLogs(
reader: BufferedReader,
lastRows: Int,
@@ -148,6 +156,7 @@ class OperationLog(path: Path) {
* @param maxRows maximum result number can reach
*/
def read(maxRows: Int): TRowSet = synchronized {
+ if (!initialized) return ThriftUtils.newEmptyRowSet
val (logs, lines) = readLogs(reader, maxRows, maxRows)
var lastRows = maxRows - lines
for (extraReader <- extraReaders if lastRows > 0 || maxRows <= 0) {
@@ -160,6 +169,7 @@ class OperationLog(path: Path) {
}
def read(from: Int, size: Int): TRowSet = synchronized {
+ if (!initialized) return ThriftUtils.newEmptyRowSet
var pos = from
if (pos < 0) {
// just fetch forward
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index 577e8e55f..02acee4d0 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -29,6 +29,7 @@ import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.OperationHandle
import org.apache.kyuubi.session.NoopSessionManager
+import org.apache.kyuubi.util.ThriftUtils
class OperationLogSuite extends KyuubiFunSuite {
@@ -146,10 +147,11 @@ class OperationLogSuite extends KyuubiFunSuite {
OperationLog.createOperationLogRootDirectory(session)
assert(logRoot.isFile)
val oHandle = OperationHandle()
- intercept[Exception] {
- val log = OperationLog.createOperationLog(session, oHandle)
- log.read(1)
- }
+
+ val log = OperationLog.createOperationLog(session, oHandle)
+ val tRowSet = log.read(1)
+ assert(tRowSet == ThriftUtils.newEmptyRowSet)
+
logRoot.delete()
OperationLog.createOperationLogRootDirectory(session)
@@ -222,6 +224,9 @@ class OperationLogSuite extends KyuubiFunSuite {
}
val log = new OperationLog(file)
+ // The operation log file is created externally and should be initialized actively.
+ log.initOperationLogIfNecessary()
+
compareResult(log.read(-1, 1), Seq("0"))
compareResult(log.read(-1, 1), Seq("1"))
compareResult(log.read(0, 1), Seq("0"))
@@ -231,4 +236,29 @@ class OperationLogSuite extends KyuubiFunSuite {
Utils.deleteDirectoryRecursively(file.toFile)
}
}
+
+ test("[KYUUBI #3511] Reading an uninitialized log should return empty rowSet") {
+ val sessionManager = new NoopSessionManager
+ sessionManager.initialize(KyuubiConf())
+ val sHandle = sessionManager.openSession(
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
+ "kyuubi",
+ "passwd",
+ "localhost",
+ Map.empty)
+ val session = sessionManager.getSession(sHandle)
+ val oHandle = OperationHandle()
+
+ val log = OperationLog.createOperationLog(session, oHandle)
+ // It has not been initialized, and returns empty `TRowSet` directly.
+ val tRowSet = log.read(1)
+ assert(tRowSet == ThriftUtils.newEmptyRowSet)
+
+ OperationLog.createOperationLogRootDirectory(session)
+ val log1 = OperationLog.createOperationLog(session, oHandle)
+ // write means initialized operationLog, we can read log directly later
+ log1.write(msg1)
+ val msg = log1.read(1).getColumns.get(0).getStringVal.getValues.asScala.head
+ assert(msg == msg1)
+ }
}