You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2023/03/22 01:46:22 UTC

[kyuubi] branch master updated: [KYUUBI #3929] Refactor lineage plugin to add LineageDispatcher

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new c4f3195bd [KYUUBI #3929] Refactor lineage plugin to add LineageDispatcher
c4f3195bd is described below

commit c4f3195bd63466bf4c29432d76574b4dbd823890
Author: wForget <64...@qq.com>
AuthorDate: Wed Mar 22 09:46:14 2023 +0800

    [KYUUBI #3929] Refactor lineage plugin to add LineageDispatcher
    
    ### _Why are the changes needed?_
    
    Refactor lineage plugin to add LineageDispatcher.
    
    close #3929
    
    ### _How was this patch tested?_
    - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #3919 from wForget/dev-lineage-dispatcher.
    
    Closes #3929
    
    5df2aa2f [wforget] add doc
    98683ebc [wforget] fix
    7b97b2e0 [wForget] rebase
    4b046868 [wForget] separate LineageDispatcherType class file
    e14cf838 [wForget] Refactor lineage plugin to add LineageDispatcher
    
    Lead-authored-by: wForget <64...@qq.com>
    Co-authored-by: wforget <64...@qq.com>
    Signed-off-by: ulyssesyou <ul...@apache.org>
---
 docs/extensions/engines/spark/lineage.md           | 16 +++++++-
 extensions/spark/kyuubi-spark-lineage/pom.xml      | 15 +++++--
 .../OperationLineageEvent.scala => Lineage.scala}  | 10 +----
 .../plugin/lineage/LineageDispatcher.scala}        | 30 ++++++++++----
 .../plugin/lineage/LineageDispatcherType.scala}    | 15 ++-----
 ...arkOperationLineageQueryExecutionListener.scala | 13 +++---
 .../lineage/dispatcher/KyuubiEventDispatcher.scala | 48 ++++++++++++++++++++++
 .../SparkEventDispatcher.scala}                    | 25 ++++++-----
 .../helper/SparkSQLLineageParseHelper.scala        |  2 +-
 .../apache/spark/kyuubi/lineage/LineageConf.scala  | 16 ++++++++
 .../events/OperationLineageEventSuite.scala        | 33 +++++++++++----
 .../helper/SparkSQLLineageParserHelperSuite.scala  |  2 +-
 12 files changed, 163 insertions(+), 62 deletions(-)

diff --git a/docs/extensions/engines/spark/lineage.md b/docs/extensions/engines/spark/lineage.md
index 0f4b0c458..cd38be4ba 100644
--- a/docs/extensions/engines/spark/lineage.md
+++ b/docs/extensions/engines/spark/lineage.md
@@ -179,9 +179,18 @@ to add one configurations.
 spark.kyuubi.plugin.lineage.skip.parsing.permanent.view.enabled=true
 ```
 
-### Get Lineage Events from SparkListener
+### Get Lineage Events
 
-All lineage events will be sent to the `SparkListenerBus`. To handle lineage events, a new `SparkListener` needs to be added.
+The lineage dispatchers are used to dispatch lineage events, configured via `spark.kyuubi.plugin.lineage.dispatchers`.
+
+<ul>
+  <li>SPARK_EVENT (by default): send lineage event to spark event bus</li>
+  <li>KYUUBI_EVENT: send lineage event to kyuubi event bus</li>
+</ul>
+
+#### Get Lineage Events from SparkListener
+
+When using the `SPARK_EVENT` dispatcher, the lineage events will be sent to the `SparkListenerBus`. To handle lineage events, a new `SparkListener` needs to be added.
 Example for Adding `SparkListener`:
 
 ```scala
@@ -196,3 +205,6 @@ spark.sparkContext.addSparkListener(new SparkListener {
     })
 ```
 
+#### Get Lineage Events from Kyuubi EventHandler
+
+When using the `KYUUBI_EVENT` dispatcher, the lineage events will be sent to the Kyuubi `EventBus`. Refer to [Kyuubi Event Handler](../../server/events) to handle kyuubi events.
diff --git a/extensions/spark/kyuubi-spark-lineage/pom.xml b/extensions/spark/kyuubi-spark-lineage/pom.xml
index c8f9b30bd..bc13480d7 100644
--- a/extensions/spark/kyuubi-spark-lineage/pom.xml
+++ b/extensions/spark/kyuubi-spark-lineage/pom.xml
@@ -38,15 +38,22 @@
         </dependency>
 
         <dependency>
-            <groupId>commons-collections</groupId>
-            <artifactId>commons-collections</artifactId>
-            <scope>test</scope>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.kyuubi</groupId>
-            <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+            <artifactId>kyuubi-events_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
             <scope>test</scope>
         </dependency>
 
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEvent.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/Lineage.scala
similarity index 88%
rename from extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEvent.scala
rename to extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/Lineage.scala
index c69b45709..4bd0bd0b1 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEvent.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/Lineage.scala
@@ -15,9 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.plugin.lineage.events
-
-import org.apache.spark.scheduler.SparkListenerEvent
+package org.apache.kyuubi.plugin.lineage
 
 case class ColumnLineage(column: String, originalColumns: Set[String])
 
@@ -60,9 +58,3 @@ object Lineage {
     new Lineage(inputTables, outputTables, newColumnLineage)
   }
 }
-
-case class OperationLineageEvent(
-    executionId: Long,
-    eventTime: Long,
-    lineage: Option[Lineage],
-    exception: Option[Throwable]) extends SparkListenerEvent
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala
similarity index 50%
copy from extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
copy to extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala
index 777e70121..8f5dc0d9e 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala
@@ -15,17 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.spark.kyuubi.lineage
+package org.apache.kyuubi.plugin.lineage
 
-import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.sql.execution.QueryExecution
 
-object LineageConf {
+import org.apache.kyuubi.plugin.lineage.dispatcher.{KyuubiEventDispatcher, SparkEventDispatcher}
 
-  val SKIP_PARSING_PERMANENT_VIEW_ENABLED =
-    ConfigBuilder("spark.kyuubi.plugin.lineage.skip.parsing.permanent.view.enabled")
-      .doc("Whether to skip the lineage parsing of permanent views")
-      .version("1.8.0")
-      .booleanConf
-      .createWithDefault(false)
+trait LineageDispatcher {
+
+  def send(qe: QueryExecution, lineage: Option[Lineage]): Unit
+
+  def onFailure(qe: QueryExecution, exception: Exception): Unit = {}
+
+}
+
+object LineageDispatcher {
+
+  def apply(dispatcherType: String): LineageDispatcher = {
+    LineageDispatcherType.withName(dispatcherType) match {
+      case LineageDispatcherType.SPARK_EVENT => new SparkEventDispatcher()
+      case LineageDispatcherType.KYUUBI_EVENT => new KyuubiEventDispatcher()
+      case _ => throw new UnsupportedOperationException(
+          s"Unsupported lineage dispatcher: $dispatcherType.")
+    }
+  }
 
 }
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
similarity index 67%
copy from extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
copy to extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
index 777e70121..d6afea152 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
@@ -15,17 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.spark.kyuubi.lineage
+package org.apache.kyuubi.plugin.lineage
 
-import org.apache.spark.internal.config.ConfigBuilder
-
-object LineageConf {
-
-  val SKIP_PARSING_PERMANENT_VIEW_ENABLED =
-    ConfigBuilder("spark.kyuubi.plugin.lineage.skip.parsing.permanent.view.enabled")
-      .doc("Whether to skip the lineage parsing of permanent views")
-      .version("1.8.0")
-      .booleanConf
-      .createWithDefault(false)
+object LineageDispatcherType extends Enumeration {
+  type LineageDispatcherType = Value
 
+  val SPARK_EVENT, KYUUBI_EVENT = Value
 }
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala
index 01deb0a44..b83117cde 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala
@@ -17,24 +17,25 @@
 
 package org.apache.kyuubi.plugin.lineage
 
-import org.apache.spark.kyuubi.lineage.SparkContextHelper
+import org.apache.spark.kyuubi.lineage.{LineageConf, SparkContextHelper}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.util.QueryExecutionListener
 
-import org.apache.kyuubi.plugin.lineage.events.OperationLineageEvent
 import org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper
 
 class SparkOperationLineageQueryExecutionListener extends QueryExecutionListener {
 
+  private lazy val dispatchers: Seq[LineageDispatcher] = {
+    SparkContextHelper.getConf(LineageConf.DISPATCHERS).map(LineageDispatcher(_))
+  }
+
   override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
     val lineage =
       SparkSQLLineageParseHelper(qe.sparkSession).transformToLineage(qe.id, qe.analyzed)
-    val event = OperationLineageEvent(qe.id, System.currentTimeMillis(), lineage, None)
-    SparkContextHelper.postEventToSparkListenerBus(event)
+    dispatchers.foreach(_.send(qe, lineage))
   }
 
   override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
-    val event = OperationLineageEvent(qe.id, System.currentTimeMillis(), None, Some(exception))
-    SparkContextHelper.postEventToSparkListenerBus(event)
+    dispatchers.foreach(_.onFailure(qe, exception))
   }
 }
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/KyuubiEventDispatcher.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/KyuubiEventDispatcher.scala
new file mode 100644
index 000000000..6a9e65948
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/KyuubiEventDispatcher.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.dispatcher
+
+import org.apache.spark.sql.execution.QueryExecution
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.events.{EventBus, KyuubiEvent}
+import org.apache.kyuubi.plugin.lineage.{Lineage, LineageDispatcher}
+
+class KyuubiEventDispatcher extends LineageDispatcher {
+
+  override def send(qe: QueryExecution, lineage: Option[Lineage]): Unit = {
+    val event = OperationLineageKyuubiEvent(qe.id, System.currentTimeMillis(), lineage, None)
+    EventBus.post(event)
+  }
+
+  override def onFailure(qe: QueryExecution, exception: Exception): Unit = {
+    val event =
+      OperationLineageKyuubiEvent(qe.id, System.currentTimeMillis(), None, Some(exception))
+    EventBus.post(event)
+  }
+
+}
+
+case class OperationLineageKyuubiEvent(
+    executionId: Long,
+    eventTime: Long,
+    lineage: Option[Lineage],
+    exception: Option[Throwable]) extends KyuubiEvent {
+  override def partitions: Seq[(String, String)] =
+    ("day", Utils.getDateFromTimestamp(eventTime)) :: Nil
+}
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/SparkEventDispatcher.scala
similarity index 56%
copy from extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala
copy to extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/SparkEventDispatcher.scala
index 01deb0a44..36fbbb7d4 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/SparkEventDispatcher.scala
@@ -15,26 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.plugin.lineage
+package org.apache.kyuubi.plugin.lineage.dispatcher
 
 import org.apache.spark.kyuubi.lineage.SparkContextHelper
+import org.apache.spark.scheduler.SparkListenerEvent
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.util.QueryExecutionListener
 
-import org.apache.kyuubi.plugin.lineage.events.OperationLineageEvent
-import org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper
+import org.apache.kyuubi.plugin.lineage.{Lineage, LineageDispatcher}
 
-class SparkOperationLineageQueryExecutionListener extends QueryExecutionListener {
+class SparkEventDispatcher extends LineageDispatcher {
 
-  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
-    val lineage =
-      SparkSQLLineageParseHelper(qe.sparkSession).transformToLineage(qe.id, qe.analyzed)
-    val event = OperationLineageEvent(qe.id, System.currentTimeMillis(), lineage, None)
+  override def send(qe: QueryExecution, lineage: Option[Lineage]): Unit = {
+    val event = OperationLineageSparkEvent(qe.id, System.currentTimeMillis(), lineage, None)
     SparkContextHelper.postEventToSparkListenerBus(event)
   }
 
-  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
-    val event = OperationLineageEvent(qe.id, System.currentTimeMillis(), None, Some(exception))
+  override def onFailure(qe: QueryExecution, exception: Exception): Unit = {
+    val event = OperationLineageSparkEvent(qe.id, System.currentTimeMillis(), None, Some(exception))
     SparkContextHelper.postEventToSparkListenerBus(event)
   }
 }
+
+case class OperationLineageSparkEvent(
+    executionId: Long,
+    eventTime: Long,
+    lineage: Option[Lineage],
+    exception: Option[Throwable]) extends SparkListenerEvent
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index dd085f404..793572611 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 
-import org.apache.kyuubi.plugin.lineage.events.Lineage
+import org.apache.kyuubi.plugin.lineage.Lineage
 import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
 
 trait LineageParser {
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
index 777e70121..6fb5399c0 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
@@ -19,6 +19,8 @@ package org.apache.spark.kyuubi.lineage
 
 import org.apache.spark.internal.config.ConfigBuilder
 
+import org.apache.kyuubi.plugin.lineage.LineageDispatcherType
+
 object LineageConf {
 
   val SKIP_PARSING_PERMANENT_VIEW_ENABLED =
@@ -28,4 +30,18 @@ object LineageConf {
       .booleanConf
       .createWithDefault(false)
 
+  val DISPATCHERS = ConfigBuilder("spark.kyuubi.plugin.lineage.dispatchers")
+    .doc("The lineage dispatchers are implementations of " +
+      "`org.apache.kyuubi.plugin.lineage.LineageDispatcher` for dispatching lineage events.<ul>" +
+      "<li>SPARK_EVENT: send lineage event to spark event bus</li>" +
+      "<li>KYUUBI_EVENT: send lineage event to kyuubi event bus</li>" +
+      "</ul>")
+    .version("1.8.0")
+    .stringConf
+    .toSequence
+    .checkValue(
+      _.toSet.subsetOf(LineageDispatcherType.values.map(_.toString)),
+      "Unsupported lineage dispatchers")
+    .createWithDefault(Seq(LineageDispatcherType.SPARK_EVENT.toString))
+
 }
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
index 1057bd74e..67e94ad0b 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
@@ -22,11 +22,14 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import scala.collection.immutable.List
 
 import org.apache.spark.SparkConf
-import org.apache.spark.kyuubi.lineage.LineageConf.SKIP_PARSING_PERMANENT_VIEW_ENABLED
+import org.apache.spark.kyuubi.lineage.LineageConf._
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.SparkListenerExtensionTest
 
 import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.events.EventBus
+import org.apache.kyuubi.plugin.lineage.Lineage
+import org.apache.kyuubi.plugin.lineage.dispatcher.{OperationLineageKyuubiEvent, OperationLineageSparkEvent}
 import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
 
 class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtensionTest {
@@ -43,19 +46,21 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
       .set(
         "spark.sql.queryExecutionListeners",
         "org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener")
+      .set(DISPATCHERS.key, "SPARK_EVENT,KYUUBI_EVENT")
       .set(SKIP_PARSING_PERMANENT_VIEW_ENABLED.key, "true")
   }
 
   test("operation lineage event capture: for execute sql") {
-    val countDownLatch = new CountDownLatch(1)
-    var actual: Lineage = null
+    val countDownLatch = new CountDownLatch(2)
+    // get lineage from spark event
+    var actualSparkEventLineage: Lineage = null
     spark.sparkContext.addSparkListener(new SparkListener {
       override def onOtherEvent(event: SparkListenerEvent): Unit = {
         event match {
-          case lineageEvent: OperationLineageEvent =>
+          case lineageEvent: OperationLineageSparkEvent =>
             lineageEvent.lineage.foreach {
               case lineage if lineage.inputTables.nonEmpty =>
-                actual = lineage
+                actualSparkEventLineage = lineage
                 countDownLatch.countDown()
             }
           case _ =>
@@ -63,6 +68,16 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
       }
     })
 
+    // get lineage from kyuubi event
+    var actualKyuubiEventLineage: Lineage = null
+    EventBus.register[OperationLineageKyuubiEvent] { lineageEvent: OperationLineageKyuubiEvent =>
+      lineageEvent.lineage.foreach {
+        case lineage if lineage.inputTables.nonEmpty =>
+          actualKyuubiEventLineage = lineage
+          countDownLatch.countDown()
+      }
+    }
+
     withTable("test_table0") { _ =>
       spark.sql("create table test_table0(a string, b string)")
       spark.sql("select a as col0, b as col1 from test_table0").collect()
@@ -73,7 +88,8 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
           ("col0", Set("default.test_table0.a")),
           ("col1", Set("default.test_table0.b"))))
       countDownLatch.await(20, TimeUnit.SECONDS)
-      assert(actual == expected)
+      assert(actualSparkEventLineage == expected)
+      assert(actualKyuubiEventLineage == expected)
     }
   }
 
@@ -90,7 +106,8 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
     spark.sparkContext.addSparkListener(new SparkListener {
       override def onOtherEvent(event: SparkListenerEvent): Unit = {
         event match {
-          case lineageEvent: OperationLineageEvent if executionId == lineageEvent.executionId =>
+          case lineageEvent: OperationLineageSparkEvent
+              if executionId == lineageEvent.executionId =>
             lineageEvent.lineage.foreach { lineage =>
               assert(lineage == expected)
               countDownLatch.countDown()
@@ -126,7 +143,7 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
     spark.sparkContext.addSparkListener(new SparkListener {
       override def onOtherEvent(event: SparkListenerEvent): Unit = {
         event match {
-          case lineageEvent: OperationLineageEvent =>
+          case lineageEvent: OperationLineageSparkEvent =>
             lineageEvent.lineage.foreach {
               case lineage if lineage.inputTables.nonEmpty && lineage.outputTables.isEmpty =>
                 actual = lineage
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
index 0c69885da..c25c94d39 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, SchemaRel
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
 import org.apache.kyuubi.KyuubiFunSuite
-import org.apache.kyuubi.plugin.lineage.events.Lineage
+import org.apache.kyuubi.plugin.lineage.Lineage
 import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
 
 class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite