You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/04/14 08:47:53 UTC

[kyuubi] branch master updated: [KYUUBI #4688] Fix the failure to read the operation log after executing catalog and database operation

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 46bddc386 [KYUUBI #4688] Fix the failure to read the operation log after executing catalog and database operation
46bddc386 is described below

commit 46bddc3864d322f56069672c7639f2624587f68c
Author: sychen <sy...@ctrip.com>
AuthorDate: Fri Apr 14 16:47:43 2023 +0800

    [KYUUBI #4688] Fix the failure to read the operation log after executing catalog and database operation
    
    ### _Why are the changes needed?_
    
    Currently, `GetCurrentCatalog`/`GetCurrentDatabase`/`SetCurrentCatalog`/`SetCurrentDatabase` are executed as statements, and the JDBC client will try to obtain the operation log corresponding to each statement.
    At present, these operations do not generate operation logs, so fetching the log fails and the engine logs an exception (`failed to generate operation log`).
    
    ```java
    23/04/10 20:25:23 INFO GetCurrentCatalog: Processing anonymous's query[8218e7ed-b4a4-41ad-a1cc-6f82bf3d55bb]: INITIALIZED_STATE -> RUNNING_STATE, statement:
    GetCurrentCatalog
    23/04/10 20:25:23 INFO GetCurrentCatalog: Processing anonymous's query[8218e7ed-b4a4-41ad-a1cc-6f82bf3d55bb]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.002 seconds
    23/04/10 20:25:23 ERROR SparkTBinaryFrontendService: Error fetching results:
    org.apache.kyuubi.KyuubiSQLException: OperationHandle [8218e7ed-b4a4-41ad-a1cc-6f82bf3d55bb] failed to generate operation log
            at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
            at org.apache.kyuubi.operation.OperationManager.$anonfun$getOperationLogRowSet$2(OperationManager.scala:146)
            at scala.Option.getOrElse(Option.scala:189)
    ```
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4688 from cxzl25/op_log_catalog.
    
    Closes #4688
    
    8ebc0f570 [sychen] Fix the failure to read the operation log after executing Catalog and database operation
    
    Authored-by: sychen <sy...@ctrip.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../engine/flink/operation/GetCurrentCatalog.scala  |  6 ++++++
 .../engine/flink/operation/GetCurrentDatabase.scala |  6 ++++++
 .../engine/flink/operation/SetCurrentCatalog.scala  |  6 ++++++
 .../engine/flink/operation/SetCurrentDatabase.scala |  6 ++++++
 .../engine/spark/operation/GetCurrentCatalog.scala  |  5 +++++
 .../engine/spark/operation/GetCurrentDatabase.scala |  5 +++++
 .../engine/spark/operation/SetCurrentCatalog.scala  |  5 +++++
 .../engine/spark/operation/SetCurrentDatabase.scala |  5 +++++
 .../engine/trino/operation/GetCurrentCatalog.scala  |  5 +++++
 .../engine/trino/operation/GetCurrentDatabase.scala |  5 +++++
 .../engine/trino/operation/SetCurrentCatalog.scala  |  5 +++++
 .../engine/trino/operation/SetCurrentDatabase.scala |  5 +++++
 .../apache/kyuubi/operation/log/OperationLog.scala  |  2 ++
 .../kyuubi/operation/log/OperationLogSuite.scala    | 21 +++++++++++++++++++++
 14 files changed, 87 insertions(+)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentCatalog.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentCatalog.scala
index 988072e8d..3e42e9aa6 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentCatalog.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentCatalog.scala
@@ -18,11 +18,17 @@
 package org.apache.kyuubi.engine.flink.operation
 
 import org.apache.kyuubi.engine.flink.result.ResultSetUtil
+import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
 import org.apache.kyuubi.session.Session
 
 class GetCurrentCatalog(session: Session) extends FlinkOperation(session) {
 
+  private val operationLog: OperationLog =
+    OperationLog.createOperationLog(session, getHandle)
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
   override protected def runInternal(): Unit = {
     try {
       val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentDatabase.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentDatabase.scala
index 8315a18d3..014ca2ea3 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentDatabase.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentDatabase.scala
@@ -18,11 +18,17 @@
 package org.apache.kyuubi.engine.flink.operation
 
 import org.apache.kyuubi.engine.flink.result.ResultSetUtil
+import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_SCHEM
 import org.apache.kyuubi.session.Session
 
 class GetCurrentDatabase(session: Session) extends FlinkOperation(session) {
 
+  private val operationLog: OperationLog =
+    OperationLog.createOperationLog(session, getHandle)
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
   override protected def runInternal(): Unit = {
     try {
       val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentCatalog.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentCatalog.scala
index 489cc6384..60214b2cd 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentCatalog.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentCatalog.scala
@@ -17,11 +17,17 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
+import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 
 class SetCurrentCatalog(session: Session, catalog: String)
   extends FlinkOperation(session) {
 
+  private val operationLog: OperationLog =
+    OperationLog.createOperationLog(session, getHandle)
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
   override protected def runInternal(): Unit = {
     try {
       val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentDatabase.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentDatabase.scala
index 0d3598405..7610ab2f1 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentDatabase.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentDatabase.scala
@@ -17,11 +17,17 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
+import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 
 class SetCurrentDatabase(session: Session, database: String)
   extends FlinkOperation(session) {
 
+  private val operationLog: OperationLog =
+    OperationLog.createOperationLog(session, getHandle)
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
   override protected def runInternal(): Unit = {
     try {
       val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentCatalog.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentCatalog.scala
index 66d707ec0..96e013284 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentCatalog.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentCatalog.scala
@@ -21,11 +21,16 @@ import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
 import org.apache.kyuubi.operation.IterableFetchIterator
+import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
 import org.apache.kyuubi.session.Session
 
 class GetCurrentCatalog(session: Session) extends SparkOperation(session) {
 
+  private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
   override protected def resultSchema: StructType = {
     new StructType()
       .add(TABLE_CAT, "string", nullable = true, "Catalog name.")
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentDatabase.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentDatabase.scala
index bcf3ad2a5..10b325d76 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentDatabase.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentDatabase.scala
@@ -21,11 +21,16 @@ import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
 import org.apache.kyuubi.operation.IterableFetchIterator
+import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_SCHEM
 import org.apache.kyuubi.session.Session
 
 class GetCurrentDatabase(session: Session) extends SparkOperation(session) {
 
+  private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
   override protected def resultSchema: StructType = {
     new StructType()
       .add(TABLE_SCHEM, "string", nullable = true, "Schema name.")
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
index 4e8c0aa69..7571c3e32 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
@@ -20,10 +20,15 @@ package org.apache.kyuubi.engine.spark.operation
 import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 
 class SetCurrentCatalog(session: Session, catalog: String) extends SparkOperation(session) {
 
+  private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
   override protected def resultSchema: StructType = {
     new StructType()
   }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
index 0a21bc839..2112f544a 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
@@ -20,11 +20,16 @@ package org.apache.kyuubi.engine.spark.operation
 import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 
 class SetCurrentDatabase(session: Session, database: String)
   extends SparkOperation(session) {
 
+  private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
   override protected def resultSchema: StructType = {
     new StructType()
   }
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/GetCurrentCatalog.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/GetCurrentCatalog.scala
index 3d8c7fd6c..504a53a41 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/GetCurrentCatalog.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/GetCurrentCatalog.scala
@@ -23,11 +23,16 @@ import io.trino.client.ClientStandardTypes.VARCHAR
 import io.trino.client.ClientTypeSignature.VARCHAR_UNBOUNDED_LENGTH
 
 import org.apache.kyuubi.operation.IterableFetchIterator
+import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 
 class GetCurrentCatalog(session: Session)
   extends TrinoOperation(session) {
 
+  private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
   override protected def runInternal(): Unit = {
     try {
       val session = trinoContext.clientSession.get
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/GetCurrentDatabase.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/GetCurrentDatabase.scala
index 3bf2987b4..3ab598ef0 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/GetCurrentDatabase.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/GetCurrentDatabase.scala
@@ -23,11 +23,16 @@ import io.trino.client.ClientStandardTypes.VARCHAR
 import io.trino.client.ClientTypeSignature.VARCHAR_UNBOUNDED_LENGTH
 
 import org.apache.kyuubi.operation.IterableFetchIterator
+import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 
 class GetCurrentDatabase(session: Session)
   extends TrinoOperation(session) {
 
+  private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
   override protected def runInternal(): Unit = {
     try {
       val session = trinoContext.clientSession.get
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/SetCurrentCatalog.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/SetCurrentCatalog.scala
index 09ba4262f..16836b0a9 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/SetCurrentCatalog.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/SetCurrentCatalog.scala
@@ -19,11 +19,16 @@ package org.apache.kyuubi.engine.trino.operation
 
 import io.trino.client.ClientSession
 
+import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 
 class SetCurrentCatalog(session: Session, catalog: String)
   extends TrinoOperation(session) {
 
+  private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
   override protected def runInternal(): Unit = {
     try {
       val session = trinoContext.clientSession.get
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/SetCurrentDatabase.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/SetCurrentDatabase.scala
index f25cc9e0c..aa4697f5f 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/SetCurrentDatabase.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/SetCurrentDatabase.scala
@@ -19,11 +19,16 @@ package org.apache.kyuubi.engine.trino.operation
 
 import io.trino.client.ClientSession
 
+import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 
 class SetCurrentDatabase(session: Session, database: String)
   extends TrinoOperation(session) {
 
+  private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
   override protected def runInternal(): Unit = {
     try {
       val session = trinoContext.clientSession.get
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index e6312d0fb..3791c08d2 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -195,6 +195,8 @@ class OperationLog(path: Path) {
   }
 
   def close(): Unit = synchronized {
+    if (!initialized) return
+
     closeExtraReaders()
 
     trySafely {
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index fe3cbc7fc..b333b59fd 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -297,4 +297,25 @@ class OperationLogSuite extends KyuubiFunSuite {
       Utils.deleteDirectoryRecursively(extraFile.toFile)
     }
   }
+
+  test("Closing the unwritten operation log should not throw an exception") {
+    val sessionManager = new NoopSessionManager
+    sessionManager.initialize(KyuubiConf())
+    val sHandle = sessionManager.openSession(
+      TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
+      "kyuubi",
+      "passwd",
+      "localhost",
+      Map.empty)
+    val session = sessionManager.getSession(sHandle)
+    OperationLog.createOperationLogRootDirectory(session)
+    val oHandle = OperationHandle()
+
+    val log = OperationLog.createOperationLog(session, oHandle)
+    val tRowSet = log.read(1)
+    assert(tRowSet == ThriftUtils.newEmptyRowSet)
+    // close the operation log without writing
+    log.close()
+    session.close()
+  }
 }