Posted to issues@carbondata.apache.org by kumarvishal09 <gi...@git.apache.org> on 2017/12/27 15:50:18 UTC
[GitHub] carbondata pull request #1728: [CARBONDATA-1926] Expression support inside a...
GitHub user kumarvishal09 opened a pull request:
https://github.com/apache/carbondata/pull/1728
[CARBONDATA-1926] Expression support inside aggregate function for Query
PR to support transforming the query plan to use the pre-aggregate table when an aggregate function in the query contains an expression
- [ ] Any interfaces changed?
NA
- [ ] Any backward compatibility impacted?
NA
- [ ] Document update required?
NA
- [ ] Testing done
Added UTs to validate pre-aggregate table selection and data validation
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kumarvishal09/incubator-carbondata ExpressionSupportInQuery
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/carbondata/pull/1728.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1728
----
commit 361790641779d881e21f54d7c0f78c19fcf3490e
Author: kumarvishal <ku...@...>
Date: 2017-12-20T10:16:02Z
Added code to support expression inside aggregate function
commit 3344f0fd3dd5cca31e26d2f1909fce600e8dd8e8
Author: kumarvishal <ku...@...>
Date: 2017-12-25T09:34:39Z
Added code to support expression inside aggregateExpression in query
----
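The feature this PR adds can be illustrated with a small sketch (purely illustrative Python, not CarbonData's implementation): when an aggregate function in a query wraps an expression such as `sum(case when age=35 then id else 0 end)`, the rewrite rule must match that expression against the expressions the pre-aggregate datamaps were defined with, and redirect the scan to the matching pre-agg table. All names below are assumptions.

```python
# Illustrative sketch only -- NOT CarbonData's implementation. It shows the
# idea behind this PR: match the expression inside a query's aggregate
# function against the expressions the pre-aggregate datamaps were defined
# with, and redirect the query to the matching pre-agg table.

def normalize(expr):
    """Canonicalize an expression string for comparison (case/whitespace)."""
    return "".join(expr.lower().split())

def select_preagg_table(query_agg_expr, datamaps):
    """Return the pre-agg table whose defined expression matches the query's
    aggregate expression, or None (scan the main table) if none matches."""
    wanted = normalize(query_agg_expr)
    for table_name, defined_expr in datamaps.items():
        if normalize(defined_expr) == wanted:
            return table_name
    return None

# Datamap definitions mirror the test suite added in this PR.
datamaps = {
    "maintable_agg1": "sum(case when age=35 then id else 0 end)",
    "maintable_agg3": "sum(case when age=27 then id else 0 end)",
}

print(select_preagg_table("SUM(CASE WHEN age = 35 THEN id ELSE 0 END)", datamaps))
# -> maintable_agg1
```

A real implementation matches Catalyst expression trees rather than strings, but the selection principle is the same.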
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159051463
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java ---
@@ -62,6 +62,12 @@
*/
private int ordinal = Integer.MAX_VALUE;
+ /**
--- End diff --
Remove unnecessary attributes like parentColumnToAggregationsMapping
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2426/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2594/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1360/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2755/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2787/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159058055
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -1124,92 +1328,57 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
*
* @param carbonTable
* parent table
- * @param aggFunctions
- * aggregation function
- * @param tableName
- * parent table name
+ * @param aggExp
+ * aggregate expression
* @return list of fields
*/
def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable,
- aggFunctions: AggregateFunction,
- tableName: String
- ): Seq[QueryColumn] = {
+ aggExp: AggregateExpression): Seq[AggregateExpression] = {
val changedDataType = true
--- End diff --
Remove unused variable
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][Pre-Aggregate] Expression s...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r158934173
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
@@ -0,0 +1,105 @@
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll: Unit = {
+ sql("drop table if exists mainTable")
+ sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+ sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
+ sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
+ sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
+ sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
+ sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+ }
+
+ test("test pre agg create table with expression 1") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
+ }
+
+ test("test pre agg create table with expression 2") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
+ }
+
+ test("test pre agg create table with expression 3") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
+ }
+
+ test("test pre agg create table with expression 4") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
+ }
+
+ test("test pre agg create table with expression 5") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum")
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum")
+ }
+
+ test("test pre agg table selection with expression 1") {
+ val df = sql("select name as NewName, count(age) as sum from mainTable group by name order by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+ }
+
+
+ test("test pre agg table selection with expression 2") {
+ val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum from mainTable group by name order by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+ }
+
+ test("test pre agg table selection with expression 3") {
+ val df = sql("select sum(case when age=35 then id else 0 end) from maintable")
+ checkAnswer(df, Seq(Row(6.0)))
+ }
+
+ test("test pre agg table selection with expression 4") {
+ val df = sql("select sum(case when age=27 then id else 0 end) from maintable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
+ checkAnswer(df, Seq(Row(2.0)))
+ }
+
+ test("test pre agg table selection with expression 5") {
+ val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from maintable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
+ checkAnswer(df, Seq(Row(2.0,6.0)))
+ }
+
+ def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={
--- End diff --
add comment for this function
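For context, the `preAggTableValidator` helper being discussed checks that the analyzed plan actually scans the expected pre-aggregate table. A toy sketch of that check (Python; the node classes are stand-ins, not Spark's `LogicalPlan` API):

```python
# Toy sketch of what preAggTableValidator asserts (assumed node classes;
# this is not Spark's LogicalPlan API): walk the analyzed plan and confirm
# the scan targets the expected pre-aggregate table.

class Relation:
    """Stand-in for a table scan node."""
    def __init__(self, table_name):
        self.table_name = table_name
        self.children = ()

class Node:
    """Stand-in for any other plan operator (project, aggregate, ...)."""
    def __init__(self, *children):
        self.children = children

def plan_reads_table(plan, expected):
    """Depth-first search for a relation on the expected table (case-insensitive)."""
    if isinstance(plan, Relation) and plan.table_name.lower() == expected.lower():
        return True
    return any(plan_reads_table(child, expected) for child in plan.children)

# Project -> Aggregate -> scan of the pre-agg table, as in the tests above.
plan = Node(Node(Relation("mainTable_agg1")))
print(plan_reads_table(plan, "maintable_agg1"))
# -> True
```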
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2577/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2612/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159442746
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Below method will be used to update the child plan
+ * This will be used for updating expression like join condition,
+ * order by, project list etc
+ * @param plan
+ * child plan
+ * @return updated plan
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ /**
+ * Below method will be used to update the sort expression
+ * @param sortExp
+ * sort order expression in query
+ * @return updated sort expression
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ SortOrder(newExpression, order.direction)
+ } else {
+ SortOrder(attr, order.direction)
}
- carbonTable
+ }
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // logical relation
- case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
- // case for handling aggregation, order by
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- }
- carbonTable
- // case for handling aggregation, order by and filter
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+ /**
+ * Below method will be used to update the expression like group by expression
+ * @param expressions
+ * sequence of expression like group by
+ * @return updated expressions
+ */
+ def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
+ val newExp = expressions map { expression =>
+ expression transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExpression
+ } else {
+ attr
}
- if (isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ }
+ }
+ newExp
+ }
+
+ /**
+ * Below method will be used to update the named expression like aggregate expression
+ * @param namedExpression
+ * any named expression like aggregate expression
+ * @return updated named expression
+ */
+ def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
--- End diff --
You can directly use `updateExpression`; there is no need for this method. `NamedExpression` is derived from `Expression`, so after updating you can typecast.
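The rewrite both methods perform is a tree walk that remaps attribute references from the parent table to the pre-agg table's columns, which is why one generic method can cover both cases. A toy illustration (Python; tuples stand in for Catalyst expression trees, and all names are assumptions, not CarbonData's):

```python
# Toy illustration of the remapping the update methods perform (tuples stand
# in for Catalyst expression trees; names are assumptions, not CarbonData's).
# One generic walk suffices because it only rewrites the attribute leaves.

def update_expression(expr, mapping):
    """Recursively replace attribute references using a parent->preagg mapping."""
    if isinstance(expr, str):          # leaf: an attribute reference
        return mapping.get(expr, expr)
    op, *children = expr               # compound expression node
    return (op, *(update_expression(child, mapping) for child in children))

# Parent-table columns mapped to their pre-agg table counterparts.
mapping = {"name": "maintable_name", "age": "maintable_age"}
print(update_expression(("order_by", "name", ("sum", "age")), mapping))
# -> ('order_by', 'maintable_name', ('sum', 'maintable_age'))
```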
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2551/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159609322
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -1218,112 +1180,125 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
* parent column name
* @param carbonTable
* parent carbon table
- * @param tableName
- * parent table name
- * @param aggFunction
- * aggregate function applied
- * @param dataType
- * data type of the column
- * @param isChangedDataType
- * is cast is applied on column
* @param isFilterColumn
* is filter is applied on column
* @return query column
*/
def getQueryColumn(columnName: String,
carbonTable: CarbonTable,
- tableName: String,
- aggFunction: String = "",
- dataType: String = "",
- isChangedDataType: Boolean = false,
isFilterColumn: Boolean = false,
timeseriesFunction: String = ""): QueryColumn = {
- val columnSchema = carbonTable.getColumnByName(tableName, columnName.toLowerCase)
+ val columnSchema = carbonTable.getColumnByName(carbonTable.getTableName, columnName.toLowerCase)
if(null == columnSchema) {
null
} else {
- if (isChangedDataType) {
new QueryColumn(columnSchema.getColumnSchema,
- columnSchema.getDataType.getName,
- aggFunction.toLowerCase,
isFilterColumn,
timeseriesFunction.toLowerCase)
- } else {
- new QueryColumn(columnSchema.getColumnSchema,
- CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType),
- aggFunction.toLowerCase,
- isFilterColumn,
- timeseriesFunction.toLowerCase)
- }
}
}
}
-object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
-
+/**
+ * Data loading rule class to validate and update the data loading query plan
+ * Validation rule:
+ * 1. update the avg aggregate expression with two columns sum and count
+ * 2. Remove duplicate sum and count expression if already there in plan
+ * @param sparkSession
+ * spark session
+ */
+case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
+ extends Rule[LogicalPlan] {
+ lazy val parser = new CarbonSpark2SqlParser
override def apply(plan: LogicalPlan): LogicalPlan = {
- val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
+ val validExpressionsMap = scala.collection.mutable.HashSet.empty[AggExpToColumnMappingModel]
+ val namedExpressionList = scala.collection.mutable.ListBuffer.empty[NamedExpression]
plan transform {
- case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) =>
+ case aggregate@Aggregate(_,
+ aExp,
+ CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
+ if validateAggregateExpressions(aExp) &&
+ logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ val carbonTable = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ .carbonTable
aExp.foreach {
- case alias: Alias =>
- validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias)
- case _: UnresolvedAlias =>
- case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr)
+ case attr: AttributeReference =>
+ namedExpressionList += attr
+ case alias@Alias(_: AttributeReference, _) =>
+ namedExpressionList += alias
+ case alias@Alias(aggExp: AggregateExpression, name) =>
+ // get the updated expression for avg convert it to two expression
+ // sum and count
+ val expressions = PreAggregateUtil.getUpdateAggregateExpressions(aggExp)
+ // if size is more than one then it was for average
+ if(expressions.size > 1) {
+ // get the logical plan for sum expression
+ val logicalPlan_sum = PreAggregateUtil.getLogicalPlanFromAggExp(
+ expressions.head,
+ carbonTable.getTableName,
+ carbonTable.getDatabaseName,
+ logicalRelation,
+ sparkSession,
+ parser)
+ // get the logical plan for count expression
+ val logicalPlan_count = PreAggregateUtil.getLogicalPlanFromAggExp(
+ expressions.last,
+ carbonTable.getTableName,
+ carbonTable.getDatabaseName,
+ logicalRelation,
+ sparkSession,
+ parser)
+ // check with same expression already sum is present then do not add to
+ // named expression list otherwise update the list and add it to set
+ if (!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan_sum))) {
+ namedExpressionList +=
+ Alias(expressions.head, name + " _ sum")(NamedExpression.newExprId,
+ alias.qualifier,
+ Some(alias.metadata),
+ alias.isGenerated)
+ validExpressionsMap += AggExpToColumnMappingModel(logicalPlan_sum)
+ }
+ // check with same expression already count is present then do not add to
+ // named expression list otherwise update the list and add it to set
+ if (!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan_count))) {
+ namedExpressionList +=
+ Alias(expressions.last, name + " _ count")(NamedExpression.newExprId,
+ alias.qualifier,
+ Some(alias.metadata),
+ alias.isGenerated)
+ validExpressionsMap += AggExpToColumnMappingModel(logicalPlan_count)
+ }
+ } else {
+ // get the logical plan for expression
+ val logicalPlan = PreAggregateUtil.getLogicalPlanFromAggExp(
+ expressions.head,
+ carbonTable.getTableName,
+ carbonTable.getDatabaseName,
+ logicalRelation,
+ sparkSession,
+ parser)
+ // check with same expression already present then do not add to
+ // named expression list otherwise update the list and add it to set
+ if (!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan))) {
+ namedExpressionList+=alias
+ validExpressionsMap += AggExpToColumnMappingModel(logicalPlan)
+ }
+ }
+ case alias@Alias(_: Expression, _) =>
+ namedExpressionList += alias
}
- aggregate.copy(aggregateExpressions = validExpressionsMap.values.toSeq)
+ aggregate.copy(aggregateExpressions = namedExpressionList.toSeq)
case plan: LogicalPlan => plan
}
}
- /**
- * This method will split the avg column into sum and count and will return a sequence of tuple
- * of unique name, alias
- *
- */
- private def validateAggregateFunctionAndGetAlias(alias: Alias): Seq[(String,
- NamedExpression)] = {
- alias match {
- case udf@Alias(_: ScalaUDF, name) =>
- Seq((name, udf))
- case alias@Alias(attrExpression: AggregateExpression, _) =>
- attrExpression.aggregateFunction match {
- case Sum(attr: AttributeReference) =>
- (attr.name + "_sum", alias) :: Nil
- case Sum(MatchCastExpression(attr: AttributeReference, _)) =>
- (attr.name + "_sum", alias) :: Nil
- case Count(Seq(attr: AttributeReference)) =>
- (attr.name + "_count", alias) :: Nil
- case Count(Seq(MatchCastExpression(attr: AttributeReference, _))) =>
- (attr.name + "_count", alias) :: Nil
- case Average(attr: AttributeReference) =>
- Seq((attr.name + "_sum", Alias(attrExpression.
- copy(aggregateFunction = Sum(attr),
- resultId = NamedExpression.newExprId), attr.name + "_sum")()),
- (attr.name, Alias(attrExpression.
- copy(aggregateFunction = Count(attr),
- resultId = NamedExpression.newExprId), attr.name + "_count")()))
- case Average(cast@MatchCastExpression(attr: AttributeReference, _)) =>
- Seq((attr.name + "_sum", Alias(attrExpression.
- copy(aggregateFunction = Sum(cast),
- resultId = NamedExpression.newExprId),
- attr.name + "_sum")()),
- (attr.name, Alias(attrExpression.
- copy(aggregateFunction = Count(cast), resultId =
- NamedExpression.newExprId), attr.name + "_count")()))
- case _ => Seq(("", alias))
- }
-
- }
- }
-
/**
* Called by PreAggregateLoadingRules to validate if plan is valid for applying rules or not.
* If the plan has PreAggLoad i.e Loading UDF and does not have PreAgg i.e Query UDF then it is
* valid.
- *
* @param namedExpression
- * @return
+ * named expressions
--- End diff --
Move it up. The comment should look like:
`@param namedExpression named expressions`
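The class documentation in this hunk describes the two data-loading rules: split an avg aggregate into sum and count columns, and drop sum/count expressions already present in the plan. A hedged Python sketch of that split-and-deduplicate logic (identifiers are illustrative, not CarbonData's):

```python
# Hedged sketch of the two data-loading rules documented in this hunk
# (identifiers are illustrative, not CarbonData's): split avg into sum and
# count, and skip expressions already present after canonicalization.

def expand_aggregates(agg_exprs):
    """agg_exprs: list of (function, column) pairs from the load plan."""
    seen = set()
    out = []

    def add(fn, col):
        # Deduplicate on the canonical (function, column) key.
        if (fn, col) not in seen:
            seen.add((fn, col))
            out.append(f"{col}_{fn}")

    for fn, col in agg_exprs:
        if fn == "avg":                # avg -> sum + count
            add("sum", col)
            add("count", col)
        else:
            add(fn, col)
    return out

# avg(age) already implies sum(age) and count(age); duplicates are dropped.
print(expand_aggregates([("avg", "age"), ("sum", "age"), ("count", "age")]))
# -> ['age_sum', 'age_count']
```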
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159057341
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -282,6 +136,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
val listFilterColumn = list
.filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn)
.toList
+ val isProjectionColumnPresent = (listProjectionColumn.size + listFilterColumn.size) > 0
// getting all the aggregation columns
val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty)
--- End diff --
This code is unused now; please remove all code related to it.
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159459370
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Below method will be used to update the child plan
+ * This will be used for updating expression like join condition,
+ * order by, project list etc
+ * @param plan
+ * child plan
+ * @return updated plan
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ /**
+ * Below method will be used to update the sort expression
+ * @param sortExp
+ * sort order expression in query
+ * @return updated sort expression
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ SortOrder(newExpression, order.direction)
+ } else {
+ SortOrder(attr, order.direction)
}
- carbonTable
+ }
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // logical relation
- case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
- // case for handling aggregation, order by
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- }
- carbonTable
- // case for handling aggregation, order by and filter
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+ /**
+ * Below method will be used to update expressions such as group by expressions
+ * @param expressions
+ * sequence of expressions, e.g. group by expressions
+ * @return updated expressions
+ */
+ def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
+ val newExp = expressions map { expression =>
+ expression transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExpression
+ } else {
+ attr
}
- if (isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ }
+ }
+ newExp
+ }
+
+ /**
+ * Below method will be used to update a named expression such as an aggregate expression
+ * @param namedExpression
+ * any named expression like aggregate expression
+ * @return updated named expression
+ */
+ def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
+ namedExpression map {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if(childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ case alias@Alias(exp, name) =>
+ val newExp = exp.transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if (childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ }
+ Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
+ }
+ }
+
+ /**
+ * Below method will be used to update a condition expression
+ * @param conditionExp
+ * any condition expression, e.g. a join condition or a filter condition
+ * @return updated condition expression
+ */
+ def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
+ if (conditionExp.isDefined) {
+ val filterExp = conditionExp.get
+ Some(filterExp.transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if(childExp.isDefined) {
+ childExp.get._2
+ } else {
+ attr
}
- carbonTable
- // case for handling aggregation with order by when only projection column exits
- case Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
+ })
+ } else {
+ conditionExp
+ }
+ }
+
+ /**
+ * Below method will be used to validate and transform the main table plan to the
+ * child table plan. The rules for transforming are as below:
+ * 1. Grouping expression rules
+ * 1.1 Change the parent attribute reference of the grouping expression
+ * to the child attribute reference
+ *
+ * 2. Aggregate expression rules
+ * 2.1 Change the parent attribute reference of the aggregate expression to
+ * the child attribute reference
+ * 2.2 Change the count AggregateExpression to Sum, as the count is already
+ * calculated, so in case of the aggregate table we need to apply sum to get the count
+ * 2.3 In case of the average aggregate function, select 2 columns from the aggregate
+ * table with aggregations sum and count, then add
+ * divide(sum(column with sum), sum(column with count)); roughly, avg(column) on the
+ * parent table becomes sum(sum column) / sum(count column) on the aggregate table.
+ * Note: During aggregate table creation for average, the table will be created with
+ * two columns, one for sum(column) and one for count(column), to support rollup
+ * 3. Filter Expression rules
+ * 3.1 Update the filter expression attributes with the child table attributes
+ * 4. Update the parent logical relation with the child logical relation
+ * 5. Timeseries function rules
+ * 5.1 Validate that the parent table has a timeseries datamap
+ * 5.2 Validate that the timeseries function is a valid function
+ *
+ * @param logicalPlan
+ * parent logical plan
+ * @return transformed plan
+ */
+ def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
+ val updatedPlan = logicalPlan.transform {
+ // case for aggregation query
+ case agg@Aggregate(grExp,
+ aggExp,
+ child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
+ if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
+ val carbonTable = getCarbonTable(logicalRelation)
+ val list = scala.collection.mutable.HashSet.empty[QueryColumn]
+ val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
+ val isValidPlan = extractQueryColumnsFromAggExpression(
+ grExp,
+ aggExp,
+ carbonTable,
+ list,
+ aggregateExpressions)
+ if(isValidPlan) {
+ val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
+ aggregateExpressions,
carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
- carbonTable = carbonTable,
- tableName = tableName)
+ logicalRelation)
+ if (null != aggDataMapSchema && null != childPlan) {
+ val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
+ val (updatedGroupExp, updatedAggExp, newChild, None) =
+ getUpdatedExpressions(grExp,
+ aggExp,
+ child,
+ None,
+ aggDataMapSchema,
+ attributes,
+ childPlan,
+ carbonTable,
+ logicalRelation)
+ Aggregate(updatedGroupExp,
+ updatedAggExp,
+ newChild)
+ } else {
+ agg
}
- carbonTable
- // case for handling aggregation with order by and filter when only projection column exits
- case Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
+ } else {
+ agg
+ }
+ // case of handling aggregation query with filter
+ case agg@Aggregate(grExp,
+ aggExp,
+ Filter(expression, child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
+ if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
+ val carbonTable = getCarbonTable(logicalRelation)
+ val list = scala.collection.mutable.HashSet.empty[QueryColumn]
+ val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
+ var isValidPlan = extractQueryColumnsFromAggExpression(
+ grExp,
+ aggExp,
+ carbonTable,
+ list,
+ aggregateExpressions)
+ // a filter containing a predicate subquery is not supported
+ if (isValidPlan) {
+ isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression)
+ }
+ // getting the columns from filter expression
+ if (isValidPlan) {
+ isValidPlan = extractQueryColumnFromFilterExp(expression, list, carbonTable)
+ }
+ if(isValidPlan) {
+ val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
+ aggregateExpressions,
carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- if(isValidPlan) {
- list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
- carbonTable = carbonTable,
- tableName = tableName)
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
- }
- carbonTable
- case _ =>
- isValidPlan = false
- null
- }
- if (isValidPlan && null != carbonTable) {
- isValidPlan = isSpecificSegmentPresent(carbonTable)
- }
- // if plan is valid then update the plan with child attributes
- if (isValidPlan) {
- // getting all the projection columns
- val listProjectionColumn = list
- .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn)
- .toList
- // getting all the filter columns
- val listFilterColumn = list
- .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn)
- .toList
- // getting all the aggregation columns
- val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty)
- .toList
- // create a query plan object which will be used to select the list of pre aggregate tables
- // matches with this plan
- val queryPlan = new QueryPlan(listProjectionColumn.asJava,
- listAggregationColumn.asJava,
- listFilterColumn.asJava)
- // create aggregate table selector object
- val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
- // select the list of valid child tables
- val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
- // if it does not match with any pre aggregate table return the same plan
- if (!selectedDataMapSchemas.isEmpty) {
- // filter the selected child schema based on size to select the pre-aggregate tables
- // that are nonEmpty
- val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val relationBuffer = selectedDataMapSchemas.asScala.map { selectedDataMapSchema =>
- val identifier = TableIdentifier(
- selectedDataMapSchema.getRelationIdentifier.getTableName,
- Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
- val carbonRelation =
- catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
- val relation = sparkSession.sessionState.catalog.lookupRelation(identifier)
- (selectedDataMapSchema, carbonRelation, relation)
- }.filter(_._2.sizeInBytes != 0L)
- if (relationBuffer.isEmpty) {
- // If the size of relation Buffer is 0 then it means that none of the pre-aggregate
- // tables have date yet.
- // In this case we would return the original plan so that the query hits the parent
- // table.
- plan
+ logicalRelation)
+ if (null != aggDataMapSchema && null != childPlan) {
+ val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
+ val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
+ getUpdatedExpressions(grExp,
+ aggExp,
+ child,
+ Some(expression),
+ aggDataMapSchema,
+ attributes,
+ childPlan,
+ carbonTable,
+ logicalRelation)
+ Aggregate(updatedGroupExp,
+ updatedAggExp,
+ Filter(updatedFilterExpression.get,
+ newChild))
} else {
- // If the relationBuffer is nonEmpty then find the table with the minimum size.
- val (aggDataMapSchema, _, relation) = relationBuffer.minBy(_._2.sizeInBytes)
- val newRelation = new FindDataSourceTable(sparkSession).apply(relation)
- // transform the query plan based on selected child schema
- transformPreAggQueryPlan(plan, aggDataMapSchema, newRelation)
+ agg
}
} else {
- plan
+ agg
}
+ }
+ updatedPlan
+ }
+
+ /**
+ * Below method will be used to validate the query plan and get the proper aggregation
+ * data map schema and child relation plan object if the plan is valid for transformation
+ * @param queryColumns
+ * list of query columns from projection and filter
+ * @param aggregateExpressions
+ * list of aggregate expression (aggregate function)
+ * @param carbonTable
+ * parent carbon table
+ * @param logicalRelation
+ * parent logical relation
+ * @return if plan is valid then aggregation data map schema and its relation plan
+ */
+ def getChildDataMapForTransformation(queryColumns: scala.collection.mutable.HashSet[QueryColumn],
+ aggregateExpressions: scala.collection.mutable.HashSet[AggregateExpression],
+ carbonTable: CarbonTable,
+ logicalRelation: LogicalRelation): (AggregationDataMapSchema, LogicalPlan) = {
+ // getting all the projection columns
+ val listProjectionColumn = queryColumns
+ .filter(queryColumn => !queryColumn.isFilterColumn)
+ .toList
+ // getting all the filter columns
+ val listFilterColumn = queryColumns
+ .filter(queryColumn => queryColumn.isFilterColumn)
+ .toList
+ val isProjectionColumnPresent = (listProjectionColumn.size + listFilterColumn.size) > 0
+ // create a query plan object which will be used to select the list of pre aggregate tables
+ // matches with this plan
+ val queryPlan = new QueryPlan(listProjectionColumn.asJava, listFilterColumn.asJava)
+ // create aggregate table selector object
+ val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
+ // select the list of valid child tables
+ val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
+ // if the query has only aggregate expressions then the selected data map list
+ // will be empty; in that case validate all the child data maps, otherwise
+ // validate only the selected data maps
+ var selectedAggMaps = if (isProjectionColumnPresent) {
+ selectedDataMapSchemas
+ } else {
+ carbonTable.getTableInfo.getDataMapSchemaList
+ }
+ val aggExpLogicalPlans = aggregateExpressions.map { queryAggExp =>
+ PreAggregateUtil.getLogicalPlanFromAggExp(queryAggExp,
+ carbonTable.getTableName,
+ carbonTable.getDatabaseName,
+ logicalRelation,
+ sparkSession,
+ parser)
+ }.toSeq
+ // if the query does not have any aggregate function then there is no need to validate
+ if (aggregateExpressions.size > 0 && selectedAggMaps.size > 0) {
+ selectedAggMaps = validateAggregateExpression(selectedAggMaps.asScala.toSeq,
+ carbonTable,
+ logicalRelation,
+ aggExpLogicalPlans).asJava
+ }
+ // if it does not match with any pre aggregate table return the same plan
+ if (!selectedAggMaps.isEmpty) {
+ // filter the selected child schema based on size to select the pre-aggregate tables
+ // that are nonEmpty
+ val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val relationBuffer = selectedAggMaps.asScala.map { selectedDataMapSchema =>
+ val identifier = TableIdentifier(
+ selectedDataMapSchema.getRelationIdentifier.getTableName,
+ Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
+ val carbonRelation =
+ catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
+ val relation = sparkSession.sessionState.catalog.lookupRelation(identifier)
+ (selectedDataMapSchema, carbonRelation, relation)
+ }.filter(_._2.sizeInBytes != 0L)
+ if (relationBuffer.isEmpty) {
+ // If the size of relation Buffer is 0 then it means that none of the pre-aggregate
+ // tables have data yet.
+ // In this case we would return the original plan so that the query hits the parent
+ // table.
+ (null, null)
} else {
- plan
+ // If the relationBuffer is nonEmpty then find the table with the minimum size.
+ val (aggDataMapSchema, _, relation) = relationBuffer.minBy(_._2.sizeInBytes)
+ val newRelation = new FindDataSourceTable(sparkSession).apply(relation)
+ (aggDataMapSchema.asInstanceOf[AggregationDataMapSchema], newRelation)
}
+ } else {
+ (null, null)
}
}
+ /**
+ * Below method will be used to validate aggregate expression with the data map
+ * and will return the selected valid data maps
+ * @param selectedDataMap
+ * list of data maps
+ * @param carbonTable
+ * parent carbon table
+ * @param logicalRelation
+ * parent logical relation
+ * @param queryAggExpLogicalPlans
+ * query agg expression logical plan
+ * @return valid data map
+ */
+ def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema],
+ carbonTable: CarbonTable,
+ logicalRelation: LogicalRelation,
+ queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = {
+ def validateDataMap(dataMap: DataMapSchema,
+ aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = {
+ val mappingModel = getLogicalPlanForAggregateExpression(dataMap,
+ carbonTable,
+ logicalRelation)
+ aggExpLogicalPlans.forall{
+ p => mappingModel.exists(m => p.sameResult(m.logicalPlan))
--- End diff --
move `p =>` up to the previous line
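The suggested formatting would look roughly like this (a sketch of the snippet under review; `aggExpLogicalPlans`, `mappingModel` and `m.logicalPlan` are the names from the diff above, and this fragment assumes the surrounding method's scope rather than being a standalone program):

```scala
// Lambda parameter moved onto the opening line of the closure,
// as suggested by the reviewer.
aggExpLogicalPlans.forall { p =>
  mappingModel.exists(m => p.sameResult(m.logicalPlan))
}
```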
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2568/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2738/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1343/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159440457
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Below method will be used to update the child plan.
+ * This will be used for updating expressions like join conditions,
+ * order by clauses and project lists
+ * @param plan
+ * child plan
+ * @return updated plan
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ /**
+ * Below method will be used to update the sort expression
+ * @param sortExp
+ * sort order expression in query
+ * @return updated sort expression
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ SortOrder(newExpression, order.direction)
+ } else {
+ SortOrder(attr, order.direction)
}
- carbonTable
+ }
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // logical relation
- case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
- // case for handling aggregation, order by
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- }
- carbonTable
- // case for handling aggregation, order by and filter
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+ /**
+ * Below method will be used to update expressions such as group by expressions
+ * @param expressions
+ * sequence of expressions, e.g. group by expressions
+ * @return updated expressions
+ */
+ def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
+ val newExp = expressions map { expression =>
+ expression transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExpression
--- End diff --
not required to assign the result to a variable; the expression can be returned directly
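In other words, the intermediate `val` can be dropped, roughly as below (a sketch of the reviewed `case` branch; the names come from the diff above and the fragment assumes the surrounding Spark Catalyst imports and enclosing `transform` call):

```scala
// The constructed AttributeReference is returned directly instead of
// first being bound to a local val.
case attr: AttributeReference =>
  updatedExpression.find(p => p._1.sameRef(attr)) match {
    case Some((_, childAttr)) =>
      AttributeReference(
        childAttr.name,
        childAttr.dataType,
        childAttr.nullable,
        childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated)
    case None => attr
  }
```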
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159443443
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Below method will be used to update the child plan.
+ * This will be used for updating expressions like join conditions,
+ * order by clauses and project lists
+ * @param plan
+ * child plan
+ * @return updated plan
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ /**
+ * Below method will be used to update the sort expression
+ * @param sortExp
+ * sort order expression in query
+ * @return updated sort expression
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ SortOrder(newExpression, order.direction)
+ } else {
+ SortOrder(attr, order.direction)
}
- carbonTable
+ }
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // logical relation
- case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
- // case for handling aggregation, order by
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- }
- carbonTable
- // case for handling aggregation, order by and filter
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+ /**
+ * Below method will be used to update the expression like group by expression
+ * @param expressions
+ * sequence of expression like group by
+ * @return updated expressions
+ */
+ def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
+ val newExp = expressions map { expression =>
+ expression transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExpression
+ } else {
+ attr
}
- if (isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ }
+ }
+ newExp
+ }
+
+ /**
+ * Below method will be used to update named expressions such as aggregate expressions
+ * @param namedExpression
+ * any named expression like aggregate expression
+ * @return updated named expression
+ */
+ def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
+ namedExpression map {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if(childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ case alias@Alias(exp, name) =>
+ val newExp = exp.transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if (childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ }
+ Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
+ }
+ }
+
+ /**
+ * Below method will be used to update a condition expression
+ * @param conditionExp
+ * any condition expression, e.g. a join condition or filter condition
+ * @return updated condition expression
+ */
+ def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
+ if (conditionExp.isDefined) {
+ val filterExp = conditionExp.get
+ Some(filterExp.transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if(childExp.isDefined) {
+ childExp.get._2
+ } else {
+ attr
}
- carbonTable
- // case for handling aggregation with order by when only projection column exits
- case Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
+ })
+ } else {
+ conditionExp
+ }
+ }
+
+ /**
+ * Below method will be used to validate and transform the main table plan to the child table plan.
+ * The rules for transforming are as below.
+ * 1. Grouping expression rules
+ * 1.1 Change the parent attribute reference of the group expression
+ * to the child attribute reference
+ *
+ * 2. Aggregate expression rules
+ * 2.1 Change the parent attribute reference of the aggregate expression to
+ * the child attribute reference
+ * 2.2 Change the count AggregateExpression to Sum: count
+ * is already calculated, so in case of the aggregate table
+ * we need to apply sum to get the count
+ * 2.3 In case of the average aggregate function, select 2 columns from the aggregate table with
+ * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)).
+ * Note: During aggregate table creation, for average the table is created with two columns,
+ * one for sum(column) and one for count(column), to support rollup
+ * 3. Filter Expression rules.
+ * 3.1 Updated filter expression attributes with child table attributes
+ * 4. Update the Parent Logical relation with child Logical relation
+ * 5. timeseries function
+ * 5.1 validate parent table has timeseries datamap
+ * 5.2 timeseries function is valid function or not
+ *
+ * @param logicalPlan
+ * parent logical plan
+ * @return transformed plan
+ */
+ def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
+ val updatedPlan = logicalPlan.transform {
+ // case for aggregation query
+ case agg@Aggregate(grExp,
--- End diff --
Move `grExp` down to the next line
---
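The `updateSortExpression`/`updateExpression` methods quoted in the diff above perform the same substitution: walk an expression tree and, wherever a parent-table `AttributeReference` has a counterpart in the `updatedExpression` map (matched via `sameRef`, i.e. by expression id), swap in the child-table attribute, otherwise keep the original. A minimal sketch of that idea, with hypothetical `Attr`/`SortOrder` stand-ins for the Catalyst classes (not CarbonData's actual code):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Attr:
    """Stand-in for Catalyst's AttributeReference, identified by expr_id."""
    expr_id: int
    name: str

@dataclass(frozen=True)
class SortOrder:
    child: Attr
    direction: str  # "asc" or "desc"

def update_sort_expression(sort_exps, updated_expression):
    """Replace parent attributes with child attributes, like updateSortExpression.

    updated_expression maps parent expr_id -> child Attr (analogous to
    finding the pair whose first element sameRef's the attribute)."""
    result = []
    for order in sort_exps:
        child = updated_expression.get(order.child.expr_id)
        if child is not None:
            result.append(SortOrder(child, order.direction))
        else:
            result.append(order)  # no mapping: keep the original attribute
    return result

parent_price = Attr(1, "price")
child_sum_price = Attr(101, "sum_price")
mapping = {parent_price.expr_id: child_sum_price}

updated = update_sort_expression([SortOrder(parent_price, "asc")], mapping)
untouched = update_sort_expression([SortOrder(Attr(2, "qty"), "desc")], mapping)
```

The real code additionally copies over the child attribute's name, data type, nullability and metadata while retaining the original qualifier; the dictionary lookup here plays the role of the linear `find`/`sameRef` scan.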
[GitHub] carbondata issue #1728: [CARBONDATA-1926][Pre-Aggregate] Expression support ...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2364/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159435962
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -18,28 +18,46 @@
package org.apache.spark.sql.hive
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
-import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql._
-import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast, MatchCastExpression}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCastExpression}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
-import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, _}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
+/**
+ * model class to store aggregate expression logical plan
+ * and its column schema mapping
+ * @param logicalPlan
+ * logical plan of aggregate expression
+ * @param columnSchema
+ * column schema from table
+ */
+case class AggExpToColumnMappingModel(var logicalPlan: LogicalPlan,
--- End diff --
Move `var logicalPlan: LogicalPlan` to the next line
---
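The `AggExpToColumnMappingModel` under review pairs an aggregate expression's logical plan with the child-table column holding its result; elsewhere in the PR this mapping is cached on the datamap schema (`setAggregateExpressionToColumnMapping`) so it is built once and reused on later queries. A toy sketch of that build-once caching (hypothetical class and names, not CarbonData's API):

```python
class AggDataMapSchema:
    """Illustrative cache of aggregate-expression -> column-name mappings."""

    def __init__(self, columns):
        self.columns = columns   # ordered child-table columns (schema ordinal order)
        self._mapping = None     # lazily built expression -> column mapping

    def expression_to_column(self, agg_expressions):
        # Build the mapping only on first use; subsequent calls reuse it,
        # mirroring the null check on getAggregateExpressionToColumnMapping.
        if self._mapping is None:
            self._mapping = dict(zip(agg_expressions, self.columns))
        return self._mapping

schema = AggDataMapSchema(["maintable_price_sum", "maintable_price_count"])
first = schema.expression_to_column(["sum(price)", "count(price)"])
second = schema.expression_to_column(["sum(price)", "count(price)"])
```

In the actual implementation the keys are normalized logical plans rather than strings, so that structurally equal aggregate expressions from different queries hit the same cached entry.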
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2575/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159440196
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Below method will be used to update the child plan
+ * This will be used for updating expressions like join conditions,
+ * order by, project list etc.
+ * @param plan
+ * child plan
+ * @return updated plan
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ /**
+ * Below method will be used to update the sort expression
+ * @param sortExp
+ * sort order expression in query
+ * @return updated sort expression
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ SortOrder(newExpression, order.direction)
+ } else {
+ SortOrder(attr, order.direction)
}
- carbonTable
+ }
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // logical relation
- case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
- // case for handling aggregation, order by
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- }
- carbonTable
- // case for handling aggregation, order by and filter
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+ /**
+ * Below method will be used to update the expression like group by expression
+ * @param expressions
+ * sequence of expression like group by
+ * @return updated expressions
+ */
+ def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
+ val newExp = expressions map { expression =>
+ expression transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
--- End diff --
Better use match { case } instead of if else
---
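`updatePlan`, shown in the diff above, runs one pass over the already-transformed child plan and rewrites each supported node kind (Aggregate, Filter, Project, Sort, Join) with the attribute-updating helpers. A rough sketch of that node-by-node transform on a toy plan tree (Python; the node classes, `transform`, and `mapping` are illustrative stand-ins, not Spark's API):

```python
from dataclasses import dataclass

# Toy plan nodes mirroring a subset of the Catalyst nodes handled by updatePlan.
@dataclass
class Relation:
    table: str

@dataclass
class Filter:
    condition: str
    child: object

@dataclass
class Aggregate:
    group: list
    agg: list
    child: object

def transform(plan, rule):
    """Post-order plan transform, loosely like LogicalPlan.transform:
    rewrite children first, then offer the rebuilt node to the rule."""
    if isinstance(plan, Filter):
        plan = Filter(plan.condition, transform(plan.child, rule))
    elif isinstance(plan, Aggregate):
        plan = Aggregate(plan.group, plan.agg, transform(plan.child, rule))
    rewritten = rule(plan)
    return plan if rewritten is None else rewritten

# A rule in the spirit of updatePlan: rewrite column references in each
# node using a parent -> child name mapping (hypothetical names).
mapping = {"price": "sum_price"}

def update_rule(node):
    if isinstance(node, Aggregate):
        return Aggregate([mapping.get(c, c) for c in node.group],
                         [mapping.get(c, c) for c in node.agg], node.child)
    if isinstance(node, Filter):
        return Filter(mapping.get(node.condition, node.condition), node.child)
    return None  # leave relations and unknown nodes unchanged

plan = Aggregate(["price"], ["price"], Filter("price", Relation("maintable")))
new_plan = transform(plan, update_rule)
```

The real rule additionally rewrites Project lists, Sort orders and Join conditions, and works on typed expression trees rather than plain strings.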
[GitHub] carbondata issue #1728: [CARBONDATA-1926][Pre-Aggregate] Expression support ...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2578/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159052141
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
}
}
+ /**
+ * Below method will be used to validate the logical plan
+ * and get all the details needed to select the proper aggregate table
+ * @param logicalPlan
+ * actual query logical plan
+ * @param list
+ * list of projection column present in plan
+ * @param qAggExprs
+ * list of aggregate expression
+ * @return if the plan is valid for transformation, the parent table, the parent logical relation
+ */
+ def validatePlanAndGetFields(logicalPlan: LogicalPlan,
+ list: scala.collection.mutable.HashSet[QueryColumn],
+ qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): (Boolean,
+ CarbonTable, LogicalRelation) = {
+ var isValidPlan = false
+ var pTable: CarbonTable = null
+ var qLRelation: LogicalRelation = null
+ logicalPlan.transform {
+ // to handle filter expression
+ case filter@Filter(filterExp,
+ CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
+ // only carbon query plan is supported; check whether the logical relation
+ // is for carbon
+ if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
+ qLRelation = logicalRelation
+ pTable = getCarbonTableAndTableName(logicalRelation)
+ // getting the columns from filter expression
+ if (!CarbonReflectionUtils.hasPredicateSubquery(filterExp)) {
+ isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, pTable)
+ }
+ filter
+ // to handle aggregate expression
+ case agg@Aggregate(groupingExp,
+ aggregateExp,
+ CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
+ // only carbon query plan is supported; check whether the logical relation
+ // is for carbon
+ if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
+ qLRelation = logicalRelation
+ pTable = getCarbonTableAndTableName(logicalRelation)
+ isValidPlan = extractQueryColumnsFromAggExpression(
+ groupingExp,
+ aggregateExp,
+ pTable,
+ list,
+ qAggExprs)
+ agg
+ // to handle aggregate expression with filter
+ case agg@Aggregate(grExp, aggExp, filter@Filter(_, _)) =>
+ val out = validatePlanAndGetFields(filter, list, qAggExprs)
+ pTable = out._2
+ qLRelation = out._3
+ isValidPlan = out._1
+ if (isValidPlan) {
+ isValidPlan = extractQueryColumnsFromAggExpression(grExp, aggExp, pTable, list, qAggExprs)
+ }
+ agg
+ // to handle projection with order by
+ case proj@Project(projectList, sort@Sort(_, _, _)) =>
+ val out = validatePlanAndGetFields(sort, list, qAggExprs)
+ pTable = out._2
+ qLRelation = out._3
+ isValidPlan = out._1
+ if(isValidPlan) {
+ list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable)
+ }
+ proj
+ // to handle only projection
+ case proj@Project(projectList, agg@Aggregate(_, _, _)) =>
+ val out = validatePlanAndGetFields(agg, list, qAggExprs)
+ pTable = out._2
+ qLRelation = out._3
+ isValidPlan = out._1
+ if(isValidPlan) {
+ list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable)
+ }
+ proj
+ // case for handling aggregation with order by when only projection column exits
+ case sort@Sort(sortOrders, _, agg@Aggregate(_, _, _)) =>
+ val out = validatePlanAndGetFields(agg, list, qAggExprs)
+ pTable = out._2
+ qLRelation = out._3
+ isValidPlan = out._1
+ if(isValidPlan) {
+ list ++
+ extractQueryColumnForOrderBy(None, sortOrders, pTable)
+ }
+ sort
+ }
+ (isValidPlan, pTable, qLRelation)
+ }
+
+ /**
+ * Below method will be used to validate aggregate expression with the data map
+ * and will return the selected valid data maps
+ * @param selectedDataMap
+ * list of data maps
+ * @param carbonTable
+ * parent carbon table
+ * @param logicalRelation
+ * parent logical relation
+ * @param queryAggExpLogicalPlans
+ * query agg expression logical plan
+ * @return valid data map
+ */
+ def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema],
+ carbonTable: CarbonTable,
+ logicalRelation: LogicalRelation,
+ queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = {
+ def validateDataMap(dataMap: DataMapSchema,
+ aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = {
+ val schemaAggLogicalPlan = getLogicalPlanForAggregateExpression(dataMap,
+ carbonTable,
+ logicalRelation)
+ aggExpLogicalPlans.forall{
+ p => schemaAggLogicalPlan.exists(m => p.sameResult(m._1))
+ }
+ }
+ val selectedDataMapSchema = selectedDataMap.collect {
+ case dataMap if validateDataMap(dataMap, queryAggExpLogicalPlans) =>
+ dataMap
+ }
+ selectedDataMapSchema
+ }
+
+ /**
+ * Below method will be used to update the logical plan of expression
+ * with parent table logical relation
+ * @param logicalPlan
+ * @param logicalRelation
+ * @return
+ */
+ def updateLogicalRelation(logicalPlan: LogicalPlan,
+ logicalRelation: LogicalRelation): LogicalPlan = {
+ logicalPlan transform {
+ case l: LogicalRelation =>
+ l.copy(relation = logicalRelation.relation)
+ }
+ }
+
+ /**
+ * Below method will be used to get the logical plan for each aggregate expression in the
+ * child data map and its column schema mapping. If the mapping is already present
+ * then it is reused, otherwise it is generated and stored in the aggregation data map
+ * @param selectedDataMap
+ * child data map
+ * @param carbonTable
+ * parent table
+ * @param logicalRelation
+ * logical relation of actual plan
+ * @return map of logical plan for each aggregate expression in child query and its column mapping
+ */
+ def getLogicalPlanForAggregateExpression(selectedDataMap: DataMapSchema, carbonTable: CarbonTable,
+ logicalRelation: LogicalRelation): Map[LogicalPlan, ColumnSchema] = {
+ val aggDataMapSchema = selectedDataMap.asInstanceOf[AggregationDataMapSchema]
+ // if column mapping is not present
+ if (null == aggDataMapSchema.getAggregateExpressionToColumnMapping) {
+ // add preAGG UDF to avoid all the PreAggregate rule
+ val childDataMapQueryString = new CarbonSpark2SqlParser()
+ .addPreAggFunction(aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY"))
+ // get the logical plan
+ val aggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan
+ // getting all aggregate expression from query
+ val dataMapAggExp = getAggregateExpFromChildDataMap(aggPlan)
+ // in case of average the child table will have two columns stored in sequence,
+ // so for an average expression we need to get two columns for the mapping
+ var counter = 0
+ // sorting the columns based on schema ordinal so search will give proper result
+ val sortedColumnList = aggDataMapSchema.getChildSchema.getListOfColumns.asScala
+ .sortBy(_.getSchemaOrdinal)
+ val logicalPlanToColumnMapping = dataMapAggExp.map { aggExp =>
+ // for each aggregate expression get logical plan
+ val expLogicalPlan = getLogicalPlanFromAggExp(aggExp,
+ carbonTable.getTableName,
+ carbonTable.getDatabaseName, logicalRelation)
+ // check if aggregate expression is of type avg
+ // get the columns
+ var columnSchema = aggDataMapSchema
+ .getAggColumnBasedOnIndex(counter, sortedColumnList.asJava)
+ // increment the counter so that when the above code is executed for the
+ // next expression it will search from that schema ordinal
+ counter = columnSchema.getSchemaOrdinal + 1
+ (expLogicalPlan, columnSchema)
+ }.toMap
+ // store the mapping in data map
+ aggDataMapSchema.setAggregateExpressionToColumnMapping(logicalPlanToColumnMapping.asJava)
+ // return the mapping
+ logicalPlanToColumnMapping
+ } else {
+ // if already present in data map then return the same
+ aggDataMapSchema.getAggregateExpressionToColumnMapping
+ .asInstanceOf[java.util.Map[LogicalPlan, ColumnSchema]].asScala.toMap
+ }
+ }
+
+
+ /**
+ * Below method will be used to get the logical plan from aggregate expression
+ * @param aggExp
+ * aggregate expression
+ * @param tableName
+ * parent table name
+ * @param databaseName
+ * database name
+ * @param logicalRelation
+ * logical relation
+ * @return logical plan
+ */
+ def getLogicalPlanFromAggExp(aggExp: AggregateExpression,
+ tableName: String,
+ databaseName: String,
+ logicalRelation: LogicalRelation): LogicalPlan = {
+ // adding the preAGG UDF, so pre aggregate data loading rule and query rule will not
+ // be applied
+ val query = new CarbonSpark2SqlParser()
+ .addPreAggFunction(s"Select ${ aggExp.sql } from $databaseName.$tableName")
+ // updating the logical relation of the logical plan so that when two logical plans
+ // are compared the relation is not considered
+ updateLogicalRelation(sparkSession.sql(query).logicalPlan, logicalRelation)
+ }
+
+ /**
+ * Below method will be used to get aggregate expression
+ * @param logicalPlan
+ * logical plan
+ * @return list of aggregate expression
+ */
+ def getAggregateExpFromChildDataMap(logicalPlan: LogicalPlan): Seq[AggregateExpression] = {
+ val list = scala.collection.mutable.HashSet.empty[AggregateExpression]
--- End diff --
Should use List or LinkedHashSet as the order of insertion is required for mapping
---
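The review comment above touches on the average-rollup design: the pre-aggregate table stores sum(column) and count(column) side by side (in insertion order), so a rollup computes count(x) as sum(count_x) and avg(x) as divide(sum(sum_x), sum(count_x)). A small numeric check of those identities on made-up data:

```python
# Raw fact rows: (group_key, value). Data is illustrative only.
rows = [("a", 10.0), ("a", 20.0), ("b", 5.0), ("a", 30.0), ("b", 15.0)]

# Build the pre-aggregate table: per group, store sum and count
# (the two columns created for avg to support rollup).
pre_agg = {}
for key, value in rows:
    s, c = pre_agg.get(key, (0.0, 0))
    pre_agg[key] = (s + value, c + 1)

# Roll up from the pre-aggregate table:
#   count(x) -> sum(count_x)
#   avg(x)   -> sum(sum_x) / sum(count_x)
total_count = sum(c for _, c in pre_agg.values())
rolled_avg = sum(s for s, _ in pre_agg.values()) / total_count

# Direct computation on the raw rows for comparison.
direct_avg = sum(v for _, v in rows) / len(rows)
```

This is why a plain avg(avg_x) rewrite would be wrong for groups of different sizes: only the sum/count pair rolls up losslessly, which is also why count is rewritten to Sum over the stored counts.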
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/carbondata/pull/1728
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][Pre-Aggregate] Expression support ...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1152/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159443569
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Below method will be used to update the child plan
+ * This will be used for updating expressions like join conditions,
+ * order by, project list etc.
+ * @param plan
+ * child plan
+ * @return updated plan
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ /**
+ * Below method will be used to update the sort expression
+ * @param sortExp
+ * sort order expression in query
+ * @return updated sort expression
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ SortOrder(newExpression, order.direction)
+ } else {
+ SortOrder(attr, order.direction)
}
- carbonTable
+ }
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // logical relation
- case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
- // case for handling aggregation, order by
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- }
- carbonTable
- // case for handling aggregation, order by and filter
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+ /**
+ * Below method will be used to update the expression like group by expression
+ * @param expressions
+ * sequence of expression like group by
+ * @return updated expressions
+ */
+ def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
+ val newExp = expressions map { expression =>
+ expression transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExpression
+ } else {
+ attr
}
- if (isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ }
+ }
+ newExp
+ }
+
+ /**
+ * Below method will be used to update the named expression like aggregate expression
+ * @param namedExpression
+ * any named expression like aggregate expression
+ * @return updated named expression
+ */
+ def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
+ namedExpression map {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if(childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ case alias@Alias(exp, name) =>
+ val newExp = exp.transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if (childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ }
+ Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
+ }
+ }
+
+ /**
+ * Below method will be used to update the condition expression
+ * @param conditionExp
+ * any condition expression join condition or filter condition
+ * @return updated condition expression
+ */
+ def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
+ if (conditionExp.isDefined) {
+ val filterExp = conditionExp.get
+ Some(filterExp.transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if(childExp.isDefined) {
+ childExp.get._2
+ } else {
+ attr
}
- carbonTable
- // case for handling aggregation with order by when only projection column exits
- case Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
+ })
+ } else {
+ conditionExp
+ }
+ }
+
+ /**
+ * Below method will be used to validate and transform the main table plan to child table plan
+ * rules for transforming is as below.
+ * 1. Grouping expression rules
+ * 1.1 Change the parent attribute references of the grouping expression
+ * to child attribute references
+ *
+ * 2. Aggregate expression rules
+ * 2.1 Change the parent attribute references of the aggregate expression to
+ * child attribute references
+ * 2.2 Change a count AggregateExpression to Sum, as the count
+ * is already calculated, so in case of the aggregate table
+ * we need to apply sum to get the total count
+ * 2.3 In case of the average aggregate function, select 2 columns from the aggregate
+ * table with aggregations sum and count, then add divide(sum(sum column), sum(count column)).
+ * Note: During aggregate table creation, for average the table is created with two columns,
+ * one for sum(column) and one for count(column), to support rollup
+ * 3. Filter Expression rules.
+ * 3.1 Updated filter expression attributes with child table attributes
+ * 4. Update the Parent Logical relation with child Logical relation
+ * 5. timeseries function
+ * 5.1 validate parent table has timeseries datamap
+ * 5.2 timeseries function is valid function or not
+ *
+ * @param logicalPlan
+ * parent logical plan
+ * @return transformed plan
+ */
+ def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
+ val updatedPlan = logicalPlan.transform {
+ // case for aggregation query
+ case agg@Aggregate(grExp,
+ aggExp,
+ child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
--- End diff --
Indentation is wrong
---
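The count-to-sum and average-to-sum/count rewrite rules described in the scaladoc above can be illustrated with a small self-contained sketch. The case classes below are simplified stand-ins for Spark's Catalyst expression nodes, not the real Catalyst API, and the `_sum`/`_count` column-name suffixes are an assumption made only for illustration:

```scala
// Simplified model of the pre-aggregate rewrite rules (not the real Catalyst API).
sealed trait Expr
case class Col(name: String) extends Expr
case class Count(child: Expr) extends Expr
case class Sum(child: Expr) extends Expr
case class Avg(child: Expr) extends Expr
case class Divide(left: Expr, right: Expr) extends Expr

// Rewrite a query-side aggregate over the main table into one over the
// pre-aggregate table, where count(c) is pre-computed in a c_count column and
// avg(c) is stored as the column pair (c_sum, c_count).
def rewrite(agg: Expr): Expr = agg match {
  // counts are already materialised, so summing the partial counts gives the total
  case Count(Col(c)) => Sum(Col(s"${c}_count"))
  // average = sum of the partial sums divided by sum of the partial counts
  case Avg(Col(c))   => Divide(Sum(Col(s"${c}_sum")), Sum(Col(s"${c}_count")))
  case other         => other
}
```

For example, `rewrite(Avg(Col("price")))` yields `Divide(Sum(Col("price_sum")), Sum(Col("price_count")))`, mirroring rules 2.2 and 2.3 above.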
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2421/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2625/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159438073
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Below method will be used to update the child plan
+ * This will be used for updating expression like join condition,
+ * order by, project list etc
+ * @param plan
+ * child plan
+ * @return updated plan
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ /**
+ * Below method will be used to update the sort expression
+ * @param sortExp
+ * sort order expression in query
+ * @return updated sort expression
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
--- End diff --
Better to do a transform instead of a match, because in case an Alias comes in, the match cannot work
---
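The reviewer's point can be sketched with a minimal expression tree (the case classes are hypothetical stand-ins for Catalyst's `AttributeReference` and `Alias`): a top-level `match` on `order.child` misses an attribute wrapped in an alias, while a recursive `transform` rewrites it wherever it occurs:

```scala
// Minimal stand-in for a Catalyst-style expression tree (not the real Spark API).
sealed trait Expr {
  // apply the rule bottom-up through the tree, like Catalyst's transform
  def transform(rule: PartialFunction[Expr, Expr]): Expr = {
    val withNewChildren = this match {
      case Alias(child, name) => Alias(child.transform(rule), name)
      case leaf               => leaf
    }
    rule.applyOrElse(withNewChildren, identity[Expr])
  }
}
case class Attr(name: String) extends Expr
case class Alias(child: Expr, name: String) extends Expr

val rename: PartialFunction[Expr, Expr] = { case Attr("a") => Attr("a_child") }

// a plain match only sees the outer node, so an aliased attribute stays untouched
def byMatch(e: Expr): Expr = e match {
  case a: Attr => rename.applyOrElse(a, identity[Expr])
  case other   => other
}
```

`byMatch(Alias(Attr("a"), "x"))` leaves the tree unchanged, whereas `Alias(Attr("a"), "x").transform(rename)` rewrites the inner attribute.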
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2733/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159438250
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Below method will be used to update the child plan
+ * This will be used for updating expression like join condition,
+ * order by, project list etc
+ * @param plan
+ * child plan
+ * @return updated plan
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ /**
+ * Below method will be used to update the sort expression
+ * @param sortExp
+ * sort order expression in query
+ * @return updated sort expression
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
--- End diff --
Better use `match { case }` instead of if else
---
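The `isDefined`/`get` pattern the reviewer flags can be expressed as an `Option` match. A hedged sketch over an illustrative name mapping (the mapping and names are not from the PR):

```scala
// Handle the Option returned by find with a pattern match rather than
// isDefined/get, as suggested in the review.
def replaceName(mapping: Seq[(String, String)], name: String): String =
  mapping.find { case (parent, _) => parent == name } match {
    case Some((_, childName)) => childName  // a mapping exists: use the child name
    case None                 => name       // no mapping: keep the original
  }
```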
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2566/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2735/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159457280
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Below method will be used to update the child plan
+ * This will be used for updating expression like join condition,
+ * order by, project list etc
+ * @param plan
+ * child plan
+ * @return updated plan
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ /**
+ * Below method will be used to update the sort expression
+ * @param sortExp
+ * sort order expression in query
+ * @return updated sort expression
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
--- End diff --
childExp.qualifier will not have the table alias name; in case of a join we need the table alias name
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][Pre-Aggregate] Expression s...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r158934363
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -126,16 +127,17 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
aggregateExp,
carbonTable,
tableName,
- list)
- carbonTable
+ list,
+ aggregateExpressions)
+ (carbonTable, logicalRelation)
// below case for handling filter query
// When plan has grouping expression, aggregate expression
// filter expression
case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
+ aggregateExp,
--- End diff --
unnecessary change
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159457864
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -806,67 +824,105 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
val updatedAggExp = aggregateExpressions.map {
--- End diff --
It seems there is a lot of duplicate code; better to do a transform and use a single `case attr: AttributeReference` to handle all cases here
---
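The duplicated attribute-replacement blocks in `updateSortExpression`, `updateExpression` and `updateNamedExpression` could be factored into one shared helper, as the reviewer suggests. A hypothetical sketch (with `Attr` standing in for `AttributeReference`):

```scala
// Hypothetical refactor: one helper holding the "replace attribute if a
// mapping exists" logic, reused by every update* method.
case class Attr(name: String)

def replaceAttr(attr: Attr, updated: Seq[(Attr, Attr)]): Attr =
  updated.find { case (parent, _) => parent == attr } match {
    case Some((_, child)) => child  // mapped: substitute the child attribute
    case None             => attr   // unmapped: keep the parent attribute
  }

// each caller then reduces to mapping the helper over its expressions
def updateAttrs(attrs: Seq[Attr], updated: Seq[(Attr, Attr)]): Seq[Attr] =
  attrs.map(replaceAttr(_, updated))
```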
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1338/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1283/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2451/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2674/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2562/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2508/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159058689
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -294,12 +149,30 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
// select the list of valid child tables
val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
+ // if the query has only aggregate expressions then the selected data map list will be empty;
+ // in that case validate all the child data maps, otherwise validate the selected data maps
+ var selectedAggMaps = if (isProjectionColumnPresent) {
+ selectedDataMapSchemas
+ } else {
+ carbonTable.getTableInfo.getDataMapSchemaList
+ }
+ val aggExpLogicalPlans = aggregateExpressions.map { queryAggExp =>
+ getLogicalPlanFromAggExp(queryAggExp,
+ carbonTable.getTableName,
+ carbonTable.getDatabaseName, logicalRelation)
+ }.toSeq
+ if(aggregateExpressions.size > 0 && selectedAggMaps.size > 0) {
--- End diff --
add comments
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728
retest this please
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1197/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159056348
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
}
}
+ /**
+ * Below method will be used to validate the logical plan
+ * and get all the details needed to select the proper aggregate table
+ * @param logicalPlan
+ * actual query logical plan
+ * @param list
+ * list of projection column present in plan
+ * @param qAggExprs
+ * list of aggregate expression
+ * @return whether the plan is valid for transformation, the parent table, the parent logical relation
+ */
+ def validatePlanAndGetFields(logicalPlan: LogicalPlan,
+ list: scala.collection.mutable.HashSet[QueryColumn],
+ qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): (Boolean,
+ CarbonTable, LogicalRelation) = {
+ var isValidPlan = false
+ var pTable: CarbonTable = null
+ var qLRelation: LogicalRelation = null
+ logicalPlan.transform {
--- End diff --
It seems Join condition of two plans is not handled here. Please handle Join, Union cases also using recursion.
---
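The reviewer's suggestion to handle Join and Union via recursion can be sketched as a small validation over a plan tree. The case classes below are simplified stand-ins for Catalyst logical plan nodes, not the real Spark API:

```scala
// Validate a plan tree recursively so that Join and Union nodes are descended
// into and every leaf relation is checked (simplified model, not Spark's API).
sealed trait Plan
case class Relation(table: String, hasAggDataMap: Boolean) extends Plan
case class Join(left: Plan, right: Plan) extends Plan
case class Union(children: Seq[Plan]) extends Plan

def isValidPlan(plan: Plan): Boolean = plan match {
  case Relation(_, hasAgg) => hasAgg                            // leaf: relation with an aggregate datamap
  case Join(l, r)          => isValidPlan(l) && isValidPlan(r)  // recurse into both join sides
  case Union(cs)           => cs.forall(isValidPlan)            // recurse into all union children
}
```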
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2612/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on the issue:
https://github.com/apache/carbondata/pull/1728
retest this please
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1377/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728
LGTM
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2722/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159053493
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
}
}
+ /**
+ * Below method will be used to validate the logical plan
+ * and get all the details needed to select the proper aggregate table
+ * @param logicalPlan
+ * actual query logical plan
+ * @param list
+ * list of projection column present in plan
+ * @param qAggExprs
+ * list of aggregate expression
+ * @return whether the plan is valid for transformation, the parent table, the parent logical relation
+ */
+ def validatePlanAndGetFields(logicalPlan: LogicalPlan,
+ list: scala.collection.mutable.HashSet[QueryColumn],
+ qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): (Boolean,
+ CarbonTable, LogicalRelation) = {
+ var isValidPlan = false
+ var pTable: CarbonTable = null
+ var qLRelation: LogicalRelation = null
+ logicalPlan.transform {
+ // to handle filter expression
+ case filter@Filter(filterExp,
+ CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
+ // only carbon query plan is supported checking whether logical relation is
+ // is for carbon
+ if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
+ qLRelation = logicalRelation
+ pTable = getCarbonTableAndTableName(logicalRelation)
+ // getting the columns from filter expression
+ if (!CarbonReflectionUtils.hasPredicateSubquery(filterExp)) {
+ isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, pTable)
+ }
+ filter
+ // to handle aggregate expression
+ case agg@Aggregate(groupingExp,
+ aggregateExp,
+ CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
+ // only carbon query plan is supported checking whether logical relation is
+ // is for carbon
+ if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
+ qLRelation = logicalRelation
+ pTable = getCarbonTableAndTableName(logicalRelation)
+ isValidPlan = extractQueryColumnsFromAggExpression(
+ groupingExp,
+ aggregateExp,
+ pTable,
+ list,
+ qAggExprs)
+ agg
+ // to handle aggregate expression with filter
+ case agg@Aggregate(grExp, aggExp, filter@Filter(_, _)) =>
+ val out = validatePlanAndGetFields(filter, list, qAggExprs)
+ pTable = out._2
+ qLRelation = out._3
+ isValidPlan = out._1
+ if (isValidPlan) {
+ isValidPlan = extractQueryColumnsFromAggExpression(grExp, aggExp, pTable, list, qAggExprs)
+ }
+ agg
+ // to handle projection with order by
+ case proj@Project(projectList, sort@Sort(_, _, _)) =>
+ val out = validatePlanAndGetFields(sort, list, qAggExprs)
+ pTable = out._2
+ qLRelation = out._3
+ isValidPlan = out._1
+ if(isValidPlan) {
+ list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable)
+ }
+ proj
+ // to handle projection over aggregation
+ case proj@Project(projectList, agg@Aggregate(_, _, _)) =>
+ val out = validatePlanAndGetFields(agg, list, qAggExprs)
+ pTable = out._2
+ qLRelation = out._3
+ isValidPlan = out._1
+ if(isValidPlan) {
+ list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable)
+ }
+ proj
+ // case for handling aggregation with order by when only projection columns exist
+ case sort@Sort(sortOrders, _, agg@Aggregate(_, _, _)) =>
+ val out = validatePlanAndGetFields(agg, list, qAggExprs)
+ pTable = out._2
+ qLRelation = out._3
+ isValidPlan = out._1
+ if(isValidPlan) {
+ list ++
+ extractQueryColumnForOrderBy(None, sortOrders, pTable)
+ }
+ sort
+ }
+ (isValidPlan, pTable, qLRelation)
+ }
+
+ /**
+ * Below method will be used to validate aggregate expression with the data map
+ * and will return the selected valid data maps
+ * @param selectedDataMap
+ * list of data maps
+ * @param carbonTable
+ * parent carbon table
+ * @param logicalRelation
+ * parent logical relation
+ * @param queryAggExpLogicalPlans
+ * logical plans of the query's aggregate expressions
+ * @return valid data maps
+ */
+ def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema],
+ carbonTable: CarbonTable,
+ logicalRelation: LogicalRelation,
+ queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = {
+ def validateDataMap(dataMap: DataMapSchema,
+ aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = {
+ val schemaAggLogicalPlan = getLogicalPlanForAggregateExpression(dataMap,
+ carbonTable,
+ logicalRelation)
+ aggExpLogicalPlans.forall{
+ p => schemaAggLogicalPlan.exists(m => p.sameResult(m._1))
+ }
+ }
+ val selectedDataMapSchema = selectedDataMap.collect {
+ case dataMap if validateDataMap(dataMap, queryAggExpLogicalPlans) =>
+ dataMap
+ }
+ selectedDataMapSchema
+ }
+
+ /**
+ * Below method will be used to update an expression's logical plan
+ * with the parent table's logical relation
+ * @param logicalPlan
+ * @param logicalRelation
+ * @return
+ */
+ def updateLogicalRelation(logicalPlan: LogicalPlan,
+ logicalRelation: LogicalRelation): LogicalPlan = {
+ logicalPlan transform {
+ case l: LogicalRelation =>
+ l.copy(relation = logicalRelation.relation)
+ }
+ }
+
+ /**
+ * Below method will be used to get the logical plan for each aggregate expression in the
+ * child data map along with its column schema mapping. If the mapping is already present
+ * it is reused; otherwise it is generated and stored in the aggregation data map
+ * @param selectedDataMap
+ * child data map
+ * @param carbonTable
+ * parent table
+ * @param logicalRelation
+ * logical relation of actual plan
+ * @return map of logical plan for each aggregate expression in child query and its column mapping
+ */
+ def getLogicalPlanForAggregateExpression(selectedDataMap: DataMapSchema, carbonTable: CarbonTable,
+ logicalRelation: LogicalRelation): Map[LogicalPlan, ColumnSchema] = {
+ val aggDataMapSchema = selectedDataMap.asInstanceOf[AggregationDataMapSchema]
+ // if column mapping is not present
+ if (null == aggDataMapSchema.getAggregateExpressionToColumnMapping) {
+ // add the preAGG UDF so that the PreAggregate rules are not applied again
+ val childDataMapQueryString = new CarbonSpark2SqlParser()
+ .addPreAggFunction(aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY"))
+ // get the logical plan
+ val aggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan
+ // getting all aggregate expression from query
+ val dataMapAggExp = getAggregateExpFromChildDataMap(aggPlan)
+ // in case of average the child table will have two columns (sum and count) stored in
+ // sequence, so for an average expression we need to get two columns for the mapping
+ var counter = 0
+ // sorting the columns based on schema ordinal so search will give proper result
+ val sortedColumnList = aggDataMapSchema.getChildSchema.getListOfColumns.asScala
+ .sortBy(_.getSchemaOrdinal)
+ val logicalPlanToColumnMapping = dataMapAggExp.map { aggExp =>
+ // for each aggregate expression get logical plan
+ val expLogicalPlan = getLogicalPlanFromAggExp(aggExp,
+ carbonTable.getTableName,
+ carbonTable.getDatabaseName, logicalRelation)
+ // check if aggregate expression is of type avg
+ // get the columns
+ var columnSchema = aggDataMapSchema
+ .getAggColumnBasedOnIndex(counter, sortedColumnList.asJava)
+ // increment the counter so that, when the next expression is processed, the
+ // search starts from that schema ordinal
+ counter = columnSchema.getSchemaOrdinal + 1
+ (expLogicalPlan, columnSchema)
+ }.toMap
+ // store the mapping in data map
+ aggDataMapSchema.setAggregateExpressionToColumnMapping(logicalPlanToColumnMapping.asJava)
+ // return the mapping
+ logicalPlanToColumnMapping
+ } else {
+ // if already present in data map then return the same
+ aggDataMapSchema.getAggregateExpressionToColumnMapping
+ .asInstanceOf[java.util.Map[LogicalPlan, ColumnSchema]].asScala.toMap
+ }
+ }
+
+
+ /**
+ * Below method will be used to get the logical plan from aggregate expression
+ * @param aggExp
+ * aggregate expression
+ * @param tableName
+ * parent table name
+ * @param databaseName
+ * database name
+ * @param logicalRelation
+ * logical relation
+ * @return logical plan
+ */
+ def getLogicalPlanFromAggExp(aggExp: AggregateExpression,
+ tableName: String,
+ databaseName: String,
+ logicalRelation: LogicalRelation): LogicalPlan = {
+ // adding the preAGG UDF so the pre-aggregate data loading rule and query rule will not
+ // be applied
+ val query = new CarbonSpark2SqlParser()
--- End diff --
Don't create parser every time, pass from caller
---
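Editor's note: the `validatePlanAndGetFields` method quoted above leans on Catalyst's bottom-up `transform`, matching supported node shapes and collecting query columns into a mutable set as a side effect while returning each node unchanged. The following is a minimal, Spark-free model of that pattern (node shapes and names are invented for illustration, written in Python for brevity; it is a sketch, not the PR's implementation):

```python
# Toy plan tree with a Catalyst-style bottom-up transform. The rule gathers
# columns into a set as a side effect and returns every node unchanged,
# mirroring how validatePlanAndGetFields fills its HashSet[QueryColumn].
from dataclasses import dataclass
from typing import Callable, Tuple

@dataclass
class Relation:          # stand-in for a carbon LogicalRelation
    table: str

@dataclass
class Filter:            # stand-in for Filter(filterExp, child)
    condition: str
    child: object

@dataclass
class Aggregate:         # stand-in for Aggregate(groupingExp, aggExp, child)
    group_by: Tuple[str, ...]
    child: object

def transform(plan, rule: Callable[[object], object]):
    """Apply `rule` to every node, children first (like transformUp)."""
    if isinstance(plan, Filter):
        plan = Filter(plan.condition, transform(plan.child, rule))
    elif isinstance(plan, Aggregate):
        plan = Aggregate(plan.group_by, transform(plan.child, rule))
    return rule(plan)

def collect_columns(plan) -> set:
    cols = set()
    def rule(node):
        if isinstance(node, Filter):
            cols.add(node.condition)     # gather, return node unchanged
        elif isinstance(node, Aggregate):
            cols.update(node.group_by)
        return node
    transform(plan, rule)
    return cols
```

The side-effecting rule is why the quoted Scala code can both validate the plan and accumulate `QueryColumn`s in a single pass.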
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159458812
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Below method will be used to update the child plan
+ * This will be used for updating expressions like join conditions,
+ * order by and project lists
+ * @param plan
+ * child plan
+ * @return updated plan
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ /**
+ * Below method will be used to update the sort expression
+ * @param sortExp
+ * sort order expression in query
+ * @return updated sort expression
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ SortOrder(newExpression, order.direction)
+ } else {
+ SortOrder(attr, order.direction)
}
- carbonTable
+ }
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // logical relation
- case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
- // case for handling aggregation, order by
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- }
- carbonTable
- // case for handling aggregation, order by and filter
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+ /**
+ * Below method will be used to update expressions such as group by expressions
+ * @param expressions
+ * sequence of expressions, e.g. group by
+ * @return updated expressions
+ */
+ def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
+ val newExp = expressions map { expression =>
+ expression transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExpression
+ } else {
+ attr
}
- if (isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ }
+ }
+ newExp
+ }
+
+ /**
+ * Below method will be used to update named expressions like aggregate expressions
+ * @param namedExpression
+ * any named expression like aggregate expression
+ * @return updated named expression
+ */
+ def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
+ namedExpression map {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if(childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ case alias@Alias(exp, name) =>
+ val newExp = exp.transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if (childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ }
+ Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
+ }
+ }
+
+ /**
+ * Below method will be used to update a condition expression
+ * @param conditionExp
+ * any condition expression, e.g. a join or filter condition
+ * @return updated condition expression
+ */
+ def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
+ if (conditionExp.isDefined) {
+ val filterExp = conditionExp.get
+ Some(filterExp.transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if(childExp.isDefined) {
+ childExp.get._2
+ } else {
+ attr
}
- carbonTable
- // case for handling aggregation with order by when only projection column exits
- case Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
+ })
+ } else {
+ conditionExp
+ }
+ }
+
+ /**
+ * Below method will be used to validate and transform the main table plan to child table plan
+ * rules for transforming are as below.
+ * 1. Grouping expression rules
+ * 1.1 Change the parent attribute references of the grouping expression
+ * to child attribute references
+ *
+ * 2. Aggregate expression rules
+ * 2.1 Change the parent attribute references of the aggregate expression to
+ * child attribute references
+ * 2.2 Change a count AggregateExpression to Sum, as the count
+ * is already calculated, so in case of the aggregate table
+ * we need to apply sum to get the count
+ * 2.3 In case of the average aggregate function, select 2 columns from the aggregate
+ * table with aggregation sum and count, then add divide(sum(column with sum),
+ * sum(column with count)).
+ * Note: during aggregate table creation, for average the table is created with two
+ * columns, one for sum(column) and one for count(column), to support rollup
+ * 3. Filter expression rules
+ * 3.1 Update filter expression attributes with child table attributes
+ * 4. Update the parent logical relation with the child logical relation
+ * 5. Timeseries function
+ * 5.1 Validate that the parent table has a timeseries datamap
+ * 5.2 Validate that the timeseries function is a valid function
+ *
+ * @param logicalPlan
+ * parent logical plan
+ * @return transformed plan
+ */
+ def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
+ val updatedPlan = logicalPlan.transform {
+ // case for aggregation query
+ case agg@Aggregate(grExp,
+ aggExp,
+ child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
+ if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
+ val carbonTable = getCarbonTable(logicalRelation)
+ val list = scala.collection.mutable.HashSet.empty[QueryColumn]
+ val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
+ val isValidPlan = extractQueryColumnsFromAggExpression(
+ grExp,
+ aggExp,
+ carbonTable,
+ list,
+ aggregateExpressions)
+ if(isValidPlan) {
+ val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
+ aggregateExpressions,
carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
- carbonTable = carbonTable,
- tableName = tableName)
+ logicalRelation)
+ if(null != aggDataMapSchema && null!= childPlan) {
+ val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
+ val (updatedGroupExp, updatedAggExp, newChild, None) =
+ getUpdatedExpressions(grExp,
+ aggExp,
+ child,
+ None,
+ aggDataMapSchema,
+ attributes,
+ childPlan,
+ carbonTable,
+ logicalRelation)
+ Aggregate(updatedGroupExp,
+ updatedAggExp,
+ newChild)
+ } else {
+ agg
}
- carbonTable
- // case for handling aggregation with order by and filter when only projection column exits
- case Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
+ } else {
+ agg
+ }
+ // case of handling aggregation query with filter
+ case agg@Aggregate(grExp,
--- End diff --
How about with Sort ?
---
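Editor's note: the `updateSortExpression` method quoted in this message rewrites each `SortOrder` whose attribute has a child-table counterpart and passes the rest through unchanged. A plain-Python model of that substitution (attribute and mapping shapes are invented; the real code copies full `AttributeReference`s with exprId and metadata):

```python
# Toy version of updateSortExpression: any sort attribute with a child-table
# counterpart in `mapping` is replaced; attributes without one pass through.
def update_sort_orders(orders, mapping):
    """orders: list of (attribute, direction); mapping: parent attr -> child attr."""
    return [(mapping.get(attr, attr), direction) for attr, direction in orders]
```

The pass-through branch matters: a sort column that was never remapped (e.g. one resolved directly against the child table) must survive the rewrite untouched.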
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159378009
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
@@ -0,0 +1,105 @@
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll: Unit = {
+ sql("drop table if exists mainTable")
+ sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+ sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
+ sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
+ sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
+ sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
+ sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+ }
+
+ test("test pre agg create table with expression 1") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
+ }
+
+ test("test pre agg create table with expression 2") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
+ }
+
+ test("test pre agg create table with expression 3") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
+ }
+
+ test("test pre agg create table with expression 4") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
+ }
+
+ test("test pre agg create table with expression 5") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum")
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum")
+ }
+
+ test("test pre agg table selection with expression 1") {
+ val df = sql("select name as NewName, count(age) as sum from mainTable group by name order by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+ }
+
+
+ test("test pre agg table selection with expression 2") {
+ val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum from mainTable group by name order by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+ }
+
+ test("test pre agg table selection with expression 3") {
+ val df = sql("select sum(case when age=35 then id else 0 end) from maintable")
+ checkAnswer(df, Seq(Row(6.0)))
+ }
+
+ test("test pre agg table selection with expression 4") {
+ val df = sql("select sum(case when age=27 then id else 0 end) from maintable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
+ checkAnswer(df, Seq(Row(2.0)))
+ }
+
+ test("test pre agg table selection with expression 5") {
+ val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from maintable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
+ checkAnswer(df, Seq(Row(2.0,6.0)))
+ }
+
--- End diff --
add TestPreAggregateWithSubQuery
---
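Editor's note: the tests above assert which pre-aggregate table (`maintable_agg0` ... `maintable_agg4`) a given query resolves to. In the PR that selection is done by `validateAggregateExpression`, which keeps a data map only if every aggregate expression in the query has a semantically equal counterpart in the data map (`plan.sameResult`). A toy stand-in, with string equality replacing `sameResult` and invented names:

```python
# Toy model of validateAggregateExpression's data map selection: a data map
# survives only if every aggregate expression in the query has an equal
# counterpart in its select list.
def select_datamaps(query_aggs, datamaps):
    """datamaps: list of (name, list-of-aggregate-expression-strings)."""
    return [name for name, dm_aggs in datamaps
            if all(q in dm_aggs for q in query_aggs)]
```

This matches the test expectations above: a query using only the `age=27` expression can be served by either agg3 or agg4, while a query using both expressions can only be served by agg4.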
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159438754
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Below method will be used to update the child plan
+ * This will be used for updating expressions like join conditions,
+ * order by and project lists
+ * @param plan
+ * child plan
+ * @return updated plan
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ /**
+ * Below method will be used to update the sort expression
+ * @param sortExp
+ * sort order expression in query
+ * @return updated sort expression
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
--- End diff --
why you use `attr.qualifier ` intead of `childExp.qualifier`
---
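Editor's note: the reviewer's question concerns which qualifier the rewritten attribute keeps. In the quoted code, name, data type, nullability and metadata come from the child-table attribute, while the qualifier is taken from the query-side `attr`. A plain-Python model of that choice (field set simplified; hypothetical names):

```python
# Model of the attribute substitution under review: name and data type come
# from the child-table attribute, while the qualifier is kept from the
# query-side attribute (attr.qualifier in the quoted Scala).
from dataclasses import dataclass
from typing import Optional

@dataclass
class Attr:
    name: str
    data_type: str
    qualifier: Optional[str]

def substitute(attr: Attr, mapping: dict) -> Attr:
    child = mapping.get(attr.name)
    if child is None:
        return attr
    return Attr(child.name, child.data_type, attr.qualifier)  # keep query-side qualifier
```

Keeping the query-side qualifier preserves how the attribute was referenced in the original query text; using the child's qualifier instead (the reviewer's suggestion) would tie the attribute to the child table. Which is correct depends on how the rewritten plan is re-resolved.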
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159442284
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Below method will be used to update the child plan
+ * This will be used for updating expressions like join conditions,
+ * order by and project lists
+ * @param plan
+ * child plan
+ * @return updated plan
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ /**
+ * Below method will be used to update the sort expression
+ * @param sortExp
+ * sort order expression in query
+ * @return updated sort expression
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ SortOrder(newExpression, order.direction)
+ } else {
+ SortOrder(attr, order.direction)
}
- carbonTable
+ }
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // logical relation
- case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
- // only carbon query plan is supported checking whether logical relation is
- // is for carbon
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
- // case for handling aggregation, order by
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- }
- carbonTable
- // case for handling aggregation, order by and filter
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
- if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+ /**
+ * Below method will be used to update the expression like group by expression
+ * @param expressions
+ * sequence of expression like group by
+ * @return updated expressions
+ */
+ def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
+ val newExp = expressions map { expression =>
+ expression transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExpression
+ } else {
+ attr
}
- if (isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+ }
+ }
+ newExp
+ }
+
+ /**
+ * Below method will be used to update the named expression like aggregate expression
+ * @param namedExpression
+ * any named expression like aggregate expression
+ * @return updated named expression
+ */
+ def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
+ namedExpression map {
+ case attr: AttributeReference =>
--- End diff --
Use transform inside map; then you don't need to handle Alias separately.
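The suggestion can be sketched with a minimal, self-contained expression tree. The types below (Expr, Attr, Alias) are hypothetical stand-ins for Catalyst's Expression, AttributeReference, and Alias, and the rewrite rule is a placeholder: running transform inside map visits attribute nodes wherever they occur, including under an Alias, so no separate Alias case is needed.

```scala
// Minimal sketch, not Catalyst itself: Expr/Attr/Alias are hypothetical
// stand-ins for Expression/AttributeReference/Alias.
sealed trait Expr {
  // Recursive rewrite, loosely analogous to Catalyst's Expression.transform:
  // rewrite the children first, then apply the rule to the resulting node.
  def transform(rule: PartialFunction[Expr, Expr]): Expr = {
    val newNode = this match {
      case Alias(child, name) => Alias(child.transform(rule), name)
      case leaf               => leaf
    }
    if (rule.isDefinedAt(newNode)) rule(newNode) else newNode
  }
}
case class Attr(name: String) extends Expr
case class Alias(child: Expr, name: String) extends Expr

object UpdateNamedExpressionsSketch {
  // One pass handles both bare attributes and attributes nested in an Alias:
  // transform inside map, with a single rule matching Attr.
  def updateNamedExpressions(exprs: Seq[Expr],
      mapping: Map[String, String]): Seq[Expr] =
    exprs map { e =>
      e.transform { case Attr(n) if mapping.contains(n) => Attr(mapping(n)) }
    }

  def main(args: Array[String]): Unit = {
    val exprs = Seq(Attr("age"), Alias(Attr("age"), "a"))
    println(updateNamedExpressions(exprs, Map("age" -> "maintable_age")))
  }
}
```

With this shape, the updateNamedExpression method in the diff collapses to a single map-plus-transform, since the Alias branch is reached by the recursion rather than by an explicit case.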
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1202/
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159058395
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -933,121 +1143,122 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
* child data map schema
* @param attributes
* child logical relation
+ * @param expLogicalPlanToColumnSchemaMapping
+ * expression logical plan to data map column mapping
+ * @param parentTable
+ * parent carbon table
+ * @param logicalRelation
+ * logical relation
* @return updated expression
*/
def getUpdatedAggregateExpressionForChild(aggExp: AggregateExpression,
- dataMapSchema: DataMapSchema,
- attributes: Seq[AttributeReference]):
+ dataMapSchema: AggregationDataMapSchema,
+ attributes: Seq[AttributeReference],
+ expLogicalPlanToColumnSchemaMapping: Option[Map[LogicalPlan, ColumnSchema]],
+ parentTable: CarbonTable,
+ logicalRelation: LogicalRelation):
Expression = {
+ val updatedAggExp = getUpdateAggregateExpressions(aggExp)
--- End diff --
Add code comment here
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159443128
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
+
+ /**
+ * Below method will be used to update the named expression like aggregate expression
+ * @param namedExpression
+ * any named expression like aggregate expression
+ * @return updated named expression
+ */
+ def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
+ namedExpression map {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if(childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ case alias@Alias(exp, name) =>
+ val newExp = exp.transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if (childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ }
+ Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
+ }
+ }
+
+ /**
+ * Below method will be used to update the condition expression
+ * @param conditionExp
+ * any condition expression join condition or filter condition
+ * @return updated condition expression
+ */
+ def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
--- End diff --
Here you can also use the updateExpression method; there is no need for this separate method.
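The point can be sketched as a simple delegation. The types below are simplified, hypothetical stand-ins (the real method operates on Catalyst Expressions, and the rewrite body is a placeholder): the optional filter/join condition is just one more expression, so it can be routed through updateExpression via Option.map.

```scala
object ConditionDelegationSketch {
  type Expression = String // hypothetical stand-in for Catalyst's Expression

  // Placeholder for the existing attribute-rewrite logic in updateExpression.
  def updateExpression(expressions: Seq[Expression]): Seq[Expression] =
    expressions.map(_.replace("age", "maintable_age"))

  // The reviewer's suggestion: reuse updateExpression for the optional
  // filter/join condition instead of maintaining a duplicate implementation.
  def updateConditionExpression(
      conditionExp: Option[Expression]): Option[Expression] =
    conditionExp.map(e => updateExpression(Seq(e)).head)

  def main(args: Array[String]): Unit =
    println(updateConditionExpression(Some("age = 27")))
}
```

A None condition (e.g. a join with no explicit condition) passes through unchanged, so no extra null handling is needed.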
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2734/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2585/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1219/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1349/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on the issue:
https://github.com/apache/carbondata/pull/1728
Retest this please
---
[GitHub] carbondata pull request #1728: [CARBONDATA-1926][Pre-Aggregate] Expression s...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r158934260
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
@@ -0,0 +1,105 @@
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll: Unit = {
+ sql("drop table if exists mainTable")
+ sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+ sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
+ sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
+ sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
+ sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
+ sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+ }
+
+ test("test pre agg create table with expression 1") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
+ }
+
+ test("test pre agg create table with expression 2") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
+ }
+
+ test("test pre agg create table with expression 3") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
+ }
+
+ test("test pre agg create table with expression 4") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
+ }
+
+ test("test pre agg create table with expression 5") {
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum")
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum")
+ }
+
+ test("test pre agg table selection with expression 1") {
+ val df = sql("select name as NewName, count(age) as sum from mainTable group by name order by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+ }
+
+
+ test("test pre agg table selection with expression 2") {
+ val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum from mainTable group by name order by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+ }
+
+ test("test pre agg table selection with expression 3") {
+ val df = sql("select sum(case when age=35 then id else 0 end) from maintable")
+ checkAnswer(df, Seq(Row(6.0)))
+ }
+
+ test("test pre agg table selection with expression 4") {
+ val df = sql("select sum(case when age=27 then id else 0 end) from maintable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
+ checkAnswer(df, Seq(Row(2.0)))
+ }
+
+ test("test pre agg table selection with expression 5") {
+ val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from maintable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
+ checkAnswer(df, Seq(Row(2.0,6.0)))
+ }
+
--- End diff --
Add a test case for a subquery as well.
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2732/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1326/
---
[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...
Posted by sraghunandan <gi...@git.apache.org>.
Github user sraghunandan commented on the issue:
https://github.com/apache/carbondata/pull/1728
retest sdv please
---