Posted to notifications@kyuubi.apache.org by "iodone (via GitHub)" <gi...@apache.org> on 2023/02/21 10:28:52 UTC

[GitHub] [kyuubi] iodone opened a new pull request, #4393: [Kyuubi #4332] Fix some bugs with `Groupby`

iodone opened a new pull request, #4393:
URL: https://github.com/apache/kyuubi/pull/4393

   <!--
   Thanks for sending a pull request!
   
   Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/CONTRIBUTING.html
     2. If the PR is related to an issue in https://github.com/apache/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
   -->
   close #4332 
   ### _Why are the changes needed?_
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you add a feature, you can talk about the use case of it.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   This handles the case where column names have been resolved with their table qualifier and an `Expand` logical plan exists, for example:
   ```
   InsertIntoHiveTable `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [a, b]
   +- Aggregate [a#0], [a#0, ansi_cast((count(if ((gid#9 = 1)) spark_catalog.default.t2.`b`#10 else null) * count(if ((gid#9 = 2)) spark_catalog.default.t2.`c`#11 else null)) as string) AS b#8]
      +- Aggregate [a#0, spark_catalog.default.t2.`b`#10, spark_catalog.default.t2.`c`#11, gid#9], [a#0, spark_catalog.default.t2.`b`#10, spark_catalog.default.t2.`c`#11, gid#9]
         +- Expand [ArrayBuffer(a#0, b#1, null, 1), ArrayBuffer(a#0, null, c#2, 2)], [a#0, spark_catalog.default.t2.`b`#10, spark_catalog.default.t2.`c`#11, gid#9]
            +- HiveTableRelation [`default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [a#0, b#1, c#2], Partition Cols: []]
   ```
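   The qualified output names in the plan above (e.g. ``spark_catalog.default.t2.`b`#10``) are what the fix has to normalize before matching them against `Expand`'s references. A standalone sketch of that normalization (hypothetical helper object, mirroring the string logic used in the patch):

   ```scala
   object QualifierStrip {
     // Strip the namespace qualifier and surrounding backticks from a resolved
     // attribute name, e.g. "spark_catalog.default.t2.`b`" -> "b".
     def stripQualifier(name: String): String =
       name.split('.').last.stripPrefix("`").stripSuffix("`")

     def main(args: Array[String]): Unit = {
       println(stripQualifier("spark_catalog.default.t2.`b`"))
       println(stripQualifier("a")) // unqualified names pass through unchanged
     }
   }
   ```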
   
   ### _How was this patch tested?_
   - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
   
   - [ ] Add screenshots for manual tests if appropriate
   
   - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


[GitHub] [kyuubi] iodone commented on a diff in pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby` and `CacheTable`

Posted by "iodone (via GitHub)" <gi...@apache.org>.
iodone commented on code in PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#discussion_r1114148101


##########
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala:
##########
@@ -327,6 +336,19 @@ trait LineageParser {
           joinColumnsLineage(parentColumnsLineage, getSelectColumnLineage(p.aggregateExpressions))
         p.children.map(extractColumnsLineage(_, nextColumnsLineage)).reduce(mergeColumnsLineage)
 
+      case p: Expand =>
+        val childColumnsLineage = ListMap(p.output.collect {
+          case attr
+              if p.references.find(
+                _.name == attr.name.split('.').last.stripPrefix("`").stripSuffix("`")).nonEmpty =>

Review Comment:
   Like this: `Aggregate [a#0, spark_catalog.default.t2.`b`#10, spark_catalog.default.t2.`c`#11, gid#9]`, where the name is resolved with a namespace qualifier.





[GitHub] [kyuubi] codecov-commenter commented on pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby` and `CacheTable`

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#issuecomment-1441175783

   # [Codecov](https://codecov.io/gh/apache/kyuubi/pull/4393?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#4393](https://codecov.io/gh/apache/kyuubi/pull/4393?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5542ce1) into [master](https://codecov.io/gh/apache/kyuubi/commit/171473ec718aeabe7d9e8e61fc63e2009e43cbd7?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (171473e) will **increase** coverage by `0.00%`.
   > The diff coverage is `60.00%`.
   
   > :exclamation: Current head 5542ce1 differs from pull request most recent head bf3e3d1. Consider uploading reports for the commit bf3e3d1 to get more accurate results
   
   ```diff
   @@            Coverage Diff            @@
   ##             master    #4393   +/-   ##
   =========================================
     Coverage     53.74%   53.74%           
     Complexity       13       13           
   =========================================
     Files           564      564           
     Lines         30921    30910   -11     
     Branches       4163     4169    +6     
   =========================================
   - Hits          16617    16613    -4     
   + Misses        12747    12743    -4     
   + Partials       1557     1554    -3     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/kyuubi/pull/4393?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...in/lineage/helper/SparkSQLLineageParseHelper.scala](https://codecov.io/gh/apache/kyuubi/pull/4393?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXh0ZW5zaW9ucy9zcGFyay9reXV1Ymktc3BhcmstbGluZWFnZS9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2t5dXViaS9wbHVnaW4vbGluZWFnZS9oZWxwZXIvU3BhcmtTUUxMaW5lYWdlUGFyc2VIZWxwZXIuc2NhbGE=) | `63.20% <60.00%> (-0.21%)` | :arrow_down: |
   | [...uubi/engine/spark/operation/ExecuteStatement.scala](https://codecov.io/gh/apache/kyuubi/pull/4393?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXh0ZXJuYWxzL2t5dXViaS1zcGFyay1zcWwtZW5naW5lL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUva3l1dWJpL2VuZ2luZS9zcGFyay9vcGVyYXRpb24vRXhlY3V0ZVN0YXRlbWVudC5zY2FsYQ==) | `76.92% <0.00%> (-3.76%)` | :arrow_down: |
   | [...ine/spark/operation/SparkSQLOperationManager.scala](https://codecov.io/gh/apache/kyuubi/pull/4393?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXh0ZXJuYWxzL2t5dXViaS1zcGFyay1zcWwtZW5naW5lL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUva3l1dWJpL2VuZ2luZS9zcGFyay9vcGVyYXRpb24vU3BhcmtTUUxPcGVyYXRpb25NYW5hZ2VyLnNjYWxh) | `82.71% <0.00%> (-2.55%)` | :arrow_down: |
   | [...kyuubi/engine/spark/operation/SparkOperation.scala](https://codecov.io/gh/apache/kyuubi/pull/4393?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXh0ZXJuYWxzL2t5dXViaS1zcGFyay1zcWwtZW5naW5lL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUva3l1dWJpL2VuZ2luZS9zcGFyay9vcGVyYXRpb24vU3BhcmtPcGVyYXRpb24uc2NhbGE=) | `76.47% <0.00%> (-0.36%)` | :arrow_down: |
   | [...in/scala/org/apache/kyuubi/config/KyuubiConf.scala](https://codecov.io/gh/apache/kyuubi/pull/4393?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a3l1dWJpLWNvbW1vbi9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2t5dXViaS9jb25maWcvS3l1dWJpQ29uZi5zY2FsYQ==) | `97.52% <0.00%> (-0.01%)` | :arrow_down: |
   | [...org/apache/kyuubi/operation/ExecuteStatement.scala](https://codecov.io/gh/apache/kyuubi/pull/4393?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a3l1dWJpLXNlcnZlci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2t5dXViaS9vcGVyYXRpb24vRXhlY3V0ZVN0YXRlbWVudC5zY2FsYQ==) | `79.26% <0.00%> (ø)` | |
   | [...g/apache/kyuubi/operation/BatchJobSubmission.scala](https://codecov.io/gh/apache/kyuubi/pull/4393?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a3l1dWJpLXNlcnZlci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2t5dXViaS9vcGVyYXRpb24vQmF0Y2hKb2JTdWJtaXNzaW9uLnNjYWxh) | `75.27% <0.00%> (ø)` | |
   | [...kyuubi/engine/KubernetesApplicationOperation.scala](https://codecov.io/gh/apache/kyuubi/pull/4393?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a3l1dWJpLXNlcnZlci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2t5dXViaS9lbmdpbmUvS3ViZXJuZXRlc0FwcGxpY2F0aW9uT3BlcmF0aW9uLnNjYWxh) | `22.53% <0.00%> (ø)` | |
   | [...org/apache/kyuubi/engine/spark/schema/RowSet.scala](https://codecov.io/gh/apache/kyuubi/pull/4393?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXh0ZXJuYWxzL2t5dXViaS1zcGFyay1zcWwtZW5naW5lL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUva3l1dWJpL2VuZ2luZS9zcGFyay9zY2hlbWEvUm93U2V0LnNjYWxh) | `98.52% <0.00%> (ø)` | |
   | [...ache/kyuubi/server/api/v1/OperationsResource.scala](https://codecov.io/gh/apache/kyuubi/pull/4393?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a3l1dWJpLXNlcnZlci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2t5dXViaS9zZXJ2ZXIvYXBpL3YxL09wZXJhdGlvbnNSZXNvdXJjZS5zY2FsYQ==) | `72.44% <0.00%> (+0.17%)` | :arrow_up: |
   | ... and [8 more](https://codecov.io/gh/apache/kyuubi/pull/4393?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [kyuubi] ulysses-you commented on pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby` and `CacheTable`

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#issuecomment-1442958670

   thanks, merging to master




[GitHub] [kyuubi] ulysses-you closed pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby` and `CacheTable`

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you closed pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby` and `CacheTable`
URL: https://github.com/apache/kyuubi/pull/4393




[GitHub] [kyuubi] iodone commented on a diff in pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby` and `CacheTable`

Posted by "iodone (via GitHub)" <gi...@apache.org>.
iodone commented on code in PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#discussion_r1115279554


##########
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala:
##########
@@ -327,6 +336,19 @@ trait LineageParser {
           joinColumnsLineage(parentColumnsLineage, getSelectColumnLineage(p.aggregateExpressions))
         p.children.map(extractColumnsLineage(_, nextColumnsLineage)).reduce(mergeColumnsLineage)
 
+      case p: Expand =>
+        val childColumnsLineage = ListMap(p.output.collect {
+          case attr
+              if p.references.find(
+                _.name == attr.name.split('.').last.stripPrefix("`").stripSuffix("`")).nonEmpty =>

Review Comment:
   There was a problem with associating columns by table name, so this logic has been removed.





[GitHub] [kyuubi] ulysses-you commented on a diff in pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby` and `CacheTable`

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#discussion_r1115269948


##########
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala:
##########
@@ -327,6 +336,19 @@ trait LineageParser {
           joinColumnsLineage(parentColumnsLineage, getSelectColumnLineage(p.aggregateExpressions))
         p.children.map(extractColumnsLineage(_, nextColumnsLineage)).reduce(mergeColumnsLineage)
 
+      case p: Expand =>
+        val childColumnsLineage = ListMap(p.output.collect {
+          case attr
+              if p.references.find(
+                _.name == attr.name.split('.').last.stripPrefix("`").stripSuffix("`")).nonEmpty =>

Review Comment:
   I see. It seems to be a bug in Spark... it's not a pretty name.





[GitHub] [kyuubi] ulysses-you commented on a diff in pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby` and `CacheTable`

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#discussion_r1115537045


##########
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala:
##########
@@ -160,15 +162,22 @@ trait LineageParser {
     }
   }
 
+  private def isNameWithQualifier(attr: Attribute, qualifier: Seq[String]): Boolean = {
+    val nameTokens = attr.name.split('.')
+    val namespace = nameTokens.init.mkString(".")
+    nameTokens.length > 1 && namespace.endsWith(qualifier.mkString("."))
+  }
+
   private def mergeRelationColumnLineage(
       parentColumnsLineage: AttributeMap[AttributeSet],
       relationOutput: Seq[Attribute],
       relationColumnLineage: AttributeMap[AttributeSet]): AttributeMap[AttributeSet] = {
     val mergedRelationColumnLineage = {
-      relationOutput.foldLeft((ListMap[Attribute, AttributeSet](), relationColumnLineage)) {
-        case ((acc, x), attr) =>
-          (acc + (attr -> x.head._2), x.tail)
-      }._1
+      relationOutput.slice(0, relationColumnLineage.size)

Review Comment:
   Can we add a Window pattern here that only parses the lineage of the child output, plus a comment explaining that we do not support window functions yet? Then I think we would not need this `slice`.
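   For reference, the `isNameWithQualifier` helper added in this diff can be exercised in isolation; a minimal sketch, assuming plain strings in place of Spark `Attribute`s:

   ```scala
   object QualifierCheck {
     // True when the attribute name carries a dotted namespace that ends with
     // the given qualifier, e.g. "spark_catalog.default.t2.b" vs Seq("default", "t2").
     def isNameWithQualifier(name: String, qualifier: Seq[String]): Boolean = {
       val nameTokens = name.split('.')
       val namespace = nameTokens.init.mkString(".")
       nameTokens.length > 1 && namespace.endsWith(qualifier.mkString("."))
     }

     def main(args: Array[String]): Unit = {
       println(isNameWithQualifier("spark_catalog.default.t2.b", Seq("default", "t2")))
       println(isNameWithQualifier("b", Seq("t2"))) // bare names have no qualifier
     }
   }
   ```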





[GitHub] [kyuubi] ulysses-you commented on a diff in pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby` and `CacheTable`

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#discussion_r1115522050


##########
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala:
##########
@@ -160,15 +162,22 @@ trait LineageParser {
     }
   }
 
+  private def isNameWithQualifier(attr: Attribute, qualifier: Seq[String]): Boolean = {
+    val nameTokens = attr.name.split('.')
+    val namespace = nameTokens.init.mkString(".")
+    nameTokens.length > 1 && namespace.endsWith(qualifier.mkString("."))
+  }
+
   private def mergeRelationColumnLineage(
       parentColumnsLineage: AttributeMap[AttributeSet],
       relationOutput: Seq[Attribute],
       relationColumnLineage: AttributeMap[AttributeSet]): AttributeMap[AttributeSet] = {
     val mergedRelationColumnLineage = {
-      relationOutput.foldLeft((ListMap[Attribute, AttributeSet](), relationColumnLineage)) {
-        case ((acc, x), attr) =>
-          (acc + (attr -> x.head._2), x.tail)
-      }._1
+      relationOutput.slice(0, relationColumnLineage.size)

Review Comment:
   > subquery's lineage of CacheTable is not include #rank column
   
   why doesn't the subquery's lineage of CacheTable include the `#rank` column? Should it return `rank -> [a, b]`, or do you mean we do not support collecting lineage from window functions?





[GitHub] [kyuubi] iodone commented on pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby`

Posted by "iodone (via GitHub)" <gi...@apache.org>.
iodone commented on PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#issuecomment-1438486908

   cc @ulysses-you 




[GitHub] [kyuubi] iodone commented on a diff in pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby` and `CacheTable`

Posted by "iodone (via GitHub)" <gi...@apache.org>.
iodone commented on code in PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#discussion_r1115207350


##########
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala:
##########
@@ -327,6 +336,19 @@ trait LineageParser {
           joinColumnsLineage(parentColumnsLineage, getSelectColumnLineage(p.aggregateExpressions))
         p.children.map(extractColumnsLineage(_, nextColumnsLineage)).reduce(mergeColumnsLineage)
 
+      case p: Expand =>
+        val childColumnsLineage = ListMap(p.output.collect {
+          case attr
+              if p.references.find(
+                _.name == attr.name.split('.').last.stripPrefix("`").stripSuffix("`")).nonEmpty =>
+            val ref = p.references
+              .find(_.name == attr.name.split('.').last.stripPrefix("`").stripSuffix("`")).get
+            (attr, ref.references)

Review Comment:
   Yes, I fixed it.





[GitHub] [kyuubi] ulysses-you commented on a diff in pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby` and `CacheTable`

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#discussion_r1115296950


##########
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala:
##########
@@ -160,15 +162,22 @@ trait LineageParser {
     }
   }
 
+  private def isNameWithQualifier(attr: Attribute, qualifier: Seq[String]): Boolean = {
+    val nameTokens = attr.name.split('.')
+    val namespace = nameTokens.init.mkString(".")
+    nameTokens.length > 1 && namespace.endsWith(qualifier.mkString("."))
+  }
+
   private def mergeRelationColumnLineage(
       parentColumnsLineage: AttributeMap[AttributeSet],
       relationOutput: Seq[Attribute],
       relationColumnLineage: AttributeMap[AttributeSet]): AttributeMap[AttributeSet] = {
     val mergedRelationColumnLineage = {
-      relationOutput.foldLeft((ListMap[Attribute, AttributeSet](), relationColumnLineage)) {
-        case ((acc, x), attr) =>
-          (acc + (attr -> x.head._2), x.tail)
-      }._1
+      relationOutput.slice(0, relationColumnLineage.size)

Review Comment:
   should the length of `relationColumnLineage` always be the same as `relationOutput`?
   





[GitHub] [kyuubi] iodone commented on a diff in pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby` and `CacheTable`

Posted by "iodone (via GitHub)" <gi...@apache.org>.
iodone commented on code in PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#discussion_r1115534558


##########
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala:
##########
@@ -160,15 +162,22 @@ trait LineageParser {
     }
   }
 
+  private def isNameWithQualifier(attr: Attribute, qualifier: Seq[String]): Boolean = {
+    val nameTokens = attr.name.split('.')
+    val namespace = nameTokens.init.mkString(".")
+    nameTokens.length > 1 && namespace.endsWith(qualifier.mkString("."))
+  }
+
   private def mergeRelationColumnLineage(
       parentColumnsLineage: AttributeMap[AttributeSet],
       relationOutput: Seq[Attribute],
       relationColumnLineage: AttributeMap[AttributeSet]): AttributeMap[AttributeSet] = {
     val mergedRelationColumnLineage = {
-      relationOutput.foldLeft((ListMap[Attribute, AttributeSet](), relationColumnLineage)) {
-        case ((acc, x), attr) =>
-          (acc + (attr -> x.head._2), x.tail)
-      }._1
+      relationOutput.slice(0, relationColumnLineage.size)

Review Comment:
   Yes, we do not currently have `window`-specific parsing. I'm not sure whether `#rank` needs to be mapped to `[a, b]` here either.
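   A toy model of why the `slice` matters (plain Scala collections instead of Spark's `AttributeMap`; table and column names are illustrative): when a window column like `rank` has no lineage entry, the old head/tail fold would run past the end, while slicing the relation output to the lineage size simply drops the unmapped column:

   ```scala
   object SliceSketch {
     // Pair each relation output column with its lineage entry; columns beyond
     // the lineage size (e.g. window-function outputs) are left out.
     def merge(
         relationOutput: List[String],
         relationColumnLineage: List[Set[String]]): Map[String, Set[String]] =
       relationOutput.slice(0, relationColumnLineage.size)
         .zip(relationColumnLineage).toMap

     def main(args: Array[String]): Unit = {
       // "rank" comes from a window function and has no lineage entry
       println(merge(List("a", "b", "rank"), List(Set("t2.a"), Set("t2.b"))))
     }
   }
   ```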





[GitHub] [kyuubi] ulysses-you commented on a diff in pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby` and `CacheTable`

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#discussion_r1115292467


##########
extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala:
##########
@@ -1094,6 +1094,92 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
     }
   }
 
+  test("test group by") {
+    withTable("t1", "t2", "v2_catalog.db.t1", "v2_catalog.db.t2") { _ =>
+      spark.sql("CREATE TABLE t1 (a string, b string, c string) USING hive")
+      spark.sql("CREATE TABLE t2 (a string, b string, c string) USING hive")
+      spark.sql("CREATE TABLE v2_catalog.db.t1 (a string, b string, c string)")
+      spark.sql("CREATE TABLE v2_catalog.db.t2 (a string, b string, c string)")
+      val ret0 =
+        exectractLineage(
+          s"insert into table t1 select a," +
+            s"concat_ws('/', collect_set(b))," +
+            s"count(distinct(b)) * count(distinct(c))" +
+            s"from t2 group by a")
+      assert(ret0 == Lineage(
+        List("default.t2"),
+        List("default.t1"),
+        List(
+          ("default.t1.a", Set("default.t2.a")),
+          ("default.t1.b", Set("default.t2.b")),
+          ("default.t1.c", Set("default.t2.b", "default.t2.c")))))
+
+      val ret1 =
+        exectractLineage(
+          s"insert into table v2_catalog.db.t1 select a," +
+            s"concat_ws('/', collect_set(b))," +
+            s"count(distinct(b)) * count(distinct(c))" +
+            s"from v2_catalog.db.t2 group by a")
+      assert(ret1 == Lineage(
+        List("v2_catalog.db.t2"),
+        List("v2_catalog.db.t1"),
+        List(
+          ("v2_catalog.db.t1.a", Set("v2_catalog.db.t2.a")),
+          ("v2_catalog.db.t1.b", Set("v2_catalog.db.t2.b")),
+          ("v2_catalog.db.t1.c", Set("v2_catalog.db.t2.b", "v2_catalog.db.t2.c")))))
+
+      val ret2 =
+        exectractLineage(
+          s"insert into table v2_catalog.db.t1 select a," +
+            s"count(distinct(b+c))," +
+            s"count(distinct(b)) * count(distinct(c))" +
+            s"from v2_catalog.db.t2 group by a")
+      assert(ret2 == Lineage(
+        List("v2_catalog.db.t2"),
+        List("v2_catalog.db.t1"),
+        List(
+          ("v2_catalog.db.t1.a", Set("v2_catalog.db.t2.a")),
+          ("v2_catalog.db.t1.b", Set("v2_catalog.db.t2.b", "v2_catalog.db.t2.c")),
+          ("v2_catalog.db.t1.c", Set("v2_catalog.db.t2.b", "v2_catalog.db.t2.c")))))
+    }
+  }
+
+  test("test grouping sets") {
+    withTable("t1", "t2") { _ =>
+      spark.sql("CREATE TABLE t1 (a string, b string, c string) USING hive")
+      spark.sql("CREATE TABLE t2 (a string, b string, c string, d string) USING hive")
+      val ret0 =
+        exectractLineage(
+          s"insert into table t1 select a,b,GROUPING__ID " +
+            s"from t2 group by a,b,c,d grouping sets ((a,b,c), (a,b,d))")
+      assert(ret0 == Lineage(
+        List("default.t2"),
+        List("default.t1"),
+        List(
+          ("default.t1.a", Set("default.t2.a")),
+          ("default.t1.b", Set("default.t2.b")),
+          ("default.t1.c", Set()))))
+    }
+  }
+
+  test("test catch table with window function") {
+    withTable("t1", "t2") { _ =>
+      spark.sql("CREATE TABLE t1 (a string, b string) USING hive")
+      spark.sql("CREATE TABLE t2 (a string, b string) USING hive")
+      spark.sql(
+        s"cache table c1 select * from (" +
+          s"select a, b, row_number() over (partition by a order by b asc ) rank from t2)" +
+          s" where rank=1")
+      val ret0 = exectractLineage("insert overwrite table t1 select a, b from c1")

Review Comment:
   this is the test case for `relationOutput.slice(0, relationColumnLineage.size)`, right?
   Does it work with `insert overwrite table t1 select a, rank from c1`?





[GitHub] [kyuubi] ulysses-you commented on a diff in pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby`

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#discussion_r1113881042


##########
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala:
##########
@@ -327,6 +336,19 @@ trait LineageParser {
           joinColumnsLineage(parentColumnsLineage, getSelectColumnLineage(p.aggregateExpressions))
         p.children.map(extractColumnsLineage(_, nextColumnsLineage)).reduce(mergeColumnsLineage)
 
+      case p: Expand =>
+        val childColumnsLineage = ListMap(p.output.collect {
+          case attr
+              if p.references.find(
+                _.name == attr.name.split('.').last.stripPrefix("`").stripSuffix("`")).nonEmpty =>
+            val ref = p.references
+              .find(_.name == attr.name.split('.').last.stripPrefix("`").stripSuffix("`")).get
+            (attr, ref.references)

Review Comment:
   what happens with `select count(distinct c1 + c2) as x, count(distinct(c3)) as y`?
   do we return x -> [c1, c2] and y -> [c3]?





[GitHub] [kyuubi] ulysses-you commented on a diff in pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby`

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#discussion_r1113882002


##########
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala:
##########
@@ -327,6 +336,19 @@ trait LineageParser {
           joinColumnsLineage(parentColumnsLineage, getSelectColumnLineage(p.aggregateExpressions))
         p.children.map(extractColumnsLineage(_, nextColumnsLineage)).reduce(mergeColumnsLineage)
 
+      case p: Expand =>
+        val childColumnsLineage = ListMap(p.output.collect {
+          case attr
+              if p.references.find(
+                _.name == attr.name.split('.').last.stripPrefix("`").stripSuffix("`")).nonEmpty =>

Review Comment:
   What's the problem with `p.references.find(_.name == attr.name)`? The name should have no qualifier.
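For context on this question, a minimal, self-contained sketch of why the direct comparison can fail on the plan shown in the PR description (plain strings only; `stripQualifier` is a hypothetical stand-in for the inline expression in the patch, not the Kyuubi helper itself):

```scala
// The Expand output in the quoted plan carries qualified, backtick-quoted
// names such as spark_catalog.default.t2.`b`, while the underlying
// references use the bare name b, so a direct name comparison misses it.
def stripQualifier(name: String): String =
  name.split('.').last.stripPrefix("`").stripSuffix("`")

val expandOutputName = "spark_catalog.default.t2.`b`"
val referenceName = "b"

// Direct comparison, as suggested above, does not match here:
val directMatch = expandOutputName == referenceName
// Stripping the qualifier and backticks, as the patch does, matches:
val strippedMatch = stripQualifier(expandOutputName) == referenceName
```

An unqualified name like `b` passes through `stripQualifier` unchanged, so the stripped comparison also covers the plain case.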





[GitHub] [kyuubi] iodone commented on a diff in pull request #4393: [Kyuubi #4332] Fix some bugs with `Groupby` and `CacheTable`

Posted by "iodone (via GitHub)" <gi...@apache.org>.
iodone commented on code in PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#discussion_r1115517270


##########
extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala:
##########
@@ -1094,6 +1094,92 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
     }
   }
 
+  test("test group by") {
+    withTable("t1", "t2", "v2_catalog.db.t1", "v2_catalog.db.t2") { _ =>
+      spark.sql("CREATE TABLE t1 (a string, b string, c string) USING hive")
+      spark.sql("CREATE TABLE t2 (a string, b string, c string) USING hive")
+      spark.sql("CREATE TABLE v2_catalog.db.t1 (a string, b string, c string)")
+      spark.sql("CREATE TABLE v2_catalog.db.t2 (a string, b string, c string)")
+      val ret0 =
+        exectractLineage(
+          s"insert into table t1 select a," +
+            s"concat_ws('/', collect_set(b))," +
+            s"count(distinct(b)) * count(distinct(c))" +
+            s"from t2 group by a")
+      assert(ret0 == Lineage(
+        List("default.t2"),
+        List("default.t1"),
+        List(
+          ("default.t1.a", Set("default.t2.a")),
+          ("default.t1.b", Set("default.t2.b")),
+          ("default.t1.c", Set("default.t2.b", "default.t2.c")))))
+
+      val ret1 =
+        exectractLineage(
+          s"insert into table v2_catalog.db.t1 select a," +
+            s"concat_ws('/', collect_set(b))," +
+            s"count(distinct(b)) * count(distinct(c))" +
+            s"from v2_catalog.db.t2 group by a")
+      assert(ret1 == Lineage(
+        List("v2_catalog.db.t2"),
+        List("v2_catalog.db.t1"),
+        List(
+          ("v2_catalog.db.t1.a", Set("v2_catalog.db.t2.a")),
+          ("v2_catalog.db.t1.b", Set("v2_catalog.db.t2.b")),
+          ("v2_catalog.db.t1.c", Set("v2_catalog.db.t2.b", "v2_catalog.db.t2.c")))))
+
+      val ret2 =
+        exectractLineage(
+          s"insert into table v2_catalog.db.t1 select a," +
+            s"count(distinct(b+c))," +
+            s"count(distinct(b)) * count(distinct(c))" +
+            s"from v2_catalog.db.t2 group by a")
+      assert(ret2 == Lineage(
+        List("v2_catalog.db.t2"),
+        List("v2_catalog.db.t1"),
+        List(
+          ("v2_catalog.db.t1.a", Set("v2_catalog.db.t2.a")),
+          ("v2_catalog.db.t1.b", Set("v2_catalog.db.t2.b", "v2_catalog.db.t2.c")),
+          ("v2_catalog.db.t1.c", Set("v2_catalog.db.t2.b", "v2_catalog.db.t2.c")))))
+    }
+  }
+
+  test("test grouping sets") {
+    withTable("t1", "t2") { _ =>
+      spark.sql("CREATE TABLE t1 (a string, b string, c string) USING hive")
+      spark.sql("CREATE TABLE t2 (a string, b string, c string, d string) USING hive")
+      val ret0 =
+        exectractLineage(
+          s"insert into table t1 select a,b,GROUPING__ID " +
+            s"from t2 group by a,b,c,d grouping sets ((a,b,c), (a,b,d))")
+      assert(ret0 == Lineage(
+        List("default.t2"),
+        List("default.t1"),
+        List(
+          ("default.t1.a", Set("default.t2.a")),
+          ("default.t1.b", Set("default.t2.b")),
+          ("default.t1.c", Set()))))
+    }
+  }
+
+  test("test catch table with window function") {
+    withTable("t1", "t2") { _ =>
+      spark.sql("CREATE TABLE t1 (a string, b string) USING hive")
+      spark.sql("CREATE TABLE t2 (a string, b string) USING hive")
+      spark.sql(
+        s"cache table c1 select * from (" +
+          s"select a, b, row_number() over (partition by a order by b asc ) rank from t2)" +
+          s" where rank=1")
+      val ret0 = exectractLineage("insert overwrite table t1 select a, b from c1")

Review Comment:
   yes, it does work



##########
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala:
##########
@@ -160,15 +162,22 @@ trait LineageParser {
     }
   }
 
+  private def isNameWithQualifier(attr: Attribute, qualifier: Seq[String]): Boolean = {
+    val nameTokens = attr.name.split('.')
+    val namespace = nameTokens.init.mkString(".")
+    nameTokens.length > 1 && namespace.endsWith(qualifier.mkString("."))
+  }
+
   private def mergeRelationColumnLineage(
       parentColumnsLineage: AttributeMap[AttributeSet],
       relationOutput: Seq[Attribute],
       relationColumnLineage: AttributeMap[AttributeSet]): AttributeMap[AttributeSet] = {
     val mergedRelationColumnLineage = {
-      relationOutput.foldLeft((ListMap[Attribute, AttributeSet](), relationColumnLineage)) {
-        case ((acc, x), attr) =>
-          (acc + (attr -> x.head._2), x.tail)
-      }._1
+      relationOutput.slice(0, relationColumnLineage.size)

Review Comment:
   this case:
   ```
   cache table c2 select * from (
        select b, a, row_number() over (partition by a order by b asc ) rank from t2)
         where rank=1
   ```
   the logicalPlan of `CacheTable`'s subquery is 
   ```
   Filter (isnotnull(rank#4) AND (rank#4 = 1))
   +- Window [row_number() windowspecdefinition(a#9, b#10 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#4], [a#9], [b#10 ASC NULLS FIRST]
      +- Project [b#10, a#9]
         +- HiveTableRelation [`default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [a#9, b#10], Partition Cols: []]
   ```
   the output of `CacheTable` is [#a, #b, #rank], but the subquery's lineage for `CacheTable` does not include the `#rank` column
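The mismatch described above can be sketched with plain Scala collections (a simplification under stated assumptions: the real code operates on `Attribute`/`AttributeMap`, and the column names here are stand-ins for the attributes in the quoted plan):

```scala
import scala.collection.immutable.ListMap

// Output of the cached relation: three columns, in order.
val relationOutput = Seq("a", "b", "rank")

// Lineage computed from the subquery: it only covers a and b, because
// rank is produced by the window function, not read from a table.
val relationColumnLineage = ListMap(
  "a" -> Set("default.t2.a"),
  "b" -> Set("default.t2.b"))

// Pair only as many leading output columns as there are lineage
// entries; rank simply gets no entry instead of a misaligned one.
val merged = relationOutput
  .slice(0, relationColumnLineage.size)
  .zip(relationColumnLineage.values)
  .toMap
```

Without the `slice`, a positional fold over all three output columns would either misalign lineage sets or fail when the lineage map runs out of entries.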


