Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/22 08:07:06 UTC

[carbondata] branch master updated: [CARBONDATA-3247] Support to select all columns when creating MV datamap

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 675b5bb  [CARBONDATA-3247] Support to select all columns when creating MV datamap
675b5bb is described below

commit 675b5bb215f52775d6a3288032dbfb641140d8ca
Author: qiuchenjian <80...@qq.com>
AuthorDate: Fri Jun 14 22:49:25 2019 +0800

    [CARBONDATA-3247] Support to select all columns when creating MV datamap
    
    [Cause]
    The ColumnPruning rule does not generate a Project LogicalPlan when all
    columns are selected, so Carbon cannot generate a SELECT node when
    transforming the LogicalPlan into a ModularPlan, and therefore cannot
    create the MV datamap.

    [Solution]
    Add an executor rule to change the logical plan to support this scenario.
    
    This closes #3072
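
A minimal sketch of the scenario described above (illustrative only: it assumes
a CarbonData-enabled SparkSession named `spark`, and the table and column names
mirror the test added in this commit). ColumnPruning is an optimizer rule, so
the difference shows up in the optimized plan: when every column is referenced
there is no Project between the Aggregate and the relation, which is the shape
the MV modularizer could not handle before this change.

    import org.apache.spark.sql.SparkSession

    object SelectAllColumnsPlanSketch {
      def demo(spark: SparkSession): Unit = {
        spark.sql(
          "create table all_table(name string, age int, height int) stored by 'carbondata'")

        // All three columns are referenced, so ColumnPruning inserts no Project:
        // the optimized plan is an Aggregate sitting directly on the relation.
        println(spark.sql(
          "select avg(age), avg(height), name from all_table group by name")
          .queryExecution.optimizedPlan.treeString)

        // Only a subset is referenced, so a Project survives under the Aggregate,
        // which is the shape the MV code already handled.
        println(spark.sql(
          "select avg(age), name from all_table group by name")
          .queryExecution.optimizedPlan.treeString)
      }
    }
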
---
 .../apache/carbondata/mv/datamap/MVHelper.scala    |  3 +-
 .../apache/carbondata/mv/rewrite/Navigator.scala   |  1 +
 .../mv/rewrite/SelectAllColumnsSuite.scala         | 50 ++++++++++++++++++++++
 .../carbondata/mv/plans/modular/ModularPlan.scala  |  2 +
 .../carbondata/mv/plans/util/SQLBuildDSL.scala     | 26 +++++++++++
 5 files changed, 81 insertions(+), 1 deletion(-)

diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index c0831ae..22ec9e3 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -42,7 +42,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, RelationIdentifier}
 import org.apache.carbondata.datamap.DataMapManager
-import org.apache.carbondata.mv.plans.modular.{GroupBy, Matchable, ModularPlan, Select}
+import org.apache.carbondata.mv.plans.modular._
 import org.apache.carbondata.mv.rewrite.{MVPlanWrapper, QueryRewrite, SummaryDatasetCatalog}
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -302,6 +302,7 @@ object MVHelper {
         g.child match {
           case s: Select =>
             isValidSelect(isValidExp, s)
+          case m: ModularRelation => isValidExp
         }
       case s: Select =>
         isValidSelect(true, s)
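
The new case covers the modular-plan shape produced by the all-columns query,
where the GroupBy sits directly on a ModularRelation with no Select node in
between. A rough shape check for illustration only (node arities follow the
patterns used elsewhere in this diff; this is not the MVHelper code itself):

    object PlanShapeSketch {
      import org.apache.carbondata.mv.plans.modular
      import org.apache.carbondata.mv.plans.modular.ModularPlan

      // True for both aggregate shapes the MV query can now produce:
      //   GroupBy +- Select +- ...        (a Select node survived)
      //   GroupBy +- ModularRelation      (all columns selected, no Select node)
      def isAggregateShape(plan: ModularPlan): Boolean = plan match {
        case modular.GroupBy(_, _, _, _, _: modular.Select, _, _, _) => true
        case modular.GroupBy(_, _, _, _, _: modular.ModularRelation, _, _, _) => true
        case _ => false
      }
    }
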
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
index b2f1039..b6cbc24 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
@@ -200,6 +200,7 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession)
               return intersetJoinEdges.exists(j => j.left == rIndex && j.left == eIndex ||
                 j.right == rIndex && j.right == eIndex)
             }
+          case _ => false
         }
     }
     true
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectAllColumnsSuite.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectAllColumnsSuite.scala
new file mode 100644
index 0000000..75e54b2
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectAllColumnsSuite.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+
+class SelectAllColumnsSuite extends QueryTest {
+
+  test ("table select all columns mv") {
+    sql("drop datamap if exists all_table_mv")
+    sql("drop table if exists all_table")
+    sql("create table all_table(name string, age int, height int)  stored by 'carbondata'")
+    sql("insert into all_table select 'tom',20,175")
+    sql("insert into all_table select 'tom',32,180")
+    sql("create datamap all_table_mv on table all_table using 'mv' as select avg(age),avg(height),name from all_table group by name")
+    sql("rebuild datamap all_table_mv")
+    checkAnswer(
+      sql("select avg(age),avg(height),name from all_table group by name"),
+      Seq(Row(26.0, 177.5, "tom")))
+    val frame = sql("select avg(age),avg(height),name from all_table group by name")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "all_table_mv"))
+    sql("drop table if exists all_table")
+  }
+
+  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
+    val tables = logicalPlan collect {
+      case l: LogicalRelation => l.catalogTable.get
+    }
+    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
+  }
+
+}
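
The check the new test performs can also be run outside the QueryTest harness.
A hedged usage sketch (it assumes a CarbonData-enabled SparkSession and relies
on the "<datamap name>_table" backing-table naming convention used in the test
above):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.LogicalRelation

    object VerifyMvUsage {
      // Returns true if the analyzed (MV-rewritten) plan reads the datamap's
      // backing table instead of the original all_table.
      def hitsDataMap(spark: SparkSession, dataMapName: String): Boolean = {
        val analyzed = spark.sql(
          "select avg(age), avg(height), name from all_table group by name")
          .queryExecution.analyzed
        analyzed.collect { case l: LogicalRelation => l.catalogTable }
          .flatten
          .exists(_.identifier.table.equalsIgnoreCase(dataMapName + "_table"))
      }
    }
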
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
index cdf0aa4..55d3c5c 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
@@ -125,6 +125,8 @@ abstract class ModularPlan
       case modular.Select(_, _, _, _, _, children, _, _, _, _)
         if children.forall(_.isInstanceOf[modular.LeafNode]) => true
 
+      case modular.GroupBy(_, _, _, _, modular.ModularRelation(_, _, _, _, _), _, _, _) => true
+
       case _ => false
     }
   }
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
index 99161e5..7a60169 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
@@ -308,6 +308,32 @@ trait SQLBuildDSL {
       operator: ModularPlan,
       alias: Option[String]): Fragment = {
     operator match {
+      case g@modular.GroupBy(_, _, _, _, s@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _) =>
+        val fragmentList = s.children.zipWithIndex
+          .map { case (child, index) => fragmentExtract(child, s.aliasMap.get(index)) }
+        val fList = s.joinEdges.map {
+          e => {
+            (e.right, (fragmentList(e.right), Some(e.joinType), s
+              .extractRightEvaluableConditions(s.children(e.left), s.children(e.right))))
+          }
+        }.toMap
+        val from = (0 to fragmentList.length - 1)
+          .map(index => fList.get(index).getOrElse((fragmentList(index), None, Nil)))
+        val excludesPredicate = from.flatMap(_._3).toSet
+        val windowExprs = s.windowSpec
+          .map { case Seq(expr) => expr.asInstanceOf[Seq[NamedExpression]] }
+          .foldLeft(Seq.empty.asInstanceOf[Seq[NamedExpression]])(_ ++ _)
+        val select = s.outputList ++ windowExprs
+
+        SPJGFragment(
+          select, // select
+          from, // from
+          s.predicateList.filter { p => !excludesPredicate(p) }, // where
+          (Nil, Nil), // group by
+          Nil, // having
+          alias,
+          (s.flags, s.flagSpec))
+
       case s@modular.Select(_, _, _, _, _, _, _, _, _, _) =>
         val fragmentList = s.children.zipWithIndex
           .map { case (child, index) => fragmentExtract(child, s.aliasMap.get(index)) }