Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/09/25 13:56:47 UTC

[carbondata] branch master updated: [CARBONDATA-4009] Fix PartialQueries not hitting mv

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 44b933b  [CARBONDATA-4009] Fix PartialQueries not hitting mv
44b933b is described below

commit 44b933bf93c69afe0c5f3a8de55a47934d6ba948
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Thu Sep 24 22:15:37 2020 +0530

    [CARBONDATA-4009] Fix PartialQueries not hitting mv
    
    Why is this PR needed?
    If an MV with aggregation is created using all the columns of the main
    table, some partial queries do not hit the MV.
    
    What changes were proposed in this PR?
    When all columns appear in the projection of an aggregate query, the
    logical plan has no Project node. A Project node is now added to the
    logical plan in this case, so that partial queries can also be matched
    against the MV. In the GroupbyGroupbyNoChildDelta pattern, added a
    check for CAST(aggregate expression) and used the group-by result to
    derive the final compensation result.
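    
    For illustration (taken from the new tests below): with a materialized
    view defined as SELECT id, name, sum(id) FROM sales GROUP BY id, name,
    a partial query such as SELECT name, sum(id) FROM sales GROUP BY name
    previously produced an Aggregate directly over the relation, with no
    Project node, and so failed to match the MV plan; after this change it
    is rewritten to read from the MV.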
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3955
---
 .../org/apache/spark/sql/optimizer/MVMatcher.scala | 21 ++++++++----
 .../carbondata/view/rewrite/MVCreateTestCase.scala | 38 ++++++++++++++++++++++
 .../carbondata/mv/plans/modular/Modularizer.scala  | 24 ++++++++++++--
 3 files changed, 74 insertions(+), 9 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
index 7f57bee..d562b9d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
@@ -873,8 +873,15 @@ private object GroupbyGroupbyNoChildDelta extends MVMatchPattern {
           gb_2q.predicateList.exists(_.semanticEquals(expr)) ||
           isExpressionMatches(expr, gb_2q.predicateList))
         val isOutputEmR = gb_2q.outputList.forall {
+          case Alias(cast: Cast, _) =>
+            gb_2a.outputList.exists {
+              case Alias(castExp: Cast, _) => castExp.child.semanticEquals(cast.child)
+              case alias: Alias => alias.child.semanticEquals(cast.child)
+              case exp => exp.semanticEquals(cast.child)
+            }
           case a @ Alias(_, _) =>
             gb_2a.outputList.exists {
+              case Alias(cast: Cast, _) => cast.child.semanticEquals(a.child)
               case alias: Alias => alias.child.semanticEquals(a.child)
               case exp => exp.semanticEquals(a.child)
             }
@@ -917,6 +924,7 @@ private object GroupbyGroupbyNoChildDelta extends MVMatchPattern {
           val aliasMap = AttributeMap(gb_2a.outputList.collect { case a: Alias =>
             (a.toAttribute, a)})
           if (isGroupingEmR) {
+            val subsumerName = Seq(0 -> generator.newSubsumerName()).toMap
             tryMatch(
               gb_2a, gb_2q, aliasMap).flatMap {
               case g: GroupBy =>
@@ -932,16 +940,13 @@ private object GroupbyGroupbyNoChildDelta extends MVMatchPattern {
                   val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
                   val sel_1a = g.child.asInstanceOf[Select]
 
-                  val usel_1a = sel_1a.copy(outputList = sel_1a.outputList)
                   tChildren += gb_2a
                   val sel_1q_temp = sel_1a.copy(
                     predicateList = sel_1a.predicateList,
                     children = tChildren,
                     joinEdges = sel_1a.joinEdges,
-                    aliasMap = Seq(0 -> generator.newSubsumerName()).toMap)
-
-                  val res = factorOutSubsumer(sel_1q_temp, usel_1a, sel_1q_temp.aliasMap)
-                  Some(g.copy(child = res))
+                    aliasMap = subsumerName)
+                  Some(g.copy(child = sel_1q_temp))
                 } else {
                   Some(g.copy(child = g.child.withNewChildren(
                     g.child.children.map {
@@ -949,7 +954,11 @@ private object GroupbyGroupbyNoChildDelta extends MVMatchPattern {
                       case other => other
                     })));
                 }
-              case _ => None}.map(Seq(_)).getOrElse(Nil)
+              case _ => None
+            }.map { wip =>
+              factorOutSubsumer(wip, gb_2a, subsumerName)
+            }.map(Seq(_))
+              .getOrElse(Nil)
           } else {
             Nil
           }
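
To make the CAST handling above concrete, here is a minimal standalone
sketch of the comparison the extended isOutputEmR check performs. It is
illustrative only and uses stock Spark Catalyst classes; the helper name
matchesThroughCast is hypothetical, not part of the commit:

    import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Expression}

    // True if queryExpr matches one of the MV's output expressions,
    // looking through an optional CAST on either side, e.g. so that
    // CAST(sum(id) AS BIGINT) in the query matches the MV's sum(id).
    def matchesThroughCast(queryExpr: Expression,
        mvOutputs: Seq[Expression]): Boolean = queryExpr match {
      case Alias(cast: Cast, _) =>
        mvOutputs.exists {
          case Alias(mvCast: Cast, _) => mvCast.child.semanticEquals(cast.child)
          case alias: Alias => alias.child.semanticEquals(cast.child)
          case exp => exp.semanticEquals(cast.child)
        }
      case a: Alias =>
        mvOutputs.exists {
          case Alias(mvCast: Cast, _) => mvCast.child.semanticEquals(a.child)
          case alias: Alias => alias.child.semanticEquals(a.child)
          case exp => exp.semanticEquals(a.child)
        }
      case other => mvOutputs.exists(_.semanticEquals(other))
    }
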
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
index db39737..2dd601e 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
@@ -104,6 +104,44 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table6 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
   }
 
+  test("test if partial query with group by hits mv when all columns present in mv") {
+    sql("drop table if exists sales")
+    sql(" CREATE TABLE sales (id int, name string)  STORED AS carbondata")
+    sql("insert into sales values(1,'ab'),(2,'bc')")
+    val result1 = sql("SELECT id, name, sum(id)  FROM sales GROUP BY id, name")
+    val result2 = sql("SELECT name, sum(id)  FROM sales GROUP BY id, name")
+    val result3 = sql("SELECT name, sum(id)  FROM sales GROUP BY name")
+    sql("drop materialized view if exists agg_sale")
+    sql("CREATE MATERIALIZED VIEW agg_sale AS SELECT id, name, sum(id)  FROM sales GROUP BY id, name")
+    val df1 = sql("SELECT id, name, sum(id)  FROM sales GROUP BY id, name")
+    val df2 = sql("SELECT name, sum(id)  FROM sales GROUP BY id, name")
+    val df3 = sql("SELECT name, sum(id)  FROM sales GROUP BY name")
+    TestUtil.verifyMVHit(df1.queryExecution.optimizedPlan, "agg_sale")
+    TestUtil.verifyMVHit(df2.queryExecution.optimizedPlan, "agg_sale")
+    TestUtil.verifyMVHit(df3.queryExecution.optimizedPlan, "agg_sale")
+    checkAnswer(df1, result1)
+    checkAnswer(df2, result2)
+    checkAnswer(df3, result3)
+    sql("drop table if exists sales")
+  }
+
+  test("test if partial query with group by hits mv when some columns present in mv") {
+    sql("drop table if exists sales")
+    sql(" CREATE TABLE sales (id int, name string, sal int)  STORED AS carbondata")
+    sql("insert into sales values(1,'ab', 100),(2,'bc', 100)")
+    val result1 = sql("SELECT id, name, sum(id)  FROM sales GROUP BY id, name")
+    val result2 = sql("SELECT name, sum(id)  FROM sales GROUP BY id, name")
+    sql("drop materialized view if exists agg_sale")
+    sql("CREATE MATERIALIZED VIEW agg_sale AS SELECT id, name, sum(id)  FROM sales GROUP BY id, name")
+    val df1 = sql("SELECT id, name, sum(id)  FROM sales GROUP BY id, name")
+    val df2 = sql("SELECT name, sum(id)  FROM sales GROUP BY id, name")
+    TestUtil.verifyMVHit(df1.queryExecution.optimizedPlan, "agg_sale")
+    TestUtil.verifyMVHit(df2.queryExecution.optimizedPlan, "agg_sale")
+    checkAnswer(df1, result1)
+    checkAnswer(df2, result2)
+    sql("drop table if exists sales")
+  }
+
   test("test create mv on parquet spark table") {
     sql("drop materialized view if exists mv1")
     sql("drop table if exists source")
diff --git a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
index d255359..739fb82 100644
--- a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
+++ b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
@@ -18,9 +18,10 @@
 package org.apache.carbondata.mv.plans.modular
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.{Exists, ListQuery, ScalarSubquery}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.expressions.{Exists, ListQuery, NamedExpression, ScalarSubquery}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project, SubqueryAlias}
 import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.mv.expressions.modular._
 import org.apache.carbondata.mv.plans._
@@ -37,7 +38,8 @@ abstract class Modularizer[TreeType <: TreeNode[TreeType]] {
   // protected def modularizeLater(plan: LogicalPlan) = this.modularize(plan).next()
 
   def modularize(plan: LogicalPlan): Iterator[TreeType] = {
-    val replaced = plan.transformAllExpressions {
+    val logicalPlan = checkAndAddProjectToGroupByInPlan(plan)
+    val replaced = logicalPlan.transformAllExpressions {
       case s: ScalarSubquery =>
         if (s.children.isEmpty) {
           ScalarModularSubquery(
@@ -66,6 +68,22 @@ abstract class Modularizer[TreeType <: TreeNode[TreeType]] {
     makeupAliasMappings(mplans)
   }
 
+  private def checkAndAddProjectToGroupByInPlan(plan: LogicalPlan) = {
+    // If the plan has a GroupBy directly over a LogicalRelation, all columns of the fact
+    // table are used in the MV. In that case, add a Project node so partial queries match.
+    plan.transform {
+      case Aggregate(groupBy, aggregations, lr: LogicalRelation) =>
+        Aggregate(groupBy, aggregations, Project(lr.output.asInstanceOf[Seq[NamedExpression]], lr))
+      case Aggregate(groupBy, aggregations, alias: SubqueryAlias)
+        if alias.child.isInstanceOf[LogicalRelation] =>
+        // A SubqueryAlias directly over the relation: likewise project all
+        // relation columns so that partial queries can match this plan.
+        Aggregate(groupBy,
+          aggregations,
+          Project(alias.child.output.asInstanceOf[Seq[NamedExpression]], alias.child))
+    }
+  }
+
   private def modularizeCore(plan: LogicalPlan): Iterator[TreeType] = {
     // Collect modular plan candidates.
     val candidates = patterns.iterator.flatMap(_ (plan))
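
To see the Modularizer change in isolation, the sketch below applies the
same Project-insertion idea as checkAndAddProjectToGroupByInPlan. It is
illustrative only; the helper name addProjectUnderAggregate is
hypothetical, not part of the commit:

    import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project, SubqueryAlias}
    import org.apache.spark.sql.execution.datasources.LogicalRelation

    // Wrap an Aggregate sitting directly on a relation (optionally through
    // a SubqueryAlias) in an explicit Project of all relation columns, so
    // the resulting modular plan can also be matched by partial queries.
    def addProjectUnderAggregate(plan: LogicalPlan): LogicalPlan = plan.transform {
      case agg @ Aggregate(_, _, lr: LogicalRelation) =>
        agg.copy(child = Project(lr.output, lr))
      case agg @ Aggregate(_, _, alias: SubqueryAlias)
          if alias.child.isInstanceOf[LogicalRelation] =>
        agg.copy(child = Project(alias.child.output, alias.child))
    }

Applied to the plan of SELECT name, sum(id) FROM sales GROUP BY name from
the tests above, this turns an Aggregate over the relation into an
Aggregate over a Project over the relation, which is the shape the
GroupbyGroupbyNoChildDelta pattern can then match against the MV.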