You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ho...@apache.org on 2022/04/21 01:09:44 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2253] [Improvement] Trino Engine - Events support

This is an automated email from the ASF dual-hosted git repository.

hongdd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ec707b74 [KYUUBI #2253] [Improvement] Trino Engine - Events support
4ec707b74 is described below

commit 4ec707b7427e90c9a50e70b78d7b5ccf24884d59
Author: Min Zhao <zh...@163.com>
AuthorDate: Thu Apr 21 09:09:24 2022 +0800

    [KYUUBI #2253] [Improvement] Trino Engine - Events support
    
    ### _Why are the changes needed?_
    
    add event support for trino engine.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #2401 from zhaomin1423/trino_event1.
    
    Closes #2253
    
    dc3c2c2b [Min Zhao] fix conflict
    caec1cfd [Min Zhao] fix conflict
    7ca01691 [Min Zhao] add trino event
    
    Authored-by: Min Zhao <zh...@163.com>
    Signed-off-by: hongdongdong <ho...@cmss.chinamobile.com>
---
 externals/kyuubi-trino-engine/pom.xml              |   6 +
 .../kyuubi/engine/trino/TrinoSqlEngine.scala       |  46 ++++++-
 .../engine/trino/event/TrinoEngineEvent.scala      |  74 +++++++++++
 .../engine/trino/event/TrinoOperationEvent.scala   |  79 ++++++++++++
 .../engine/trino/event/TrinoSessionEvent.scala     |  55 +++++++++
 .../handler/TrinoJsonLoggingEventHandler.scala     |  29 +++++
 .../engine/trino/operation/ExecuteStatement.scala  |   4 +
 .../engine/trino/session/TrinoSessionImpl.scala    |  17 +++
 .../engine/trino/event/TrinoSqlEventSuite.scala    | 137 +++++++++++++++++++++
 9 files changed, 445 insertions(+), 2 deletions(-)

diff --git a/externals/kyuubi-trino-engine/pom.xml b/externals/kyuubi-trino-engine/pom.xml
index b5ad5267f..9807afc82 100644
--- a/externals/kyuubi-trino-engine/pom.xml
+++ b/externals/kyuubi-trino-engine/pom.xml
@@ -40,6 +40,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-events_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.kyuubi</groupId>
             <artifactId>kyuubi-ha_${scala.binary.version}</artifactId>
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoSqlEngine.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoSqlEngine.scala
index 1a2534fb7..67eafcfe6 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoSqlEngine.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoSqlEngine.scala
@@ -17,18 +17,25 @@
 
 package org.apache.kyuubi.engine.trino
 
+import java.net.InetAddress
 import java.util.concurrent.CountDownLatch
 
+import scala.util.control.NonFatal
+
 import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.Utils.TRINO_ENGINE_SHUTDOWN_PRIORITY
 import org.apache.kyuubi.Utils.addShutdownHook
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_EVENT_JSON_LOG_PATH, ENGINE_EVENT_LOGGERS}
 import org.apache.kyuubi.engine.trino.TrinoSqlEngine.countDownLatch
 import org.apache.kyuubi.engine.trino.TrinoSqlEngine.currentEngine
+import org.apache.kyuubi.engine.trino.event.TrinoEngineEvent
+import org.apache.kyuubi.engine.trino.event.handler.TrinoJsonLoggingEventHandler
+import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent}
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY
 import org.apache.kyuubi.ha.client.RetryPolicies
 import org.apache.kyuubi.service.Serverable
-import org.apache.kyuubi.util.SignalRegister
+import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
 
 case class TrinoSqlEngine()
   extends Serverable("TrinoSQLEngine") {
@@ -56,15 +63,47 @@ object TrinoSqlEngine extends Logging {
   private val countDownLatch = new CountDownLatch(1)
 
   val kyuubiConf: KyuubiConf = KyuubiConf()
+    .set(ENGINE_EVENT_LOGGERS.key, "JSON")
 
   var currentEngine: Option[TrinoSqlEngine] = None
 
   def startEngine(): Unit = {
+    try {
+      initLoggerEventHandler(kyuubiConf)
+    } catch {
+      case NonFatal(e) =>
+        warn(s"Failed to initialize Logger EventHandler: ${e.getMessage}", e)
+    }
     currentEngine = Some(new TrinoSqlEngine())
     currentEngine.foreach { engine =>
       engine.initialize(kyuubiConf)
+      EventBus.post(TrinoEngineEvent(engine))
       engine.start()
-      addShutdownHook(() => engine.stop(), TRINO_ENGINE_SHUTDOWN_PRIORITY + 1)
+      EventBus.post(TrinoEngineEvent(engine))
+      addShutdownHook(
+        () => {
+          engine.stop()
+          val event = TrinoEngineEvent(engine)
+            .copy(endTime = System.currentTimeMillis())
+          EventBus.post(event)
+        },
+        TRINO_ENGINE_SHUTDOWN_PRIORITY + 1)
+    }
+  }
+
+  private def initLoggerEventHandler(conf: KyuubiConf): Unit = {
+    val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
+    conf.get(ENGINE_EVENT_LOGGERS).map(EventLoggerType.withName).foreach {
+      case EventLoggerType.JSON =>
+        val hostName = InetAddress.getLocalHost.getCanonicalHostName
+        val handler = TrinoJsonLoggingEventHandler(
+          s"Trino-$hostName",
+          ENGINE_EVENT_JSON_LOG_PATH,
+          hadoopConf,
+          conf)
+        EventBus.register[KyuubiEvent](handler)
+      case logger =>
+        throw new IllegalArgumentException(s"Unrecognized event logger: $logger")
     }
   }
 
@@ -84,6 +123,9 @@ object TrinoSqlEngine extends Logging {
         currentEngine.foreach { engine =>
           error(t)
           engine.stop()
+          val event = TrinoEngineEvent(engine)
+            .copy(endTime = System.currentTimeMillis(), diagnostic = t.getMessage)
+          EventBus.post(event)
         }
       case t: Throwable => error("Create Trino Engine Failed", t)
     }
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoEngineEvent.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoEngineEvent.scala
new file mode 100644
index 000000000..7e5936360
--- /dev/null
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoEngineEvent.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.trino.event
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.engine.trino.TrinoSqlEngine
+import org.apache.kyuubi.events.KyuubiEvent
+import org.apache.kyuubi.service.ServiceState
+import org.apache.kyuubi.service.ServiceState.ServiceState
+
+case class TrinoEngineEvent(
+    connectionUrl: String,
+    startTime: Long,
+    endTime: Long,
+    state: ServiceState,
+    diagnostic: String,
+    settings: Map[String, String]) extends KyuubiEvent {
+
+  override def partitions: Seq[(String, String)] = {
+    // Before the engine is started, the start time is 0L, so the partition uses the current day.
+    if (startTime == 0) {
+      ("day", Utils.getDateFromTimestamp(System.currentTimeMillis())) :: Nil
+    } else {
+      ("day", Utils.getDateFromTimestamp(startTime)) :: Nil
+    }
+  }
+
+  override def toString: String = {
+    s"""
+       |TrinoEngineEvent: {
+       |connectionUrl: $connectionUrl,
+       |startTime: $startTime,
+       |endTime: $endTime,
+       |state: $state,
+       |diagnostic: $diagnostic,
+       |settings: ${settings.mkString("<", ",", ">")}
+       |}
+       |""".stripMargin
+  }
+}
+
+object TrinoEngineEvent {
+
+  def apply(engine: TrinoSqlEngine): TrinoEngineEvent = {
+    val connectionUrl =
+      if (engine.getServiceState.equals(ServiceState.LATENT)) {
+        null
+      } else {
+        engine.frontendServices.head.connectionUrl
+      }
+
+    new TrinoEngineEvent(
+      connectionUrl = connectionUrl,
+      startTime = engine.getStartTime,
+      endTime = -1L,
+      state = engine.getServiceState,
+      diagnostic = "",
+      settings = engine.getConf.getAll)
+  }
+}
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoOperationEvent.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoOperationEvent.scala
new file mode 100644
index 000000000..ebcd2b3ad
--- /dev/null
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoOperationEvent.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.trino.event
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.engine.trino.operation.TrinoOperation
+import org.apache.kyuubi.events.KyuubiEvent
+
+/**
+ * A [[TrinoOperationEvent]] is used to track the lifecycle of an operation
+ * on the Trino SQL Engine side.
+ * <ul>
+ *   <li>Operation Basis</li>
+ *   <li>Operation Live Status</li>
+ *   <li>Parent Session Id</li>
+ * </ul>
+ *
+ * @param statementId the unique identifier of a single operation
+ * @param statement the sql that you execute
+ * @param shouldRunAsync the flag indicating whether the query runs asynchronously or not
+ * @param state the current operation state
+ * @param eventTime the last-modified time of the operation status when the event was created
+ * @param createTime the creation time recorded in the operation status
+ * @param startTime the time the query of this operation started
+ * @param completeTime the time the query ended
+ * @param exception the caught exception, if any
+ * @param sessionId the identifier of the parent session
+ * @param sessionUser the authenticated client user
+ */
+case class TrinoOperationEvent(
+    statementId: String,
+    statement: String,
+    shouldRunAsync: Boolean,
+    state: String,
+    eventTime: Long,
+    createTime: Long,
+    startTime: Long,
+    completeTime: Long,
+    exception: Option[Throwable],
+    sessionId: String,
+    sessionUser: String) extends KyuubiEvent {
+
+  override def partitions: Seq[(String, String)] =
+    ("day", Utils.getDateFromTimestamp(createTime)) :: Nil
+}
+
+object TrinoOperationEvent {
+
+  def apply(operation: TrinoOperation): TrinoOperationEvent = {
+    val session = operation.getSession
+    val status = operation.getStatus
+    TrinoOperationEvent(
+      operation.statementId,
+      operation.redactedStatement,
+      operation.shouldRunAsync,
+      status.state.name(),
+      status.lastModified,
+      status.create,
+      status.start,
+      status.completed,
+      status.exception,
+      session.handle.identifier.toString,
+      session.user)
+  }
+}
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoSessionEvent.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoSessionEvent.scala
new file mode 100644
index 000000000..66e6e9767
--- /dev/null
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoSessionEvent.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.trino.event
+
+import org.apache.kyuubi.{KyuubiSQLException, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.trino.session.TrinoSessionImpl
+import org.apache.kyuubi.events.KyuubiEvent
+
+case class TrinoSessionEvent(
+    sessionId: String,
+    username: String,
+    ip: String,
+    connectionUrl: String,
+    catalog: String,
+    startTime: Long,
+    var endTime: Long = -1L,
+    var totalOperations: Int = 0) extends KyuubiEvent {
+
+  override def partitions: Seq[(String, String)] =
+    ("day", Utils.getDateFromTimestamp(startTime)) :: Nil
+
+}
+
+object TrinoSessionEvent {
+
+  def apply(session: TrinoSessionImpl): TrinoSessionEvent = {
+    val sessionConf = session.sessionManager.getConf
+    val connectionUrl = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_URL).getOrElse(
+      throw KyuubiSQLException("Trino server url can not be null!"))
+    val catalog = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG).getOrElse(
+      throw KyuubiSQLException("Trino default catalog can not be null!"))
+    new TrinoSessionEvent(
+      session.handle.identifier.toString,
+      session.user,
+      session.ipAddress,
+      connectionUrl = connectionUrl,
+      catalog = catalog,
+      session.createTime)
+  }
+}
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/handler/TrinoJsonLoggingEventHandler.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/handler/TrinoJsonLoggingEventHandler.scala
new file mode 100644
index 000000000..1d00e7bce
--- /dev/null
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/handler/TrinoJsonLoggingEventHandler.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.trino.event.handler
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.kyuubi.config.{ConfigEntry, KyuubiConf}
+import org.apache.kyuubi.events.handler.JsonLoggingEventHandler
+
+case class TrinoJsonLoggingEventHandler(
+    logName: String,
+    logPath: ConfigEntry[String],
+    hadoopConf: Configuration,
+    kyuubiConf: KyuubiConf)
+  extends JsonLoggingEventHandler(logName, logPath, hadoopConf, kyuubiConf)
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
index 1d7e28536..2f7470713 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
@@ -22,6 +22,8 @@ import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorServic
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.engine.trino.TrinoStatement
+import org.apache.kyuubi.engine.trino.event.TrinoOperationEvent
+import org.apache.kyuubi.events.EventBus
 import org.apache.kyuubi.operation.ArrayFetchIterator
 import org.apache.kyuubi.operation.IterableFetchIterator
 import org.apache.kyuubi.operation.OperationState
@@ -112,4 +114,6 @@ class ExecuteStatement(
       statementTimeoutCleaner = Some(timeoutExecutor)
     }
   }
+
+  EventBus.post(TrinoOperationEvent(this))
 }
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
index e53ecec7b..c774a966f 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
@@ -34,6 +34,9 @@ import org.apache.kyuubi.Utils.currentUser
 import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.engine.trino.TrinoConf
 import org.apache.kyuubi.engine.trino.TrinoContext
+import org.apache.kyuubi.engine.trino.event.TrinoSessionEvent
+import org.apache.kyuubi.events.EventBus
+import org.apache.kyuubi.operation.{Operation, OperationHandle}
 import org.apache.kyuubi.session.AbstractSession
 import org.apache.kyuubi.session.SessionManager
 
@@ -49,6 +52,8 @@ class TrinoSessionImpl(
   var trinoContext: TrinoContext = _
   private var clientSession: ClientSession = _
 
+  private val sessionEvent = TrinoSessionEvent(this)
+
   override def open(): Unit = {
     normalizedConf.foreach {
       case ("use:database", database) => clientSession = createClientSession(database)
@@ -63,6 +68,7 @@ class TrinoSessionImpl(
     trinoContext = TrinoContext(httpClient, clientSession)
 
     super.open()
+    EventBus.post(sessionEvent)
   }
 
   private def createClientSession(schema: String = null): ClientSession = {
@@ -97,4 +103,15 @@ class TrinoSessionImpl(
       new Duration(clientRequestTimeout, TimeUnit.MILLISECONDS),
       true)
   }
+
+  override protected def runOperation(operation: Operation): OperationHandle = {
+    sessionEvent.totalOperations += 1
+    super.runOperation(operation)
+  }
+
+  override def close(): Unit = {
+    sessionEvent.endTime = System.currentTimeMillis()
+    EventBus.post(sessionEvent)
+    super.close()
+  }
 }
diff --git a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/event/TrinoSqlEventSuite.scala b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/event/TrinoSqlEventSuite.scala
new file mode 100644
index 000000000..6071e9a84
--- /dev/null
+++ b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/event/TrinoSqlEventSuite.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.trino.event
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.net.InetAddress
+import java.nio.file.Paths
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_EVENT_JSON_LOG_PATH, ENGINE_SHARE_LEVEL, ENGINE_TRINO_CONNECTION_CATALOG}
+import org.apache.kyuubi.engine.trino.WithTrinoEngine
+import org.apache.kyuubi.events.JsonProtocol
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+import org.apache.kyuubi.service.ServiceState
+
+class TrinoSqlEventSuite extends WithTrinoEngine with HiveJDBCTestHelper {
+
+  private val logRoot = kyuubiConf.get(ENGINE_EVENT_JSON_LOG_PATH)
+  private val currentDate = Utils.getDateFromTimestamp(System.currentTimeMillis())
+  private val hostName = InetAddress.getLocalHost.getCanonicalHostName
+
+  override def withKyuubiConf: Map[String, String] = Map(
+    ENGINE_TRINO_CONNECTION_CATALOG.key -> "memory",
+    ENGINE_SHARE_LEVEL.key -> "SERVER")
+
+  override protected val schema = "default"
+
+  override protected def jdbcUrl: String = getJdbcUrl
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    val fileSystem: FileSystem = FileSystem.get(new Configuration())
+    val logPath = new Path(logRoot)
+    if (fileSystem.exists(logPath)) {
+      fileSystem.delete(logPath, true)
+    }
+    fileSystem.close()
+  }
+
+  test("test engine event logging") {
+    val engineEventPath = Paths.get(
+      logRoot,
+      "trino_engine",
+      s"day=$currentDate",
+      s"Trino-$hostName.json")
+    val fileSystem: FileSystem = FileSystem.get(new Configuration())
+    val fs: FSDataInputStream = fileSystem.open(new Path(engineEventPath.toString))
+    val engineEventReader = new BufferedReader(new InputStreamReader(fs))
+    val initializedEvent =
+      JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[TrinoEngineEvent])
+    assert(initializedEvent.asInstanceOf[TrinoEngineEvent].state.equals(ServiceState.INITIALIZED))
+
+    val startedEvent =
+      JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[TrinoEngineEvent])
+    assert(startedEvent.asInstanceOf[TrinoEngineEvent].state.equals(ServiceState.STARTED))
+  }
+
+  test("test session event logging") {
+    withJdbcStatement() { statement =>
+      val catalogs = statement.getConnection.getMetaData.getCatalogs
+      assert(catalogs.next())
+      val sessionEventPath = Paths.get(
+        logRoot,
+        "trino_session",
+        s"day=$currentDate",
+        s"Trino-$hostName.json")
+      val fileSystem: FileSystem = FileSystem.get(new Configuration())
+      val fs: FSDataInputStream = fileSystem.open(new Path(sessionEventPath.toString))
+      val engineEventReader = new BufferedReader(new InputStreamReader(fs))
+      val readEvent =
+        JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[TrinoSessionEvent])
+      assert(readEvent.isInstanceOf[TrinoSessionEvent])
+    }
+  }
+
+  test("test operation event logging") {
+    withJdbcStatement() { statement =>
+      val createSchemaStatement = "CREATE SCHEMA IF NOT EXISTS memory.test_schema"
+      statement.execute(createSchemaStatement)
+      val createTableStatement = "CREATE TABLE IF NOT EXISTS " +
+        "memory.test_schema.trino_engine_test(id int)"
+      statement.execute(createTableStatement)
+      val insertStatement = "INSERT INTO memory.test_schema.trino_engine_test SELECT 1"
+      statement.execute(insertStatement)
+
+      val operationEventPath = Paths.get(
+        logRoot,
+        "trino_operation",
+        s"day=$currentDate",
+        s"Trino-$hostName.json")
+
+      val fileSystem: FileSystem = FileSystem.get(new Configuration())
+      val fs: FSDataInputStream = fileSystem.open(new Path(operationEventPath.toString))
+      val engineEventReader = new BufferedReader(new InputStreamReader(fs))
+
+      val createSchemaEvent =
+        JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[TrinoOperationEvent])
+      assert(createSchemaEvent.isInstanceOf[TrinoOperationEvent])
+      assert(StringUtils.equals(
+        createSchemaEvent.asInstanceOf[TrinoOperationEvent].statement,
+        createSchemaStatement))
+
+      val createTableEvent =
+        JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[TrinoOperationEvent])
+      assert(createTableEvent.isInstanceOf[TrinoOperationEvent])
+      assert(StringUtils.equals(
+        createTableEvent.asInstanceOf[TrinoOperationEvent].statement,
+        createTableStatement))
+
+      val insertEvent =
+        JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[TrinoOperationEvent])
+      assert(insertEvent.isInstanceOf[TrinoOperationEvent])
+      assert(StringUtils.equals(
+        insertEvent.asInstanceOf[TrinoOperationEvent].statement,
+        insertStatement))
+    }
+  }
+
+}