You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/08/25 06:29:02 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #922] Add OperationEvent for server to track operation state

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new d704e4d  [KYUUBI #922] Add OperationEvent for server to track operation state
d704e4d is described below

commit d704e4d49d4a8b9a92076f040063f569176093b9
Author: hongdongdong <ho...@cmss.chinamobile.com>
AuthorDate: Wed Aug 25 14:28:48 2021 +0800

    [KYUUBI #922] Add OperationEvent for server to track operation state
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    Track operation state changes and support outputting them to a custom system.
    
    ### _How was this patch tested?_
    - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #968 from hddong/add-server-events.
    
    Closes #922
    
    8be8f268 [hongdongdong] Add prefix for StatementEvent
    9fdcc504 [hongdongdong] fix comment
    a0cdb35d [hongdongdong] fix test
    4d340c67 [hongdongdong] add dependency
    00920751 [hongdongdong] [KYUUBI#922]Add OperationEvent for server to track operation state
    
    Authored-by: hongdongdong <ho...@cmss.chinamobile.com>
    Signed-off-by: ulysses-you <ul...@gmail.com>
---
 dev/dependencyList                                 |  3 +
 docs/deployment/settings.md                        |  4 +-
 .../kyuubi/engine/spark/events/EngineEvent.scala   |  2 +-
 .../engine/spark/events/EventLoggingService.scala  | 32 ++++-----
 .../{KyuubiEvent.scala => KyuubiSparkEvent.scala}  | 12 +---
 .../kyuubi/engine/spark/events/SessionEvent.scala  |  2 +-
 ...tementEvent.scala => SparkStatementEvent.scala} |  8 +--
 .../engine/spark/operation/ExecuteStatement.scala  |  6 +-
 .../apache/spark/kyuubi/SparkContextHelper.scala   |  9 +--
 .../spark/events/EventLoggingServiceSuite.scala    | 10 +--
 kyuubi-common/pom.xml                              | 10 +++
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 25 ++++++-
 .../events/AbstractEventLoggingService.scala       | 20 +++---
 .../org/apache/kyuubi}/events/EventLogger.scala    |  6 +-
 .../apache/kyuubi}/events/EventLoggerType.scala    |  2 +-
 .../apache/kyuubi}/events/JsonEventLogger.scala    | 16 ++---
 .../org/apache/kyuubi}/events/JsonProtocol.scala   |  6 +-
 .../org/apache/kyuubi}/events/KyuubiEvent.scala    | 16 ++---
 .../apache/kyuubi/events/KyuubiServerEvent.scala   |  6 +-
 .../kyuubi/events/KyuubiStatementEvent.scala       | 42 +++++++----
 .../apache/kyuubi/operation/ExecuteStatement.scala | 25 ++++++-
 .../kyuubi/server}/EventLoggingService.scala       | 42 +++++------
 .../org/apache/kyuubi/server/KyuubiServer.scala    |  2 +
 .../kyuubi/events/EventLoggingServiceSuite.scala   | 82 ++++++++++++++++++++++
 .../apache/kyuubi/server/KyuubiServerSuite.scala   |  4 +-
 25 files changed, 270 insertions(+), 122 deletions(-)

diff --git a/dev/dependencyList b/dev/dependencyList
index 1c34658..b28e2ad 100644
--- a/dev/dependencyList
+++ b/dev/dependencyList
@@ -29,6 +29,8 @@ htrace-core4/4.1.0-incubating//htrace-core4-4.1.0-incubating.jar
 jackson-annotations/2.11.4//jackson-annotations-2.11.4.jar
 jackson-core/2.11.4//jackson-core-2.11.4.jar
 jackson-databind/2.11.4//jackson-databind-2.11.4.jar
+jackson-module-paranamer/2.11.4//jackson-module-paranamer-2.11.4.jar
+jackson-module-scala_2.12/2.11.4//jackson-module-scala_2.12-2.11.4.jar
 jakarta.servlet-api/4.0.4//jakarta.servlet-api-4.0.4.jar
 jaxb-api/2.2.11//jaxb-api-2.2.11.jar
 jcl-over-slf4j/1.7.30//jcl-over-slf4j-1.7.30.jar
@@ -46,6 +48,7 @@ metrics-core/4.1.1//metrics-core-4.1.1.jar
 metrics-jmx/4.1.1//metrics-jmx-4.1.1.jar
 metrics-json/4.1.1//metrics-json-4.1.1.jar
 metrics-jvm/4.1.1//metrics-jvm-4.1.1.jar
+paranamer/2.8//paranamer-2.8.jar
 scala-library/2.12.14//scala-library-2.12.14.jar
 scopt_2.12/4.0.1//scopt_2.12-4.0.1.jar
 simpleclient/0.10.0//simpleclient-0.10.0.jar
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 150b216..c87796f 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -138,6 +138,8 @@ kyuubi\.backend\.engine<br>\.exec\.pool\.keepalive<br>\.time|<div style='width:
 kyuubi\.backend\.engine<br>\.exec\.pool\.shutdown<br>\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT10S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Timeout(ms) for the operation execution thread pool to terminate in SQL engine applications</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
 kyuubi\.backend\.engine<br>\.exec\.pool\.size|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>100</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Number of threads in the operation execution thread pool of SQL engine applications</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
 kyuubi\.backend\.engine<br>\.exec\.pool\.wait\.queue<br>\.size|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>100</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Size of the wait queue for the operation execution thread pool in SQL engine applications</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
+kyuubi\.backend\.server<br>\.event\.json\.log\.path|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>/tmp/kyuubi/events</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The location of server events go for the builtin JSON logger</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
+kyuubi\.backend\.server<br>\.event\.loggers|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of server history loggers, where session/operation etc events go.<ul> <li>JSON: the events will be written to the location of kyuubi.backend.server.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul></div>|<div style='width: 30pt'>seq</div>|<div styl [...]
 kyuubi\.backend\.server<br>\.exec\.pool\.keepalive<br>\.time|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in Kyuubi server</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
 kyuubi\.backend\.server<br>\.exec\.pool\.shutdown<br>\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT10S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Timeout(ms) for the operation execution thread pool to terminate in Kyuubi server</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
 kyuubi\.backend\.server<br>\.exec\.pool\.size|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>100</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Number of threads in the operation execution thread pool of Kyuubi server</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
@@ -163,7 +165,7 @@ kyuubi\.engine<br>\.deregister\.exception<br>\.classes|<div style='width: 65pt;w
 kyuubi\.engine<br>\.deregister\.exception<br>\.messages|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of exception messages. If there is any exception thrown, whose message or stacktrace matches the specified message list, the engine would deregister itself.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
 kyuubi\.engine<br>\.deregister\.exception<br>\.ttl|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself [...]
 kyuubi\.engine<br>\.deregister\.job\.max<br>\.failures|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>4</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Number of failures of job before deregistering the engine.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.2.0</div>
-kyuubi\.engine\.event<br>\.json\.log\.path|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>/tmp/kyuubi/events</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The location of all the engine events go for the builtin JSON logger</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
+kyuubi\.engine\.event<br>\.json\.log\.path|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>/tmp/kyuubi/events</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The location of all the engine events go for the builtin JSON logger.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
 kyuubi\.engine\.event<br>\.loggers|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>SPARK: the events will be written to the spark history events</li> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done. [...]
 kyuubi\.engine<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
 kyuubi\.engine\.session<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.3.0</div>
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala
index 77d6c54..bfea105 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala
@@ -57,7 +57,7 @@ case class EngineEvent(
     endTime: Long,
     state: Int,
     diagnostic: String,
-    settings: Map[String, String]) extends KyuubiEvent {
+    settings: Map[String, String]) extends KyuubiSparkEvent {
 
   override def schema: StructType = Encoders.product[EngineEvent].schema
   override lazy val partitions: Seq[(String, String)] =
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
index 038ccf5..568ade2 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
@@ -17,39 +17,35 @@
 
 package org.apache.kyuubi.engine.spark.events
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.kyuubi.SparkContextHelper
 
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_EVENT_LOGGERS
 import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
 import org.apache.kyuubi.engine.spark.events.EventLoggingService._service
-import org.apache.kyuubi.service.CompositeService
+import org.apache.kyuubi.events.AbstractEventLoggingService
+import org.apache.kyuubi.events.EventLoggerType
+import org.apache.kyuubi.events.JsonEventLogger
 
 class EventLoggingService(engine: SparkSQLEngine)
-  extends CompositeService("EventLogging") {
-
-  private val eventLoggers = new ArrayBuffer[EventLogger]()
-
-  def onEvent(event: KyuubiEvent): Unit = {
-    eventLoggers.foreach(_.logEvent(event))
-  }
+  extends AbstractEventLoggingService[KyuubiSparkEvent] {
 
   override def initialize(conf: KyuubiConf): Unit = {
-    conf.get(KyuubiConf.ENGINE_EVENT_LOGGERS)
+    conf.get(ENGINE_EVENT_LOGGERS)
       .map(EventLoggerType.withName)
-      .foreach {
+      .foreach{
         case EventLoggerType.SPARK =>
-          eventLoggers += SparkContextHelper.createSparkHistoryLogger(engine.spark.sparkContext)
+          addEventLogger(SparkContextHelper.createSparkHistoryLogger(engine.spark.sparkContext))
         case EventLoggerType.JSON =>
-          val jsonEventLogger = new JsonEventLogger(KyuubiSparkUtil.engineId,
-            engine.spark.sparkContext.hadoopConfiguration)
+          val jsonEventLogger = new JsonEventLogger[KyuubiSparkEvent](KyuubiSparkUtil.engineId,
+            ENGINE_EVENT_JSON_LOG_PATH, engine.spark.sparkContext.hadoopConfiguration)
           addService(jsonEventLogger)
-          eventLoggers += jsonEventLogger
+          addEventLogger(jsonEventLogger)
         case logger =>
           // TODO: Add more implementations
           throw new IllegalArgumentException(s"Unrecognized event logger: $logger")
-    }
+      }
     super.initialize(conf)
   }
 
@@ -69,7 +65,7 @@ object EventLoggingService {
 
   private var _service: Option[EventLoggingService] = None
 
-  def onEvent(event: KyuubiEvent): Unit = {
+  def onEvent(event: KyuubiSparkEvent): Unit = {
     _service.foreach(_.onEvent(event))
   }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiSparkEvent.scala
similarity index 76%
copy from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiEvent.scala
copy to externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiSparkEvent.scala
index a519b41..399f7c1 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiEvent.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiSparkEvent.scala
@@ -17,20 +17,14 @@
 
 package org.apache.kyuubi.engine.spark.events
 
-import java.util.Locale
-
 import org.apache.spark.scheduler.SparkListenerEvent
 import org.apache.spark.sql.types.StructType
 
-trait KyuubiEvent extends SparkListenerEvent with Product {
+import org.apache.kyuubi.events.KyuubiEvent
 
-  final def eventType: String = {
-    this.getClass.getSimpleName.stripSuffix("Event").toLowerCase(Locale.ROOT)
-  }
 
-  def schema: StructType
-  def partitions: Seq[(String, String)]
+trait KyuubiSparkEvent extends KyuubiEvent with SparkListenerEvent {
 
-  final def toJson: String = JsonProtocol.productToJson(this)
+  def schema: StructType
 
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
index 56e3167..8b746ff 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
@@ -40,7 +40,7 @@ case class SessionEvent(
     ip: String,
     startTime: Long,
     var endTime: Long = -1L,
-    var totalOperations: Int = 0) extends KyuubiEvent {
+    var totalOperations: Int = 0) extends KyuubiSparkEvent {
 
   override def schema: StructType = Encoders.product[SessionEvent].schema
   override lazy val partitions: Seq[(String, String)] =
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/StatementEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkStatementEvent.scala
similarity index 88%
copy from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/StatementEvent.scala
copy to externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkStatementEvent.scala
index d938b71..c650956 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/StatementEvent.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkStatementEvent.scala
@@ -31,9 +31,9 @@ import org.apache.kyuubi.Utils
  * @param state: store each state that the sql has
  * @param stateTime: the time that the sql's state change
  * @param queryExecution: contains logicPlan and physicalPlan
- * @param exeception: caught exeception if have
+ * @param exception: caught exception if have
  */
-case class StatementEvent(
+case class SparkStatementEvent(
     statementId: String,
     statement: String,
     appId: String,
@@ -42,9 +42,9 @@ case class StatementEvent(
     var state: String,
     var stateTime: Long,
     var queryExecution: String = "",
-    var exeception: String = "") extends KyuubiEvent {
+    var exception: String = "") extends KyuubiSparkEvent {
 
-  override def schema: StructType = Encoders.product[StatementEvent].schema
+  override def schema: StructType = Encoders.product[SparkStatementEvent].schema
   override def partitions: Seq[(String, String)] =
     ("day", Utils.getDateFromTimestamp(createTime)) :: Nil
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index b932acc..530484c 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.types._
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, KyuubiSparkUtil}
-import org.apache.kyuubi.engine.spark.events.{EventLoggingService, StatementEvent}
+import org.apache.kyuubi.engine.spark.events.{EventLoggingService, SparkStatementEvent}
 import org.apache.kyuubi.operation.{OperationState, OperationType}
 import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
@@ -59,7 +59,7 @@ class ExecuteStatement(
 
   private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark)
 
-  val statementEvent: StatementEvent = StatementEvent(
+  val statementEvent: SparkStatementEvent = SparkStatementEvent(
     statementId, statement, spark.sparkContext.applicationId,
     session.handle.identifier.toString, lastAccessTime, state.toString, lastAccessTime)
   EventLoggingService.onEvent(statementEvent)
@@ -175,7 +175,7 @@ class ExecuteStatement(
 
   override def setOperationException(opEx: KyuubiSQLException): Unit = {
     super.setOperationException(opEx)
-    statementEvent.exeception = opEx.toString
+    statementEvent.exception = opEx.toString
     EventLoggingService.onEvent(statementEvent)
   }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
index 4cd50a2..dd10718 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
@@ -19,14 +19,15 @@ package org.apache.spark.kyuubi
 
 import org.apache.spark.SparkContext
 
-import org.apache.kyuubi.engine.spark.events.{EventLogger, KyuubiEvent}
+import org.apache.kyuubi.engine.spark.events.KyuubiSparkEvent
+import org.apache.kyuubi.events.EventLogger
 
 /**
  * A place to invoke non-public APIs of [[SparkContext]], anything to be added here need to
  * think twice
  */
 object SparkContextHelper {
-  def createSparkHistoryLogger(sc: SparkContext): EventLogger = {
+  def createSparkHistoryLogger(sc: SparkContext): EventLogger[KyuubiSparkEvent] = {
     new SparkHistoryEventLogger(sc)
   }
 }
@@ -35,8 +36,8 @@ object SparkContextHelper {
  * A [[EventLogger]] that logs everything to SparkHistory
  * @param sc SparkContext
  */
-private class SparkHistoryEventLogger(sc: SparkContext) extends EventLogger {
-  override def logEvent(kyuubiEvent: KyuubiEvent): Unit = {
+private class SparkHistoryEventLogger(sc: SparkContext) extends EventLogger[KyuubiSparkEvent] {
+  override def logEvent(kyuubiEvent: KyuubiSparkEvent): Unit = {
     sc.eventLogger.foreach(_.onOtherEvent(kyuubiEvent))
   }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
index bb52646..8d912a3 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
@@ -26,10 +26,11 @@ import org.scalatest.time.SpanSugar._
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, WithSparkSQLEngine}
+import org.apache.kyuubi.events.EventLoggerType._
+import org.apache.kyuubi.events.JsonProtocol
 import org.apache.kyuubi.operation.{JDBCTestUtils, OperationHandle}
 
 class EventLoggingServiceSuite extends WithSparkSQLEngine with JDBCTestUtils {
-  import EventLoggerType._
 
   private val logRoot = Utils.createTempDir()
   private val currentDate = Utils.getDateFromTimestamp(System.currentTimeMillis())
@@ -50,8 +51,9 @@ class EventLoggingServiceSuite extends WithSparkSQLEngine with JDBCTestUtils {
       logRoot.toString, "session", s"day=$currentDate", KyuubiSparkUtil.engineId + ".json")
     val engineEventReader = Files.newBufferedReader(engineEventPath, StandardCharsets.UTF_8)
 
-    val readEvent = JsonProtocol.jsonToEvent(engineEventReader.readLine())
-    assert(readEvent.isInstanceOf[KyuubiEvent])
+    val readEvent = JsonProtocol.jsonToEvent(engineEventReader.readLine(),
+      classOf[KyuubiSparkEvent])
+    assert(readEvent.isInstanceOf[KyuubiSparkEvent])
 
     withJdbcStatement() { statement =>
       val table = engineEventPath.getParent
@@ -92,7 +94,7 @@ class EventLoggingServiceSuite extends WithSparkSQLEngine with JDBCTestUtils {
 
   test("statementEvent: generate, dump and query") {
     val statementEventPath = Paths.get(
-      logRoot.toString, "statement", s"day=$currentDate", engine.engineId + ".json")
+      logRoot.toString, "spark-statement", s"day=$currentDate", engine.engineId + ".json")
     val sql = "select timestamp'2021-06-01'"
     withSessionHandle { (client, handle) =>
 
diff --git a/kyuubi-common/pom.xml b/kyuubi-common/pom.xml
index 45f416e..a411ffb 100644
--- a/kyuubi-common/pom.xml
+++ b/kyuubi-common/pom.xml
@@ -89,6 +89,16 @@
         </dependency>
 
         <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-minikdc</artifactId>
             <scope>test</scope>
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 069dd71..6073f5f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -682,13 +682,36 @@ object KyuubiConf {
       .booleanConf
       .createWithDefault(false)
 
+  val SERVER_EVENT_JSON_LOG_PATH: ConfigEntry[String] =
+    buildConf("backend.server.event.json.log.path")
+      .doc("The location of server events go for the builtin JSON logger")
+      .version("1.4.0")
+      .stringConf
+      .createWithDefault("/tmp/kyuubi/events")
+
   val ENGINE_EVENT_JSON_LOG_PATH: ConfigEntry[String] =
     buildConf("engine.event.json.log.path")
-      .doc("The location of all the engine events go for the builtin JSON logger")
+      .doc("The location of all the engine events go for the builtin JSON logger.")
       .version("1.3.0")
       .stringConf
       .createWithDefault("/tmp/kyuubi/events")
 
+  val SERVER_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
+    buildConf("backend.server.event.loggers")
+      .doc("A comma separated list of server history loggers, where session/operation etc" +
+        " events go.<ul>" +
+        s" <li>JSON: the events will be written to the location of" +
+        s" ${SERVER_EVENT_JSON_LOG_PATH.key}</li>" +
+        s" <li>JDBC: to be done</li>" +
+        s" <li>CUSTOM: to be done.</li></ul>")
+      .version("1.4.0")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .toSequence()
+      .checkValue(_.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM")),
+        "Unsupported event loggers")
+      .createWithDefault(Nil)
+
   val ENGINE_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
     buildConf("engine.event.loggers")
       .doc("A comma separated list of engine history loggers, where engine/session/operation etc" +
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonProtocol.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
similarity index 63%
copy from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonProtocol.scala
copy to kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
index be8e4f9..08ad39f 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonProtocol.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
@@ -15,18 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.engine.spark.events
+package org.apache.kyuubi.events
 
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import scala.collection.mutable.ArrayBuffer
 
-object JsonProtocol {
+import org.apache.kyuubi.service.CompositeService
 
-  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+abstract class AbstractEventLoggingService[T <: KyuubiEvent]
+  extends CompositeService("EventLogging") {
 
-  def productToJson[T <: KyuubiEvent](value: T): String = mapper.writeValueAsString(value)
+  private val eventLoggers = new ArrayBuffer[EventLogger[T]]()
 
-  def jsonToEvent(jsonValue: String): KyuubiEvent = {
-    mapper.readValue(jsonValue, classOf[KyuubiEvent])
+  def onEvent(event: T): Unit = {
+    eventLoggers.foreach(_.logEvent(event))
+  }
+
+  def addEventLogger(logger: EventLogger[T]): Unit = {
+    eventLoggers += logger
   }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLogger.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/EventLogger.scala
similarity index 87%
copy from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLogger.scala
copy to kyuubi-common/src/main/scala/org/apache/kyuubi/events/EventLogger.scala
index 4b39330..e56136e 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLogger.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/EventLogger.scala
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.engine.spark.events
+package org.apache.kyuubi.events
 
-trait EventLogger {
+trait EventLogger[T <: KyuubiEvent] {
 
-  def logEvent(kyuubiEvent: KyuubiEvent): Unit
+  def logEvent(kyuubiEvent: T): Unit
 
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggerType.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
similarity index 95%
rename from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggerType.scala
rename to kyuubi-common/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
index 690b934..f2cc4c5 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggerType.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.engine.spark.events
+package org.apache.kyuubi.events
 
 object EventLoggerType extends Enumeration {
 
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonEventLogger.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/JsonEventLogger.scala
similarity index 88%
rename from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonEventLogger.scala
rename to kyuubi-common/src/main/scala/org/apache/kyuubi/events/JsonEventLogger.scala
index 32c2a59..0f97b01 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonEventLogger.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/JsonEventLogger.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.engine.spark.events
+package org.apache.kyuubi.events
 
 import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter}
 import java.net.URI
@@ -29,9 +29,8 @@ import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.kyuubi.Logging
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH
-import org.apache.kyuubi.engine.spark.events.JsonEventLogger.{JSON_LOG_DIR_PERM, JSON_LOG_FILE_PERM}
+import org.apache.kyuubi.config.{ConfigEntry, KyuubiConf}
+import org.apache.kyuubi.events.JsonEventLogger._
 import org.apache.kyuubi.service.AbstractService
 
 /**
@@ -42,8 +41,9 @@ import org.apache.kyuubi.service.AbstractService
  * The ${date} is based on the time of events, e.g. engine.startTime, statement.startTime
  * @param logName the engine id formed of appId + attemptId(if any)
  */
-class JsonEventLogger(logName: String, hadoopConf: Configuration)
-  extends AbstractService("JsonEventLogger") with EventLogger with Logging {
+class JsonEventLogger[T <: KyuubiEvent](logName: String,
+    logPath: ConfigEntry[String], hadoopConf: Configuration)
+  extends AbstractService("JsonEventLogger") with EventLogger[T] with Logging {
 
   type Logger = (PrintWriter, Option[FSDataOutputStream])
 
@@ -83,7 +83,7 @@ class JsonEventLogger(logName: String, hadoopConf: Configuration)
   }
 
   override def initialize(conf: KyuubiConf): Unit = synchronized {
-    logRoot = Paths.get(conf.get(ENGINE_EVENT_JSON_LOG_PATH)).toAbsolutePath.toUri
+    logRoot = Paths.get(conf.get(logPath)).toAbsolutePath.toUri
     fs = FileSystem.get(logRoot, hadoopConf)
     requireLogRootWritable()
     super.initialize(conf)
@@ -100,7 +100,7 @@ class JsonEventLogger(logName: String, hadoopConf: Configuration)
     super.stop()
   }
 
-  override def logEvent(kyuubiEvent: KyuubiEvent): Unit = {
+  override def logEvent(kyuubiEvent: T): Unit = {
     val (writer, stream) = getOrUpdate(kyuubiEvent)
     // scalastyle:off println
     writer.println(kyuubiEvent.toJson)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonProtocol.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/JsonProtocol.scala
similarity index 87%
rename from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonProtocol.scala
rename to kyuubi-common/src/main/scala/org/apache/kyuubi/events/JsonProtocol.scala
index be8e4f9..32aef4f 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonProtocol.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/JsonProtocol.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.engine.spark.events
+package org.apache.kyuubi.events
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -26,7 +26,14 @@ object JsonProtocol {
 
   def productToJson[T <: KyuubiEvent](value: T): String = mapper.writeValueAsString(value)
 
-  def jsonToEvent(jsonValue: String): KyuubiEvent = {
-    mapper.readValue(jsonValue, classOf[KyuubiEvent])
+  /**
+   * Deserializes a JSON string into an event of the requested class.
+   *
+   * @param jsonValue the JSON text to parse
+   * @param cls the concrete event class to instantiate
+   * @return the parsed event, typed as `T` so callers do not have to downcast
+   */
+  def jsonToEvent[T <: KyuubiEvent](jsonValue: String, cls: Class[T]): T = {
+    mapper.readValue(jsonValue, cls)
   }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiEvent.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/KyuubiEvent.scala
similarity index 73%
rename from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiEvent.scala
rename to kyuubi-common/src/main/scala/org/apache/kyuubi/events/KyuubiEvent.scala
index a519b41..39a59c5 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiEvent.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/KyuubiEvent.scala
@@ -15,22 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.engine.spark.events
+package org.apache.kyuubi.events
 
 import java.util.Locale
 
-import org.apache.spark.scheduler.SparkListenerEvent
-import org.apache.spark.sql.types.StructType
-
-trait KyuubiEvent extends SparkListenerEvent with Product {
-
-  final def eventType: String = {
-    this.getClass.getSimpleName.stripSuffix("Event").toLowerCase(Locale.ROOT)
+trait KyuubiEvent extends Product { // upper bound for the events an EventLogger accepts
+  final lazy val eventType: String = { // e.g. KyuubiStatementEvent -> "kyuubi-statement"
+    this.getClass.getSimpleName.stripSuffix("Event")
+      .replaceAll("(.)([A-Z])", "$1-$2") // put '-' between a character and a following capital
+      .toLowerCase(Locale.ROOT)
   }
 
-  def schema: StructType
   def partitions: Seq[(String, String)]
 
   final def toJson: String = JsonProtocol.productToJson(this)
-
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLogger.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerEvent.scala
similarity index 87%
rename from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLogger.scala
rename to kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerEvent.scala
index 4b39330..7a99991 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLogger.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerEvent.scala
@@ -15,10 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.engine.spark.events
+package org.apache.kyuubi.events
 
-trait EventLogger {
-
-  def logEvent(kyuubiEvent: KyuubiEvent): Unit
+trait KyuubiServerEvent extends KyuubiEvent { // no members of its own: tags server-side events
 
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/StatementEvent.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiStatementEvent.scala
similarity index 62%
rename from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/StatementEvent.scala
rename to kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiStatementEvent.scala
index d938b71..d7da77d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/StatementEvent.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiStatementEvent.scala
@@ -15,36 +15,52 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.engine.spark.events
-
-import org.apache.spark.sql.Encoders
-import org.apache.spark.sql.types.StructType
+package org.apache.kyuubi.events
 
 import org.apache.kyuubi.Utils
+import org.apache.kyuubi.operation.ExecuteStatement
+import org.apache.kyuubi.operation.OperationState.OperationState
 
 /**
+ *
+ * @param user: the user who connects to the kyuubi server
  * @param statementId: the identifier of operationHandler
  * @param statement: the sql that you execute
- * @param appId: application id a.k.a, the unique id for engine
+ * @param remoteIp: the ip of user
  * @param sessionId: the identifier of a session
  * @param createTime: the create time of this statement
  * @param state: store each state that the sql has
  * @param stateTime: the time that the sql's state change
- * @param queryExecution: contains logicPlan and physicalPlan
- * @param exeception: caught exeception if have
+ * @param exception: the caught exception, if any
  */
-case class StatementEvent(
+case class KyuubiStatementEvent(
+    user: String,
     statementId: String,
     statement: String,
-    appId: String,
+    remoteIp: String,
     sessionId: String,
     createTime: Long,
     var state: String,
     var stateTime: Long,
-    var queryExecution: String = "",
-    var exeception: String = "") extends KyuubiEvent {
-
-  override def schema: StructType = Encoders.product[StatementEvent].schema
+    var exception: String = "") extends KyuubiServerEvent {
   override def partitions: Seq[(String, String)] =
     ("day", Utils.getDateFromTimestamp(createTime)) :: Nil
 }
+
+object KyuubiStatementEvent {
+  def apply(statement: ExecuteStatement,
+      statementId: String,
+      state: OperationState,
+      stateTime: Long): KyuubiStatementEvent = {
+    val owner = statement.getSession // session the statement belongs to
+    new KyuubiStatementEvent(
+      user = owner.user,
+      statementId = statementId,
+      statement = statement.statement,
+      remoteIp = owner.ipAddress,
+      sessionId = owner.handle.identifier.toString,
+      createTime = stateTime, // the first state time doubles as the creation time
+      state = state.toString,
+      stateTime = stateTime)
+  }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 13ceed1..779b4ad 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -19,14 +19,19 @@ package org.apache.kyuubi.operation
 
 import scala.collection.JavaConverters._
 
-import org.apache.hive.service.rpc.thrift.{TFetchOrientation, TFetchResultsReq, TGetOperationStatusReq}
+import org.apache.hive.service.rpc.thrift.TFetchOrientation
+import org.apache.hive.service.rpc.thrift.TFetchResultsReq
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq
 import org.apache.hive.service.rpc.thrift.TOperationState._
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.client.KyuubiSyncThriftClient
+import org.apache.kyuubi.events.KyuubiStatementEvent
 import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
+import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.server.EventLoggingService
 import org.apache.kyuubi.session.Session
 
 class ExecuteStatement(
@@ -38,6 +43,9 @@ class ExecuteStatement(
   extends KyuubiOperation(
     OperationType.EXECUTE_STATEMENT, session, client) {
 
+  val statementEvent: KyuubiStatementEvent =
+    KyuubiStatementEvent(this, statementId, state, lastAccessTime)
+
   private final val _operationLog: OperationLog = if (shouldRunAsync) {
     OperationLog.createOperationLog(session.handle, getHandle)
   } else {
@@ -53,6 +61,8 @@ class ExecuteStatement(
     req
   }
 
+  EventLoggingService.onEvent(statementEvent)
+
   override def beforeRun(): Unit = {
     OperationLog.setCurrentOperationLog(_operationLog)
     setHasResultSet(true)
@@ -147,6 +157,19 @@ class ExecuteStatement(
     }
   }
 
+  override def setState(newState: OperationState): Unit = { // keep statementEvent in sync
+    super.setState(newState) // apply the transition on the operation first
+    statementEvent.state = newState.toString
+    statementEvent.stateTime = lastAccessTime // NOTE(review): presumably set by super - confirm
+    EventLoggingService.onEvent(statementEvent) // publish the updated event
+  }
+
+  override def setOperationException(opEx: KyuubiSQLException): Unit = {
+    super.setOperationException(opEx)
+    statementEvent.exception = opEx.toString // the event keeps only the string form
+    EventLoggingService.onEvent(statementEvent) // publish again with the failure attached
+  }
+
   override def close(): Unit = {
     MetricsSystem.tracing(_.decCount(STATEMENT_OPEN))
     super.close()
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/EventLoggingService.scala
similarity index 59%
copy from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
copy to kyuubi-server/src/main/scala/org/apache/kyuubi/server/EventLoggingService.scala
index 038ccf5..0111c48 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/EventLoggingService.scala
@@ -15,41 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.engine.spark.events
+package org.apache.kyuubi.server
 
-import scala.collection.mutable.ArrayBuffer
+import java.net.InetAddress
 
-import org.apache.spark.kyuubi.SparkContextHelper
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
-import org.apache.kyuubi.engine.spark.events.EventLoggingService._service
-import org.apache.kyuubi.service.CompositeService
+import org.apache.kyuubi.config.KyuubiConf.SERVER_EVENT_JSON_LOG_PATH
+import org.apache.kyuubi.config.KyuubiConf.SERVER_EVENT_LOGGERS
+import org.apache.kyuubi.events.{AbstractEventLoggingService, EventLoggerType}
+import org.apache.kyuubi.events.JsonEventLogger
+import org.apache.kyuubi.events.KyuubiServerEvent
+import org.apache.kyuubi.server.EventLoggingService._service
 
-class EventLoggingService(engine: SparkSQLEngine)
-  extends CompositeService("EventLogging") {
-
-  private val eventLoggers = new ArrayBuffer[EventLogger]()
-
-  def onEvent(event: KyuubiEvent): Unit = {
-    eventLoggers.foreach(_.logEvent(event))
-  }
+class EventLoggingService extends AbstractEventLoggingService[KyuubiServerEvent] {
 
   override def initialize(conf: KyuubiConf): Unit = {
-    conf.get(KyuubiConf.ENGINE_EVENT_LOGGERS)
+    conf.get(SERVER_EVENT_LOGGERS) // names of the logger types enabled for the server
       .map(EventLoggerType.withName)
-      .foreach {
-        case EventLoggerType.SPARK =>
-          eventLoggers += SparkContextHelper.createSparkHistoryLogger(engine.spark.sparkContext)
+      .foreach{
         case EventLoggerType.JSON =>
-          val jsonEventLogger = new JsonEventLogger(KyuubiSparkUtil.engineId,
-            engine.spark.sparkContext.hadoopConfiguration)
+          val hostName = InetAddress.getLocalHost.getCanonicalHostName // becomes part of logName
+          val jsonEventLogger = new JsonEventLogger[KyuubiServerEvent](s"server-$hostName",
+            SERVER_EVENT_JSON_LOG_PATH, new Configuration())
           addService(jsonEventLogger)
-          eventLoggers += jsonEventLogger
+          addEventLogger(jsonEventLogger) // added both as a service and as an event logger
         case logger =>
           // TODO: Add more implementations
           throw new IllegalArgumentException(s"Unrecognized event logger: $logger")
-    }
+      }
     super.initialize(conf)
   }
 
@@ -69,8 +64,7 @@ object EventLoggingService {
 
   private var _service: Option[EventLoggingService] = None
 
-  def onEvent(event: KyuubiEvent): Unit = {
+  def onEvent(event: KyuubiServerEvent): Unit = {
     _service.foreach(_.onEvent(event))
   }
 }
-
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index 86c11b8..e955331 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -83,6 +83,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
 
   override val backendService: AbstractBackendService = new KyuubiBackendService()
   override val frontendService = new KyuubiFrontendService(backendService)
+  private val eventLoggingService: EventLoggingService = new EventLoggingService
   override protected def supportsServiceDiscovery: Boolean = {
     ServiceDiscovery.supportServiceDiscovery(conf)
   }
@@ -91,6 +92,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
   override def initialize(conf: KyuubiConf): Unit = synchronized {
     val kinit = new KinitAuxiliaryService()
     addService(kinit)
+    addService(eventLoggingService)
 
     if (conf.get(MetricsConf.METRICS_ENABLED)) {
       addService(new MetricsSystem)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
new file mode 100644
index 0000000..988fe4d
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.events
+
+import java.net.InetAddress
+import java.nio.file.Paths
+
+import org.apache.kyuubi.{Utils, WithKyuubiServer}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.operation.JDBCTestUtils
+import org.apache.kyuubi.operation.OperationState._
+
+/**
+ * Runs a statement through a Kyuubi server with JSON event logging enabled for both the
+ * server and the engine, then reads the logged statement events back with SQL.
+ */
+class EventLoggingServiceSuite extends WithKyuubiServer with JDBCTestUtils {
+
+  private val logRoot = Utils.createTempDir()
+  private val currentDate = Utils.getDateFromTimestamp(System.currentTimeMillis())
+
+  override protected val conf: KyuubiConf = {
+    KyuubiConf()
+      .set(KyuubiConf.SERVER_EVENT_LOGGERS, Seq("JSON"))
+      .set(KyuubiConf.SERVER_EVENT_JSON_LOG_PATH, logRoot.toString)
+      .set(KyuubiConf.ENGINE_EVENT_LOGGERS, Seq("JSON"))
+      .set(KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH, logRoot.toString)
+  }
+
+  override protected def jdbcUrl: String = getJdbcUrl
+
+  test("statementEvent: generate, dump and query") {
+    val hostName = InetAddress.getLocalHost.getCanonicalHostName
+    val serverStatementEventPath =
+      Paths.get(logRoot.toString, "kyuubi-statement", s"day=$currentDate", s"server-$hostName.json")
+    val engineStatementEventPath =
+      Paths.get(logRoot.toString, "spark-statement", s"day=$currentDate", "*.json")
+    val sql = "select timestamp'2021-06-01'"
+
+    withJdbcStatement() { statement =>
+      statement.execute(sql)
+
+      // check server statement events
+      val serverTable = serverStatementEventPath.getParent
+      val resultSet = statement.executeQuery(s"SELECT * FROM `json`.`${serverTable}`" +
+        "where statement = \"" + sql + "\"")
+      val states = Array(INITIALIZED, PENDING, RUNNING, FINISHED, CLOSED)
+      var stateIndex = 0
+      while (resultSet.next()) {
+        assert(resultSet.getString("user") == Utils.currentUser)
+        assert(resultSet.getString("statement") == sql)
+        assert(resultSet.getString("state") == states(stateIndex).toString)
+        stateIndex += 1
+      }
+      // without this, an empty result set would skip every assertion in the loop
+      assert(stateIndex > 0, "no server statement events were read back")
+
+      // check engine statement events
+      val engineTable = engineStatementEventPath.getParent
+      val resultSet2 = statement.executeQuery(s"SELECT * FROM `json`.`${engineTable}`" +
+        "where statement = \"" + sql + "\"")
+      val engineStates = Array(INITIALIZED, PENDING, RUNNING, COMPILED, FINISHED)
+      stateIndex = 0
+      while (resultSet2.next()) {
+        assert(resultSet2.getString("Event") ==
+          "org.apache.kyuubi.engine.spark.events.SparkStatementEvent")
+        assert(resultSet2.getString("statement") == sql)
+        assert(resultSet2.getString("state") == engineStates(stateIndex).toString)
+        stateIndex += 1
+      }
+      // without this, an empty result set would skip every assertion in the loop
+      assert(stateIndex > 0, "no engine statement events were read back")
+    }
+  }
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala
index 0df2cde..d44b8f2 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala
@@ -35,7 +35,9 @@ class KyuubiServerSuite extends KyuubiFunSuite {
 
     server.initialize(conf)
     assert(server.getServiceState === INITIALIZED)
-    val backendService = server.getServices(2).asInstanceOf[KyuubiBackendService]
+    val backendServices = server.getServices.filter(_.isInstanceOf[KyuubiBackendService])
+    assert(backendServices.size == 1)
+    val backendService = backendServices(0).asInstanceOf[KyuubiBackendService]
     assert(backendService.getServiceState == INITIALIZED)
     assert(backendService.getServices.forall(_.getServiceState === INITIALIZED))
     assert(server.connectionUrl.split(":").length === 2)