Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/10/18 12:25:22 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1085] Fix-Enforce maxOutputRows for Resolve View, Multi Insert and Distribute By

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new f4094d6  [KYUUBI #1085] Fix-Enforce maxOutputRows for Resolve View, Multi Insert and Distribute By
f4094d6 is described below

commit f4094d652492cb20024a26465d6aaa17cc531098
Author: senmiaoliu <se...@trip.com>
AuthorDate: Mon Oct 18 20:25:10 2021 +0800

    [KYUUBI #1085] Fix-Enforce maxOutputRows for Resolve View, Multi Insert and Distribute By
    
    ### _Why are the changes needed?_
    
    1. A limit should not be added for multi-insert statements.
    
    ```sql
    FROM tmp_table
    insert into tmp_table1 select * limit 2
    insert into tmp_table2 select *
    ```
    
    No limit should be added to either branch when inserting into tables (see the sketch below).
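    
    A minimal Scala sketch of the check this patch adds for that case (it mirrors the Union handling in the diff below; the helper name canLimitUnion is illustrative and not part of the patch):
    
    ```scala
    import org.apache.spark.sql.catalyst.plans.logical.Union
    import org.apache.spark.sql.execution.command.DataWritingCommand
    
    // A multi-insert is analyzed into a Union whose children write to the
    // target tables; appending a LIMIT there would silently drop rows from
    // the inserts, so such a Union must not receive a forced limit.
    def canLimitUnion(union: Union): Boolean =
      !union.children.exists(_.isInstanceOf[DataWritingCommand])
    ```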
    
    2. A limit should be added for DISTRIBUTE BY queries.
    
    ```sql
    select * from tmp_table distribute by a
    ```
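    
    For reference, a hedged sketch of how the enforced limit can be observed on such a query once the fix is in place (it mirrors the test added below; the extension class name, table, and values are illustrative, not taken from the patch):
    
    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
    import org.apache.kyuubi.sql.KyuubiSQLConf
    
    // Assumes the Kyuubi Spark SQL extension is on the classpath; the
    // extension class name below is illustrative.
    val spark = SparkSession.builder()
      .master("local[1]")
      .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
      .config(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key, "10")
      .getOrCreate()
    
    spark.sql("CREATE TABLE tmp_table (a INT, b INT) USING parquet")
    spark.sql("INSERT INTO tmp_table VALUES (1, 10), (2, 20), (3, 30)")
    
    // With this fix, DISTRIBUTE BY (planned as RepartitionByExpression) now
    // gets a forced GlobalLimit in the analyzed plan.
    val analyzed = spark.sql("SELECT * FROM tmp_table DISTRIBUTE BY a").queryExecution.analyzed
    assert(analyzed.isInstanceOf[GlobalLimit])
    ```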
    
    3. A limit should not be added while resolving a view.
    
    ```sql
    SELECT * FROM
    (select * from tmp_view1 union select * from tmp_view2)
    ORDER BY a DESC
    ```
    ![image](https://user-images.githubusercontent.com/18713676/137686024-ae3fe69a-3b9a-452f-ba4e-3f6022ebebae.png)
    
    As the plan above shows, when the limit is added while the view is being resolved, the complete data cannot be obtained, so the final result is incorrect.
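    
    The patch guards the rule with Spark's analyzer-side AnalysisContext, as in the diff below; a minimal sketch of that guard (the helper name resolvingView is illustrative):
    
    ```scala
    import org.apache.spark.sql.catalyst.analysis.AnalysisContext
    
    // While the analyzer is expanding a view definition, nestedViewDepth > 0.
    // Skipping the forced limit in that window keeps the view body intact, so
    // the outer UNION / ORDER BY still sees all of the view's rows.
    def resolvingView: Boolean = AnalysisContext.get.nestedViewDepth > 0
    ```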
    
    ### _How was this patch tested?_
    
    - [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    ![image](https://user-images.githubusercontent.com/18713676/137688152-07e13216-4bc2-4bd8-84c5-69bc9a31474d.png)
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1247 from lsm1/fix/add_limit.
    
    Closes #1085
    
    a02a99c0 [senmiaoliu] Fix-Enforce maxOutputRows for Resolve View, Multi Insert and Distribute By
    
    Authored-by: senmiaoliu <se...@trip.com>
    Signed-off-by: ulysses-you <ul...@apache.org>
---
 .../sql/watchdog/ForcedMaxOutputRowsRule.scala     | 20 ++++-
 .../scala/org/apache/spark/sql/WatchDogSuite.scala | 88 ++++++++++++++++++++++
 2 files changed, 105 insertions(+), 3 deletions(-)

diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
index d82eead..e52a4c6 100644
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
@@ -18,11 +18,13 @@
 package org.apache.kyuubi.sql.watchdog
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.AnalysisContext
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Distinct, Filter, Limit, LogicalPlan, Project, Sort, Union}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Distinct, Filter, Limit, LogicalPlan, Project, RepartitionByExpression, Sort, Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.execution.command.DataWritingCommand
 
 import org.apache.kyuubi.sql.KyuubiSQLConf
 
@@ -56,16 +58,27 @@ case class ForcedMaxOutputRowsRule(session: SparkSession) extends Rule[LogicalPl
     .aggregateExpressions.exists(p => p.getTagValue(ForcedMaxOutputRowsConstraint.CHILD_AGGREGATE)
     .contains(ForcedMaxOutputRowsConstraint.CHILD_AGGREGATE_FLAG))
 
+  private def isView: Boolean = {
+    val nestedViewDepth = AnalysisContext.get.nestedViewDepth
+    nestedViewDepth > 0
+  }
+
   private def canInsertLimitInner(p: LogicalPlan): Boolean = p match {
 
     case Aggregate(_, Alias(_, "havingCondition")::Nil, _) => false
     case agg: Aggregate => !isChildAggregate(agg)
+    case _: RepartitionByExpression => true
     case _: Distinct => true
     case _: Filter => true
     case _: Project => true
     case Limit(_, _) => true
     case _: Sort => true
-    case _: Union => true
+    case Union(children, _, _) =>
+      if (children.exists(_.isInstanceOf[DataWritingCommand])) {
+        false
+      } else {
+        true
+      }
     case _ => false
 
   }
@@ -74,7 +87,8 @@ case class ForcedMaxOutputRowsRule(session: SparkSession) extends Rule[LogicalPl
 
     maxOutputRowsOpt match {
       case Some(forcedMaxOutputRows) => canInsertLimitInner(p) &&
-        !p.maxRows.exists(_ <= forcedMaxOutputRows)
+        !p.maxRows.exists(_ <= forcedMaxOutputRows) &&
+        !isView
       case None => false
     }
   }
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
index 29ddea6..51aae6c 100644
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
@@ -291,4 +291,92 @@ class WatchDogSuite extends KyuubiSparkSQLExtensionTest {
       }
     }
   }
+
+  test("test watchdog: Select View Statement for forceMaxOutputRows") {
+    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "3") {
+      withTable("tmp_table", "tmp_union") {
+        withView("tmp_view", "tmp_view2") {
+          sql(s"create table tmp_table (a int, b int)")
+          sql(s"insert into tmp_table values (1,10),(2,20),(3,30),(4,40),(5,50)")
+          sql(s"create table tmp_union (a int, b int)")
+          sql(s"insert into tmp_union values (6,60),(7,70),(8,80),(9,90),(10,100)")
+          sql(s"create view tmp_view2 as select * from tmp_union")
+          assert(!sql(
+            s"""
+               |CREATE VIEW tmp_view
+               |as
+               |SELECT * FROM
+               |tmp_table
+               |""".stripMargin)
+            .queryExecution.analyzed.isInstanceOf[GlobalLimit])
+
+          assert(sql(
+            s"""
+               |SELECT * FROM
+               |tmp_view
+               |""".stripMargin)
+            .queryExecution.analyzed.maxRows.contains(3))
+
+          assert(sql(
+            s"""
+               |SELECT * FROM
+               |tmp_view
+               |limit 11
+               |""".stripMargin)
+            .queryExecution.analyzed.maxRows.contains(3))
+
+          assert(sql(
+            s"""
+               |SELECT * FROM
+               |(select * from tmp_view
+               |UNION
+               |select * from tmp_view2)
+               |ORDER BY a
+               |DESC
+               |""".stripMargin)
+            .collect().head.get(0).equals(10))
+        }
+      }
+    }
+  }
+
+  test("test watchdog: Insert Statement for forceMaxOutputRows") {
+
+    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
+      withTable("tmp_table", "tmp_insert") {
+        spark.sql(s"create table tmp_table (a int, b int)")
+        spark.sql(s"insert into tmp_table values (1,10),(2,20),(3,30),(4,40),(5,50)")
+        val multiInsertTableName1: String = "tmp_tbl1"
+        val multiInsertTableName2: String = "tmp_tbl2"
+        sql(s"drop table if exists $multiInsertTableName1")
+        sql(s"drop table if exists $multiInsertTableName2")
+        sql(s"create table $multiInsertTableName1 like tmp_table")
+        sql(s"create table $multiInsertTableName2 like tmp_table")
+        assert(!sql(
+          s"""
+             |FROM tmp_table
+             |insert into $multiInsertTableName1 select * limit 2
+             |insert into $multiInsertTableName2 select *
+             |""".stripMargin)
+          .queryExecution.analyzed.isInstanceOf[GlobalLimit])
+      }
+    }
+  }
+
+  test("test watchdog: Distribute by for forceMaxOutputRows") {
+
+    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
+      withTable("tmp_table") {
+        spark.sql(s"create table tmp_table (a int, b int)")
+        spark.sql(s"insert into tmp_table values (1,10),(2,20),(3,30),(4,40),(5,50)")
+        assert(sql(
+          s"""
+             |SELECT *
+             |FROM tmp_table
+             |DISTRIBUTE BY a
+             |""".stripMargin)
+          .queryExecution.analyzed.isInstanceOf[GlobalLimit])
+      }
+    }
+  }
 }