You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/22 08:07:06 UTC
[carbondata] branch master updated: [CARBONDATA-3247] Support to
select all columns when creating MV datamap
This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 675b5bb [CARBONDATA-3247] Support to select all columns when creating MV datamap
675b5bb is described below
commit 675b5bb215f52775d6a3288032dbfb641140d8ca
Author: qiuchenjian <80...@qq.com>
AuthorDate: Fri Jun 14 22:49:25 2019 +0800
[CARBONDATA-3247] Support to select all columns when creating MV datamap
[Cause]
The ColumnPruning rule does not generate a Project LogicalPlan when all columns
are selected, so Carbon cannot generate a SELECT node when transforming the
LogicalPlan into a ModularPlan, and therefore cannot create the MV datamap.
[Solution]
Add an executor rule that changes the logical plan to support this scenario.
This closes #3072
---
.../apache/carbondata/mv/datamap/MVHelper.scala | 3 +-
.../apache/carbondata/mv/rewrite/Navigator.scala | 1 +
.../mv/rewrite/SelectAllColumnsSuite.scala | 50 ++++++++++++++++++++++
.../carbondata/mv/plans/modular/ModularPlan.scala | 2 +
.../carbondata/mv/plans/util/SQLBuildDSL.scala | 26 +++++++++++
5 files changed, 81 insertions(+), 1 deletion(-)
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index c0831ae..22ec9e3 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -42,7 +42,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, RelationIdentifier}
import org.apache.carbondata.datamap.DataMapManager
-import org.apache.carbondata.mv.plans.modular.{GroupBy, Matchable, ModularPlan, Select}
+import org.apache.carbondata.mv.plans.modular._
import org.apache.carbondata.mv.rewrite.{MVPlanWrapper, QueryRewrite, SummaryDatasetCatalog}
import org.apache.carbondata.spark.util.CommonUtil
@@ -302,6 +302,7 @@ object MVHelper {
g.child match {
case s: Select =>
isValidSelect(isValidExp, s)
+ case m: ModularRelation => isValidExp
}
case s: Select =>
isValidSelect(true, s)
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
index b2f1039..b6cbc24 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
@@ -200,6 +200,7 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession)
return intersetJoinEdges.exists(j => j.left == rIndex && j.left == eIndex ||
j.right == rIndex && j.right == eIndex)
}
+ case _ => false
}
}
true
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectAllColumnsSuite.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectAllColumnsSuite.scala
new file mode 100644
index 0000000..75e54b2
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectAllColumnsSuite.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+
+class SelectAllColumnsSuite extends QueryTest {
+
+ test ("table select all columns mv") {
+ sql("drop datamap if exists all_table_mv")
+ sql("drop table if exists all_table")
+ sql("create table all_table(name string, age int, height int) stored by 'carbondata'")
+ sql("insert into all_table select 'tom',20,175")
+ sql("insert into all_table select 'tom',32,180")
+ sql("create datamap all_table_mv on table all_table using 'mv' as select avg(age),avg(height),name from all_table group by name")
+ sql("rebuild datamap all_table_mv")
+ checkAnswer(
+ sql("select avg(age),avg(height),name from all_table group by name"),
+ Seq(Row(26.0, 177.5, "tom")))
+ val frame = sql("select avg(age),avg(height),name from all_table group by name")
+ val analyzed = frame.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed, "all_table_mv"))
+ sql("drop table if exists all_table")
+ }
+
+ def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
+ val tables = logicalPlan collect {
+ case l: LogicalRelation => l.catalogTable.get
+ }
+ tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
+ }
+
+}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
index cdf0aa4..55d3c5c 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
@@ -125,6 +125,8 @@ abstract class ModularPlan
case modular.Select(_, _, _, _, _, children, _, _, _, _)
if children.forall(_.isInstanceOf[modular.LeafNode]) => true
+ case modular.GroupBy(_, _, _, _, modular.ModularRelation(_, _, _, _, _), _, _, _) => true
+
case _ => false
}
}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
index 99161e5..7a60169 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
@@ -308,6 +308,32 @@ trait SQLBuildDSL {
operator: ModularPlan,
alias: Option[String]): Fragment = {
operator match {
+ case g@modular.GroupBy(_, _, _, _, s@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _) =>
+ val fragmentList = s.children.zipWithIndex
+ .map { case (child, index) => fragmentExtract(child, s.aliasMap.get(index)) }
+ val fList = s.joinEdges.map {
+ e => {
+ (e.right, (fragmentList(e.right), Some(e.joinType), s
+ .extractRightEvaluableConditions(s.children(e.left), s.children(e.right))))
+ }
+ }.toMap
+ val from = (0 to fragmentList.length - 1)
+ .map(index => fList.get(index).getOrElse((fragmentList(index), None, Nil)))
+ val excludesPredicate = from.flatMap(_._3).toSet
+ val windowExprs = s.windowSpec
+ .map { case Seq(expr) => expr.asInstanceOf[Seq[NamedExpression]] }
+ .foldLeft(Seq.empty.asInstanceOf[Seq[NamedExpression]])(_ ++ _)
+ val select = s.outputList ++ windowExprs
+
+ SPJGFragment(
+ select, // select
+ from, // from
+ s.predicateList.filter { p => !excludesPredicate(p) }, // where
+ (Nil, Nil), // group by
+ Nil, // having
+ alias,
+ (s.flags, s.flagSpec))
+
case s@modular.Select(_, _, _, _, _, _, _, _, _, _) =>
val fragmentList = s.children.zipWithIndex
.map { case (child, index) => fragmentExtract(child, s.aliasMap.get(index)) }