You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2023/03/22 01:46:22 UTC
[kyuubi] branch master updated: [KYUUBI #3929] Refactor lineage plugin to add LineageDispatcher
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c4f3195bd [KYUUBI #3929] Refactor lineage plugin to add LineageDispatcher
c4f3195bd is described below
commit c4f3195bd63466bf4c29432d76574b4dbd823890
Author: wForget <64...@qq.com>
AuthorDate: Wed Mar 22 09:46:14 2023 +0800
[KYUUBI #3929] Refactor lineage plugin to add LineageDispatcher
### _Why are the changes needed?_
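Refactor the lineage plugin to add a pluggable `LineageDispatcher`. Lineage events can now be dispatched to the Spark event bus (`SPARK_EVENT`, the default), to the Kyuubi event bus (`KYUUBI_EVENT`), or to both, as configured by `spark.kyuubi.plugin.lineage.dispatchers`.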
close #3929
### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3919 from wForget/dev-lineage-dispatcher.
Closes #3929
5df2aa2f [wforget] add doc
98683ebc [wforget] fix
7b97b2e0 [wForget] rebase
4b046868 [wForget] separate LineageDispatcherType class file
e14cf838 [wForget] Refactor lineage plugin to add LineageDispatcher
Lead-authored-by: wForget <64...@qq.com>
Co-authored-by: wforget <64...@qq.com>
Signed-off-by: ulyssesyou <ul...@apache.org>
---
docs/extensions/engines/spark/lineage.md | 16 +++++++-
extensions/spark/kyuubi-spark-lineage/pom.xml | 15 +++++--
.../OperationLineageEvent.scala => Lineage.scala} | 10 +----
.../plugin/lineage/LineageDispatcher.scala} | 30 ++++++++++----
.../plugin/lineage/LineageDispatcherType.scala} | 15 ++-----
...arkOperationLineageQueryExecutionListener.scala | 13 +++---
.../lineage/dispatcher/KyuubiEventDispatcher.scala | 48 ++++++++++++++++++++++
.../SparkEventDispatcher.scala} | 25 ++++++-----
.../helper/SparkSQLLineageParseHelper.scala | 2 +-
.../apache/spark/kyuubi/lineage/LineageConf.scala | 16 ++++++++
.../events/OperationLineageEventSuite.scala | 33 +++++++++++----
.../helper/SparkSQLLineageParserHelperSuite.scala | 2 +-
12 files changed, 163 insertions(+), 62 deletions(-)
diff --git a/docs/extensions/engines/spark/lineage.md b/docs/extensions/engines/spark/lineage.md
index 0f4b0c458..cd38be4ba 100644
--- a/docs/extensions/engines/spark/lineage.md
+++ b/docs/extensions/engines/spark/lineage.md
@@ -179,9 +179,18 @@ to add one configurations.
spark.kyuubi.plugin.lineage.skip.parsing.permanent.view.enabled=true
```
-### Get Lineage Events from SparkListener
+### Get Lineage Events
-All lineage events will be sent to the `SparkListenerBus`. To handle lineage events, a new `SparkListener` needs to be added.
+Lineage dispatchers deliver lineage events. They are configured via `spark.kyuubi.plugin.lineage.dispatchers`, which accepts a comma-separated list of the following dispatcher types:
+
+<ul>
+ <li>SPARK_EVENT (default): sends lineage events to the Spark event bus</li>
+ <li>KYUUBI_EVENT: sends lineage events to the Kyuubi event bus</li>
+</ul>
+
+#### Get Lineage Events from SparkListener
+
+When using the `SPARK_EVENT` dispatcher, lineage events are posted to the `SparkListenerBus`. To handle them, register a custom `SparkListener`.
Example for Adding `SparkListener`:
```scala
@@ -196,3 +205,6 @@ spark.sparkContext.addSparkListener(new SparkListener {
})
```
+#### Get Lineage Events from Kyuubi EventHandler
+
+When using the `KYUUBI_EVENT` dispatcher, lineage events are posted to the Kyuubi `EventBus`. Refer to [Kyuubi Event Handler](../../server/events) for how to handle Kyuubi events.
diff --git a/extensions/spark/kyuubi-spark-lineage/pom.xml b/extensions/spark/kyuubi-spark-lineage/pom.xml
index c8f9b30bd..bc13480d7 100644
--- a/extensions/spark/kyuubi-spark-lineage/pom.xml
+++ b/extensions/spark/kyuubi-spark-lineage/pom.xml
@@ -38,15 +38,22 @@
</dependency>
<dependency>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- <scope>test</scope>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
- <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+ <artifactId>kyuubi-events_${scala.binary.version}</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
<scope>test</scope>
</dependency>
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEvent.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/Lineage.scala
similarity index 88%
rename from extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEvent.scala
rename to extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/Lineage.scala
index c69b45709..4bd0bd0b1 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEvent.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/Lineage.scala
@@ -15,9 +15,7 @@
* limitations under the License.
*/
-package org.apache.kyuubi.plugin.lineage.events
-
-import org.apache.spark.scheduler.SparkListenerEvent
+package org.apache.kyuubi.plugin.lineage
case class ColumnLineage(column: String, originalColumns: Set[String])
@@ -60,9 +58,3 @@ object Lineage {
new Lineage(inputTables, outputTables, newColumnLineage)
}
}
-
-case class OperationLineageEvent(
- executionId: Long,
- eventTime: Long,
- lineage: Option[Lineage],
- exception: Option[Throwable]) extends SparkListenerEvent
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala
similarity index 50%
copy from extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
copy to extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala
index 777e70121..8f5dc0d9e 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala
@@ -15,17 +15,29 @@
* limitations under the License.
*/
-package org.apache.spark.kyuubi.lineage
+package org.apache.kyuubi.plugin.lineage
-import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.sql.execution.QueryExecution
-object LineageConf {
+import org.apache.kyuubi.plugin.lineage.dispatcher.{KyuubiEventDispatcher, SparkEventDispatcher}
- val SKIP_PARSING_PERMANENT_VIEW_ENABLED =
- ConfigBuilder("spark.kyuubi.plugin.lineage.skip.parsing.permanent.view.enabled")
- .doc("Whether to skip the lineage parsing of permanent views")
- .version("1.8.0")
- .booleanConf
- .createWithDefault(false)
+trait LineageDispatcher {
+
+ def send(qe: QueryExecution, lineage: Option[Lineage]): Unit
+
+ def onFailure(qe: QueryExecution, exception: Exception): Unit = {}
+
+}
+
+object LineageDispatcher {
+
+ def apply(dispatcherType: String): LineageDispatcher = {
+ LineageDispatcherType.withName(dispatcherType) match {
+ case LineageDispatcherType.SPARK_EVENT => new SparkEventDispatcher()
+ case LineageDispatcherType.KYUUBI_EVENT => new KyuubiEventDispatcher()
+ case _ => throw new UnsupportedOperationException(
+ s"Unsupported lineage dispatcher: $dispatcherType.")
+ }
+ }
}
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
similarity index 67%
copy from extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
copy to extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
index 777e70121..d6afea152 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
@@ -15,17 +15,10 @@
* limitations under the License.
*/
-package org.apache.spark.kyuubi.lineage
+package org.apache.kyuubi.plugin.lineage
-import org.apache.spark.internal.config.ConfigBuilder
-
-object LineageConf {
-
- val SKIP_PARSING_PERMANENT_VIEW_ENABLED =
- ConfigBuilder("spark.kyuubi.plugin.lineage.skip.parsing.permanent.view.enabled")
- .doc("Whether to skip the lineage parsing of permanent views")
- .version("1.8.0")
- .booleanConf
- .createWithDefault(false)
+object LineageDispatcherType extends Enumeration {
+ type LineageDispatcherType = Value
+ val SPARK_EVENT, KYUUBI_EVENT = Value
}
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala
index 01deb0a44..b83117cde 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala
@@ -17,24 +17,25 @@
package org.apache.kyuubi.plugin.lineage
-import org.apache.spark.kyuubi.lineage.SparkContextHelper
+import org.apache.spark.kyuubi.lineage.{LineageConf, SparkContextHelper}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
-import org.apache.kyuubi.plugin.lineage.events.OperationLineageEvent
import org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper
class SparkOperationLineageQueryExecutionListener extends QueryExecutionListener {
+ private lazy val dispatchers: Seq[LineageDispatcher] = {
+ SparkContextHelper.getConf(LineageConf.DISPATCHERS).map(LineageDispatcher(_))
+ }
+
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
val lineage =
SparkSQLLineageParseHelper(qe.sparkSession).transformToLineage(qe.id, qe.analyzed)
- val event = OperationLineageEvent(qe.id, System.currentTimeMillis(), lineage, None)
- SparkContextHelper.postEventToSparkListenerBus(event)
+ dispatchers.foreach(_.send(qe, lineage))
}
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
- val event = OperationLineageEvent(qe.id, System.currentTimeMillis(), None, Some(exception))
- SparkContextHelper.postEventToSparkListenerBus(event)
+ dispatchers.foreach(_.onFailure(qe, exception))
}
}
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/KyuubiEventDispatcher.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/KyuubiEventDispatcher.scala
new file mode 100644
index 000000000..6a9e65948
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/KyuubiEventDispatcher.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.dispatcher
+
+import org.apache.spark.sql.execution.QueryExecution
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.events.{EventBus, KyuubiEvent}
+import org.apache.kyuubi.plugin.lineage.{Lineage, LineageDispatcher}
+
+class KyuubiEventDispatcher extends LineageDispatcher {
+
+ override def send(qe: QueryExecution, lineage: Option[Lineage]): Unit = {
+ val event = OperationLineageKyuubiEvent(qe.id, System.currentTimeMillis(), lineage, None)
+ EventBus.post(event)
+ }
+
+ override def onFailure(qe: QueryExecution, exception: Exception): Unit = {
+ val event =
+ OperationLineageKyuubiEvent(qe.id, System.currentTimeMillis(), None, Some(exception))
+ EventBus.post(event)
+ }
+
+}
+
+case class OperationLineageKyuubiEvent(
+ executionId: Long,
+ eventTime: Long,
+ lineage: Option[Lineage],
+ exception: Option[Throwable]) extends KyuubiEvent {
+ override def partitions: Seq[(String, String)] =
+ ("day", Utils.getDateFromTimestamp(eventTime)) :: Nil
+}
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/SparkEventDispatcher.scala
similarity index 56%
copy from extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala
copy to extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/SparkEventDispatcher.scala
index 01deb0a44..36fbbb7d4 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/SparkEventDispatcher.scala
@@ -15,26 +15,29 @@
* limitations under the License.
*/
-package org.apache.kyuubi.plugin.lineage
+package org.apache.kyuubi.plugin.lineage.dispatcher
import org.apache.spark.kyuubi.lineage.SparkContextHelper
+import org.apache.spark.scheduler.SparkListenerEvent
import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.util.QueryExecutionListener
-import org.apache.kyuubi.plugin.lineage.events.OperationLineageEvent
-import org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper
+import org.apache.kyuubi.plugin.lineage.{Lineage, LineageDispatcher}
-class SparkOperationLineageQueryExecutionListener extends QueryExecutionListener {
+class SparkEventDispatcher extends LineageDispatcher {
- override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
- val lineage =
- SparkSQLLineageParseHelper(qe.sparkSession).transformToLineage(qe.id, qe.analyzed)
- val event = OperationLineageEvent(qe.id, System.currentTimeMillis(), lineage, None)
+ override def send(qe: QueryExecution, lineage: Option[Lineage]): Unit = {
+ val event = OperationLineageSparkEvent(qe.id, System.currentTimeMillis(), lineage, None)
SparkContextHelper.postEventToSparkListenerBus(event)
}
- override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
- val event = OperationLineageEvent(qe.id, System.currentTimeMillis(), None, Some(exception))
+ override def onFailure(qe: QueryExecution, exception: Exception): Unit = {
+ val event = OperationLineageSparkEvent(qe.id, System.currentTimeMillis(), None, Some(exception))
SparkContextHelper.postEventToSparkListenerBus(event)
}
}
+
+case class OperationLineageSparkEvent(
+ executionId: Long,
+ eventTime: Long,
+ lineage: Option[Lineage],
+ exception: Option[Throwable]) extends SparkListenerEvent
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index dd085f404..793572611 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
-import org.apache.kyuubi.plugin.lineage.events.Lineage
+import org.apache.kyuubi.plugin.lineage.Lineage
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
trait LineageParser {
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
index 777e70121..6fb5399c0 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
@@ -19,6 +19,8 @@ package org.apache.spark.kyuubi.lineage
import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.kyuubi.plugin.lineage.LineageDispatcherType
+
object LineageConf {
val SKIP_PARSING_PERMANENT_VIEW_ENABLED =
@@ -28,4 +30,18 @@ object LineageConf {
.booleanConf
.createWithDefault(false)
+ val DISPATCHERS = ConfigBuilder("spark.kyuubi.plugin.lineage.dispatchers")
+ .doc("The lineage dispatchers are implementations of " +
+ "`org.apache.kyuubi.plugin.lineage.LineageDispatcher` for dispatching lineage events.<ul>" +
+ "<li>SPARK_EVENT: send lineage event to spark event bus</li>" +
+ "<li>KYUUBI_EVENT: send lineage event to kyuubi event bus</li>" +
+ "</ul>")
+ .version("1.8.0")
+ .stringConf
+ .toSequence
+ .checkValue(
+ _.toSet.subsetOf(LineageDispatcherType.values.map(_.toString)),
+ "Unsupported lineage dispatchers")
+ .createWithDefault(Seq(LineageDispatcherType.SPARK_EVENT.toString))
+
}
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
index 1057bd74e..67e94ad0b 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
@@ -22,11 +22,14 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.immutable.List
import org.apache.spark.SparkConf
-import org.apache.spark.kyuubi.lineage.LineageConf.SKIP_PARSING_PERMANENT_VIEW_ENABLED
+import org.apache.spark.kyuubi.lineage.LineageConf._
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkListenerExtensionTest
import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.events.EventBus
+import org.apache.kyuubi.plugin.lineage.Lineage
+import org.apache.kyuubi.plugin.lineage.dispatcher.{OperationLineageKyuubiEvent, OperationLineageSparkEvent}
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtensionTest {
@@ -43,19 +46,21 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
.set(
"spark.sql.queryExecutionListeners",
"org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener")
+ .set(DISPATCHERS.key, "SPARK_EVENT,KYUUBI_EVENT")
.set(SKIP_PARSING_PERMANENT_VIEW_ENABLED.key, "true")
}
test("operation lineage event capture: for execute sql") {
- val countDownLatch = new CountDownLatch(1)
- var actual: Lineage = null
+ val countDownLatch = new CountDownLatch(2)
+ // get lineage from spark event
+ var actualSparkEventLineage: Lineage = null
spark.sparkContext.addSparkListener(new SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
- case lineageEvent: OperationLineageEvent =>
+ case lineageEvent: OperationLineageSparkEvent =>
lineageEvent.lineage.foreach {
case lineage if lineage.inputTables.nonEmpty =>
- actual = lineage
+ actualSparkEventLineage = lineage
countDownLatch.countDown()
}
case _ =>
@@ -63,6 +68,16 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
}
})
+ // get lineage from kyuubi event
+ var actualKyuubiEventLineage: Lineage = null
+ EventBus.register[OperationLineageKyuubiEvent] { lineageEvent: OperationLineageKyuubiEvent =>
+ lineageEvent.lineage.foreach {
+ case lineage if lineage.inputTables.nonEmpty =>
+ actualKyuubiEventLineage = lineage
+ countDownLatch.countDown()
+ }
+ }
+
withTable("test_table0") { _ =>
spark.sql("create table test_table0(a string, b string)")
spark.sql("select a as col0, b as col1 from test_table0").collect()
@@ -73,7 +88,8 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
("col0", Set("default.test_table0.a")),
("col1", Set("default.test_table0.b"))))
countDownLatch.await(20, TimeUnit.SECONDS)
- assert(actual == expected)
+ assert(actualSparkEventLineage == expected)
+ assert(actualKyuubiEventLineage == expected)
}
}
@@ -90,7 +106,8 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
spark.sparkContext.addSparkListener(new SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
- case lineageEvent: OperationLineageEvent if executionId == lineageEvent.executionId =>
+ case lineageEvent: OperationLineageSparkEvent
+ if executionId == lineageEvent.executionId =>
lineageEvent.lineage.foreach { lineage =>
assert(lineage == expected)
countDownLatch.countDown()
@@ -126,7 +143,7 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
spark.sparkContext.addSparkListener(new SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
- case lineageEvent: OperationLineageEvent =>
+ case lineageEvent: OperationLineageSparkEvent =>
lineageEvent.lineage.foreach {
case lineage if lineage.inputTables.nonEmpty && lineage.outputTables.isEmpty =>
actual = lineage
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
index 0c69885da..c25c94d39 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, SchemaRel
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.kyuubi.KyuubiFunSuite
-import org.apache.kyuubi.plugin.lineage.events.Lineage
+import org.apache.kyuubi.plugin.lineage.Lineage
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite