You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2019/12/29 10:42:09 UTC
[carbondata] branch master updated: [CARBONDATA-3629] Fix Select
query failure on aggregation of same column on MV
This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 2b222d4 [CARBONDATA-3629] Fix Select query failure on aggregation of same column on MV
2b222d4 is described below
commit 2b222d4d279e7f0c53c062b0740d4aed3904a960
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Tue Dec 24 13:15:02 2019 +0530
[CARBONDATA-3629] Fix Select query failure on aggregation of same column on MV
Problem:
If MV datamap is created with SELECT a,sum(a) from maintable group by a and later queried with SELECT sum(a) from maintable, query fails as rewritten plan output list doesn't match with the table.
Solution:
Check if Aggregation exists in GroupBy Node and copy select node with aliasMap
This closes #3530
---
.../org/apache/carbondata/mv/datamap/MVUtil.scala | 39 ++++++++++++++---
.../carbondata/mv/rewrite/DefaultMatchMaker.scala | 51 ++++++++++++++++------
.../mv/rewrite/TestAllOperationsOnMV.scala | 29 ++++++++++++
.../TestMVTimeSeriesCreateDataMapCommand.scala | 13 +++++-
docs/datamap/mv-datamap-guide.md | 11 ++++-
.../command/timeseries/TimeSeriesUtil.scala | 4 +-
6 files changed, 122 insertions(+), 25 deletions(-)
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index fe76cc3..f3e8091 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -49,23 +49,37 @@ class MVUtil {
case select: Select =>
select.children.map {
case groupBy: GroupBy =>
- getFieldsFromProject(groupBy.outputList, groupBy.predicateList, logicalRelation)
+ getFieldsFromProject(groupBy.outputList, groupBy.predicateList,
+ logicalRelation, groupBy.flagSpec)
case _: ModularRelation =>
- getFieldsFromProject(select.outputList, select.predicateList, logicalRelation)
+ getFieldsFromProject(select.outputList, select.predicateList,
+ logicalRelation, select.flagSpec)
}.head
case groupBy: GroupBy =>
groupBy.child match {
case select: Select =>
- getFieldsFromProject(groupBy.outputList, select.predicateList, logicalRelation)
+ getFieldsFromProject(groupBy.outputList, select.predicateList,
+ logicalRelation, select.flagSpec)
case _: ModularRelation =>
- getFieldsFromProject(groupBy.outputList, groupBy.predicateList, logicalRelation)
+ getFieldsFromProject(groupBy.outputList, groupBy.predicateList,
+ logicalRelation, groupBy.flagSpec)
}
}
}
+ /**
+ * Creates main table to datamap table field relation map by using modular plan generated from
+ * user query
+ * @param outputList of the modular plan
+ * @param predicateList of the modular plan
+ * @param logicalRelation list of main table from query
+ * @param flagSpec to get SortOrder attribute if exists
+ * @return fieldRelationMap
+ */
def getFieldsFromProject(outputList: Seq[NamedExpression],
predicateList: Seq[Expression],
- logicalRelation: Seq[LogicalRelation]): mutable.LinkedHashMap[Field, DataMapField] = {
+ logicalRelation: Seq[LogicalRelation],
+ flagSpec: Seq[Seq[Any]]): mutable.LinkedHashMap[Field, DataMapField] = {
var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
fieldToDataMapFieldMap ++= getFieldsFromProject(outputList, logicalRelation)
var finalPredicateList: Seq[NamedExpression] = Seq.empty
@@ -75,6 +89,21 @@ class MVUtil {
finalPredicateList = finalPredicateList.:+(attr)
}
}
+ // collect sort by columns
+ if (flagSpec.nonEmpty) {
+ flagSpec.map { f =>
+ f.map {
+ case list: ArrayBuffer[_] =>
+ list.map {
+ case s: SortOrder =>
+ s.collect {
+ case attr: AttributeReference =>
+ finalPredicateList = finalPredicateList.:+(attr)
+ }
+ }
+ }
+ }
+ }
fieldToDataMapFieldMap ++= getFieldsFromProject(finalPredicateList.distinct, logicalRelation)
fieldToDataMapFieldMap
}
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index 616d0bd..7e8eb96 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -385,16 +385,15 @@ object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern {
gb_2a.predicateList.exists(_.semanticEquals(expr)))
val isGroupingRmE = gb_2a.predicateList.forall(expr =>
gb_2q.predicateList.exists(_.semanticEquals(expr)))
+ val isOutputEmR = gb_2q.outputList.forall {
+ case a @ Alias(_, _) =>
+ gb_2a.outputList.exists{
+ case a1: Alias => a1.child.semanticEquals(a.child)
+ case exp => exp.semanticEquals(a.child)
+ }
+ case exp => gb_2a.outputList.exists(_.semanticEquals(exp))
+ }
if (isGroupingEmR && isGroupingRmE) {
- val isOutputEmR = gb_2q.outputList.forall {
- case a @ Alias(_, _) =>
- gb_2a.outputList.exists{
- case a1: Alias => a1.child.semanticEquals(a.child)
- case exp => exp.semanticEquals(a.child)
- }
- case exp => gb_2a.outputList.exists(_.semanticEquals(exp))
- }
-
if (isOutputEmR) {
// Mappings of output of two plans by checking semantic equals.
val mappings = gb_2a.outputList.zipWithIndex.map { case(exp, index) =>
@@ -424,11 +423,35 @@ object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern {
Utils.tryMatch(
gb_2a, gb_2q, aliasMap).flatMap {
case g: GroupBy =>
- Some(g.copy(child = g.child.withNewChildren(
- g.child.children.map {
- case modular.Select(_, _, _, _, _, _, _, _, _, _) => gb_2a;
- case other => other
- })));
+ // Check any agg function exists on outputlist, in case of expressions like
+ // sum(a), then create new alias and copy to group by node
+ val aggFunExists = g.outputList.exists { f =>
+ f.find {
+ case _: AggregateExpression => true
+ case _ => false
+ }.isDefined
+ }
+ if (aggFunExists && !isGroupingRmE && isOutputEmR) {
+ val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
+ val sel_1a = g.child.asInstanceOf[Select]
+
+ val usel_1a = sel_1a.copy(outputList = sel_1a.outputList)
+ tChildren += gb_2a
+ val sel_1q_temp = sel_1a.copy(
+ predicateList = sel_1a.predicateList,
+ children = tChildren,
+ joinEdges = sel_1a.joinEdges,
+ aliasMap = Seq(0 -> rewrite.newSubsumerName()).toMap)
+
+ val res = factorOutSubsumer(sel_1q_temp, usel_1a, sel_1q_temp.aliasMap)
+ Some(g.copy(child = res))
+ } else {
+ Some(g.copy(child = g.child.withNewChildren(
+ g.child.children.map {
+ case modular.Select(_, _, _, _, _, _, _, _, _, _) => gb_2a;
+ case other => other
+ })));
+ }
case _ => None}.map(Seq(_)).getOrElse(Nil)
} else {
Nil
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index a81cd2f..1750ce7 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -541,6 +541,35 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
sql("drop table IF EXISTS maintable")
}
+ test("test query aggregation on mv datamap ") {
+ sql("drop table if exists maintable")
+ sql("create table maintable(name string, age int, add string) stored by 'carbondata'")
+ sql("insert into maintable values('abc',1,'a'),('def',2,'b'),('ghi',3,'c')")
+ val res = sql("select sum(age) from maintable")
+ sql("drop datamap if exists mv3")
+ sql("create datamap mv3 on table maintable using 'mv' as select age,sum(age) from maintable group by age")
+ val df = sql("select sum(age) from maintable")
+ TestUtil.verifyMVDataMap(df.queryExecution.analyzed, "mv3")
+ checkAnswer(res, df)
+ sql("drop table if exists maintable")
+ }
+
+ test("test order by columns not given in projection") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
+ sql("insert into table maintable select 'abc',21,2000")
+ val res = sql("select name from maintable order by c_code")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 using 'mv' as select name from maintable order by c_code")
+ val df = sql("select name from maintable order by c_code")
+ TestUtil.verifyMVDataMap(df.queryExecution.analyzed, "dm1")
+ checkAnswer(res, df)
+ intercept[Exception] {
+ sql("alter table maintable drop columns(c_code)")
+ }.getMessage.contains("Column name cannot be dropped because it exists in mv datamap: dm1")
+ sql("drop table if exists maintable")
+ }
+
test("drop meta cache on mv datamap table") {
sql("drop table IF EXISTS maintable")
sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala
index 4feab2f..699b189 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala
@@ -101,8 +101,17 @@ class TestMVTimeSeriesCreateDataMapCommand extends QueryTest with BeforeAndAfter
sql(
"create datamap datamap1 on table maintable_new using 'mv' as " +
"select timeseries(projectjoindate,'second') from maintable_new")
- }.getMessage
- .contains("Granularity should be DAY,MONTH or YEAR, for timeseries column of Date type")
+ }.getMessage.contains("Granularity should be of DAY/WEEK/MONTH/YEAR, for timeseries column of Date type")
+ intercept[MalformedCarbonCommandException] {
+ sql(
+ "create datamap datamap1 on table maintable_new using 'mv' as " +
+ "select timeseries(projectjoindate,'five_minute') from maintable_new")
+ }.getMessage.contains("Granularity should be of DAY/WEEK/MONTH/YEAR, for timeseries column of Date type")
+ intercept[MalformedCarbonCommandException] {
+ sql(
+ "create datamap datamap1 on table maintable_new using 'mv' as " +
+ "select timeseries(projectjoindate,'hour') from maintable_new")
+ }.getMessage.contains("Granularity should be of DAY/WEEK/MONTH/YEAR, for timeseries column of Date type")
sql("drop table IF EXISTS maintable_new")
}
diff --git a/docs/datamap/mv-datamap-guide.md b/docs/datamap/mv-datamap-guide.md
index a0c3f1a..b75243f 100644
--- a/docs/datamap/mv-datamap-guide.md
+++ b/docs/datamap/mv-datamap-guide.md
@@ -73,7 +73,7 @@ EXPLAIN SELECT a, sum(b) from maintable group by a;
GROUP BY country, sex
```
**NOTE**:
- * Group by/Filter columns has to be provided in projection list while creating mv datamap
+ * Group by columns has to be provided in projection list while creating mv datamap
* If only single parent table is involved in mv datamap creation, then TableProperties of Parent table
(if not present in a aggregate function like sum(col)) listed below will be
inherited to datamap table
@@ -91,7 +91,14 @@ EXPLAIN SELECT a, sum(b) from maintable group by a;
12. NO_INVERTED_INDEX
13. COLUMN_COMPRESSOR
- * All columns of main table at once cannot participate in mv datamap table creation
+ * Creating MV datamap with select query containing only project of all columns of maintable is unsupported
+
+ **Example:**
+ If table 'x' contains columns 'a,b,c',
+ then creating MV datamap with below queries is not supported.
+
+ 1. ```select a,b,c from x```
+ 2. ```select * from x```
* TableProperties can be provided in DMProperties excluding LOCAL_DICTIONARY_INCLUDE,
LOCAL_DICTIONARY_EXCLUDE, DICTIONARY_INCLUDE, DICTIONARY_EXCLUDE, INVERTED_INDEX,
NO_INVERTED_INDEX, SORT_COLUMNS, LONG_STRING_COLUMNS, RANGE_COLUMN & COLUMN_META_CACHE
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index e72cf9b..f79e7d8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -154,10 +154,10 @@ object TimeSeriesUtil {
timeSeriesFunction: String): Unit = {
for (granularity <- Granularity.values()) {
if (timeSeriesFunction.equalsIgnoreCase(granularity.getName
- .substring(0, granularity.getName.indexOf(CarbonCommonConstants.UNDERSCORE)))) {
+ .substring(0, granularity.getName.lastIndexOf(CarbonCommonConstants.UNDERSCORE)))) {
if (!supportedGranularitiesForDate.contains(granularity.getName)) {
throw new MalformedCarbonCommandException(
- "Granularity should be DAY,MONTH or YEAR, for timeseries column of Date type")
+ "Granularity should be of DAY/WEEK/MONTH/YEAR, for timeseries column of Date type")
}
}
}