Posted to issues@carbondata.apache.org by kumarvishal09 <gi...@git.apache.org> on 2017/12/27 15:50:18 UTC

[GitHub] carbondata pull request #1728: [CARBONDATA-1926] Expression support inside a...

GitHub user kumarvishal09 opened a pull request:

    https://github.com/apache/carbondata/pull/1728

    [CARBONDATA-1926] Expression support inside aggregate function for Query

    PR to support transforming the query plan to use a pre-aggregate table when an aggregate function in the query contains an expression
    
     - [ ] Any interfaces changed?
       NA
     - [ ] Any backward compatibility impacted?
       NA
     - [ ] Document update required?
       NA
     - [ ] Testing done
       Added UTs to validate pre-aggregate table selection and data correctness
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
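
    The change matches a query's aggregate expression against the expression a
    pre-aggregate datamap was defined with. A minimal Python sketch of that
    matching idea (the `normalize` helper and the lookup table are hypothetical
    illustrations of the concept, not CarbonData APIs; table names follow the
    PR's test suite):

    ```python
    def normalize(expr: str) -> str:
        # Canonical form for comparing a query aggregate expression with the
        # expression a pre-aggregate datamap was created with (hypothetical
        # approach: lowercase and collapse whitespace).
        return " ".join(expr.lower().split())

    # Datamap definitions keyed by their normalized aggregate expression.
    datamaps = {
        normalize("sum(case when age=35 then id else 0 end)"): "mainTable_agg1",
        normalize("count(age)"): "mainTable_agg0",
    }

    def select_pre_agg_table(query_agg: str):
        # A query can be served from a pre-aggregate table only if its
        # aggregate expression matches one the datamap already stores.
        return datamaps.get(normalize(query_agg))
    ```

    With this sketch, `select_pre_agg_table("SUM(CASE WHEN age=35 THEN id ELSE 0 END)")`
    resolves to `mainTable_agg1`, mirroring the plan transformation the PR performs.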
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kumarvishal09/incubator-carbondata ExpressionSupportInQuery

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1728.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1728
    
----
commit 361790641779d881e21f54d7c0f78c19fcf3490e
Author: kumarvishal <ku...@...>
Date:   2017-12-20T10:16:02Z

    Added code to support expression inside aggregate function

commit 3344f0fd3dd5cca31e26d2f1909fce600e8dd8e8
Author: kumarvishal <ku...@...>
Date:   2017-12-25T09:34:39Z

    Added code to support expression inside aggregateExpression in query

----


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159051463
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java ---
    @@ -62,6 +62,12 @@
        */
       private int ordinal = Integer.MAX_VALUE;
     
    +  /**
    --- End diff --
    
    Remove unnecessary attributes like parentColumnToAggregationsMapping


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2426/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2594/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1360/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2755/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2787/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159058055
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -1124,92 +1328,57 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
        *
        * @param carbonTable
        * parent table
    -   * @param aggFunctions
    -   * aggregation function
    -   * @param tableName
    -   * parent table name
    +   * @param aggExp
    +   * aggregate expression
        * @return list of fields
        */
       def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable,
    -      aggFunctions: AggregateFunction,
    -      tableName: String
    -  ): Seq[QueryColumn] = {
    +      aggExp: AggregateExpression): Seq[AggregateExpression] = {
         val changedDataType = true
    --- End diff --
    
    Remove unused variable


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][Pre-Aggregate] Expression s...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r158934173
  
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
    @@ -0,0 +1,105 @@
    +package org.apache.carbondata.integration.spark.testsuite.preaggregate
    +
    +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.hive.CarbonRelation
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
    +
    +  override def beforeAll: Unit = {
    +    sql("drop table if exists mainTable")
    +    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
    +    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
    +    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
    +    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
    +  }
    +
    +  test("test pre agg create table with expression 1") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
    +  }
    +
    +  test("test pre agg create table with expression 2") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 3") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 4") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 5") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum")
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum")
    +  }
    +
    +  test("test pre agg table selection with expression 1") {
    +    val df = sql("select name as NewName, count(age) as sum from mainTable group by name order by name")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
    +  }
    +
    +
    +  test("test pre agg table selection with expression 2") {
    +    val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum from mainTable group by name order by name")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
    +  }
    +
    +  test("test pre agg table selection with expression 3") {
    +    val df = sql("select sum(case when age=35 then id else 0 end) from maintable")
    +    checkAnswer(df, Seq(Row(6.0)))
    +  }
    +
    +  test("test pre agg table selection with expression 4") {
    +    val df = sql("select sum(case when age=27 then id else 0 end) from maintable")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
    +    checkAnswer(df, Seq(Row(2.0)))
    +  }
    +
    +  test("test pre agg table selection with expression 5") {
    +    val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from maintable")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
    +    checkAnswer(df, Seq(Row(2.0,6.0)))
    +  }
    +
    +  def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={
    --- End diff --
    
    add comment for this function
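
    The `preAggTableValidator` helper under review walks the analyzed plan and
    checks which table the query was rewritten to. A rough Python analogue of
    that traversal (the node classes are hypothetical stand-ins for Catalyst
    plan nodes, kept only to show the walk):

    ```python
    class PlanNode:
        def __init__(self, children=None):
            self.children = children or []

    class Relation(PlanNode):
        def __init__(self, table_name):
            super().__init__()
            self.table_name = table_name

    def pre_agg_table_validator(plan, expected_table):
        # Depth-first walk of the plan tree: succeed if any relation
        # references the expected pre-aggregate table.
        if isinstance(plan, Relation) and plan.table_name == expected_table:
            return True
        return any(pre_agg_table_validator(c, expected_table)
                   for c in plan.children)
    ```

    The Scala helper additionally unwraps `LogicalRelation`/`CarbonRelation`
    wrappers; the sketch keeps only the tree search that decides pass or fail.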


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2577/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2612/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159442746
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to updated the named expression like aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
    --- End diff --
    
    You can directly use `updateExpression`; there is no need for this method. `NamedExpression` is derived from `Expression`, so after updating you can type cast.
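
    The generic rewrite the reviewer suggests amounts to substituting each
    attribute reference with its pre-aggregate counterpart, keyed by reference
    identity. A minimal Python sketch of that substitution (class and field
    names are illustrative, not Catalyst's):

    ```python
    class Attr:
        def __init__(self, name, expr_id):
            self.name = name
            self.expr_id = expr_id

    def update_expression(exprs, updated):
        # Replace each attribute with the pre-aggregate table's column,
        # matching by expression id (akin to sameRef in the Scala rule);
        # attributes without a mapping pass through unchanged.
        return [updated.get(e.expr_id, e) for e in exprs]
    ```

    Because the same substitution applies to plain and named expressions alike,
    one helper can serve group-by lists, sort orders, and aggregate (named)
    expressions, with a cast back to the narrower type where needed — which is
    the reviewer's point.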


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2551/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159609322
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -1218,112 +1180,125 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
        * parent column name
        * @param carbonTable
        * parent carbon table
    -   * @param tableName
    -   * parent table name
    -   * @param aggFunction
    -   * aggregate function applied
    -   * @param dataType
    -   * data type of the column
    -   * @param isChangedDataType
    -   * is cast is applied on column
        * @param isFilterColumn
        * is filter is applied on column
        * @return query column
        */
       def getQueryColumn(columnName: String,
           carbonTable: CarbonTable,
    -      tableName: String,
    -      aggFunction: String = "",
    -      dataType: String = "",
    -      isChangedDataType: Boolean = false,
           isFilterColumn: Boolean = false,
           timeseriesFunction: String = ""): QueryColumn = {
    -    val columnSchema = carbonTable.getColumnByName(tableName, columnName.toLowerCase)
    +    val columnSchema = carbonTable.getColumnByName(carbonTable.getTableName, columnName.toLowerCase)
         if(null == columnSchema) {
           null
         } else {
    -      if (isChangedDataType) {
             new QueryColumn(columnSchema.getColumnSchema,
    -          columnSchema.getDataType.getName,
    -          aggFunction.toLowerCase,
               isFilterColumn,
               timeseriesFunction.toLowerCase)
    -      } else {
    -        new QueryColumn(columnSchema.getColumnSchema,
    -          CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType),
    -          aggFunction.toLowerCase,
    -          isFilterColumn,
    -          timeseriesFunction.toLowerCase)
    -      }
         }
       }
     }
     
    -object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
    -
    +/**
    + * Data loading rule class to validate and update the data loading query plan
    + * Validation rule:
    + * 1. update the avg aggregate expression with two columns sum and count
    + * 2. Remove duplicate sum and count expression if already there in plan
    + * @param sparkSession
    + * spark session
    + */
    +case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
    +  extends Rule[LogicalPlan] {
    +  lazy val parser = new CarbonSpark2SqlParser
       override def apply(plan: LogicalPlan): LogicalPlan = {
    -    val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
    +    val validExpressionsMap = scala.collection.mutable.HashSet.empty[AggExpToColumnMappingModel]
    +    val namedExpressionList = scala.collection.mutable.ListBuffer.empty[NamedExpression]
         plan transform {
    -      case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) =>
    +      case aggregate@Aggregate(_,
    +      aExp,
    +      CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        if validateAggregateExpressions(aExp) &&
    +        logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        val carbonTable = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
    +          .carbonTable
             aExp.foreach {
    -          case alias: Alias =>
    -            validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias)
    -          case _: UnresolvedAlias =>
    -          case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr)
    +          case attr: AttributeReference =>
    +              namedExpressionList += attr
    +            case alias@Alias(_: AttributeReference, _) =>
    +              namedExpressionList += alias
    +            case alias@Alias(aggExp: AggregateExpression, name) =>
    +              // get the updated expression for avg convert it to two expression
    +              // sum and count
    +              val expressions = PreAggregateUtil.getUpdateAggregateExpressions(aggExp)
    +              // if size is more than one then it was for average
    +              if(expressions.size > 1) {
    +                // get the logical plan for sum expression
    +                val logicalPlan_sum = PreAggregateUtil.getLogicalPlanFromAggExp(
    +                  expressions.head,
    +                  carbonTable.getTableName,
    +                  carbonTable.getDatabaseName,
    +                  logicalRelation,
    +                  sparkSession,
    +                  parser)
    +                // get the logical plan fro count expression
    +                val logicalPlan_count = PreAggregateUtil.getLogicalPlanFromAggExp(
    +                  expressions.last,
    +                  carbonTable.getTableName,
    +                  carbonTable.getDatabaseName,
    +                  logicalRelation,
    +                  sparkSession,
    +                  parser)
    +                // check with same expression already sum is present then do not add to
    +                // named expression list otherwise update the list and add it to set
    +                if (!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan_sum))) {
    +                  namedExpressionList +=
    +                  Alias(expressions.head, name + " _ sum")(NamedExpression.newExprId,
    +                    alias.qualifier,
    +                    Some(alias.metadata),
    +                    alias.isGenerated)
    +                  validExpressionsMap += AggExpToColumnMappingModel(logicalPlan_sum)
    +                }
    +                // check with same expression already count is present then do not add to
    +                // named expression list otherwise update the list and add it to set
    +                if (!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan_count))) {
    +                  namedExpressionList +=
    +                  Alias(expressions.last, name + " _ count")(NamedExpression.newExprId,
    +                    alias.qualifier,
    +                    Some(alias.metadata),
    +                    alias.isGenerated)
    +                  validExpressionsMap += AggExpToColumnMappingModel(logicalPlan_count)
    +                }
    +              } else {
    +                // get the logical plan for expression
    +                val logicalPlan = PreAggregateUtil.getLogicalPlanFromAggExp(
    +                  expressions.head,
    +                  carbonTable.getTableName,
    +                  carbonTable.getDatabaseName,
    +                  logicalRelation,
    +                  sparkSession,
    +                  parser)
    +                // check with same expression already  present then do not add to
    +                // named expression list otherwise update the list and add it to set
    +                if (!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan))) {
    +                  namedExpressionList+=alias
    +                  validExpressionsMap += AggExpToColumnMappingModel(logicalPlan)
    +                }
    +              }
    +            case alias@Alias(_: Expression, _) =>
    +              namedExpressionList += alias
             }
    -        aggregate.copy(aggregateExpressions = validExpressionsMap.values.toSeq)
    +        aggregate.copy(aggregateExpressions = namedExpressionList.toSeq)
           case plan: LogicalPlan => plan
         }
       }
     
    -    /**
    -     * This method will split the avg column into sum and count and will return a sequence of tuple
    -     * of unique name, alias
    -     *
    -     */
    -    private def validateAggregateFunctionAndGetAlias(alias: Alias): Seq[(String,
    -      NamedExpression)] = {
    -      alias match {
    -        case udf@Alias(_: ScalaUDF, name) =>
    -          Seq((name, udf))
    -        case alias@Alias(attrExpression: AggregateExpression, _) =>
    -          attrExpression.aggregateFunction match {
    -            case Sum(attr: AttributeReference) =>
    -              (attr.name + "_sum", alias) :: Nil
    -            case Sum(MatchCastExpression(attr: AttributeReference, _)) =>
    -              (attr.name + "_sum", alias) :: Nil
    -            case Count(Seq(attr: AttributeReference)) =>
    -              (attr.name + "_count", alias) :: Nil
    -            case Count(Seq(MatchCastExpression(attr: AttributeReference, _))) =>
    -              (attr.name + "_count", alias) :: Nil
    -            case Average(attr: AttributeReference) =>
    -              Seq((attr.name + "_sum", Alias(attrExpression.
    -                copy(aggregateFunction = Sum(attr),
    -                  resultId = NamedExpression.newExprId), attr.name + "_sum")()),
    -                (attr.name, Alias(attrExpression.
    -                  copy(aggregateFunction = Count(attr),
    -                    resultId = NamedExpression.newExprId), attr.name + "_count")()))
    -            case Average(cast@MatchCastExpression(attr: AttributeReference, _)) =>
    -              Seq((attr.name + "_sum", Alias(attrExpression.
    -                copy(aggregateFunction = Sum(cast),
    -                  resultId = NamedExpression.newExprId),
    -                attr.name + "_sum")()),
    -                (attr.name, Alias(attrExpression.
    -                  copy(aggregateFunction = Count(cast), resultId =
    -                    NamedExpression.newExprId), attr.name + "_count")()))
    -            case _ => Seq(("", alias))
    -          }
    -
    -      }
    -    }
    -
       /**
        * Called by PreAggregateLoadingRules to validate if plan is valid for applying rules or not.
        * If the plan has PreAggLoad i.e Loading UDF and does not have PreAgg i.e Query UDF then it is
        * valid.
    -   *
        * @param namedExpression
    -   * @return
    +   * named expressions
    --- End diff --
    
    Move it up. The comment should look like:
    `@param namedExpression named expressions`
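
    The data-loading rule in the diff above rewrites avg into sum and count and
    skips expressions already present in the plan. A compact Python sketch of
    that decomposition (tuples stand in for aggregate expressions; this is an
    illustration of the rewrite, not the Catalyst implementation):

    ```python
    def split_avg(agg):
        # avg(x) is stored as sum(x) and count(x) in the pre-aggregate
        # table, so avg can be reconstructed at query time.
        func, col = agg
        if func == "avg":
            return [("sum", col), ("count", col)]
        return [agg]

    def rewrite_aggregates(agg_exprs):
        # Deduplicate: if sum(x) or count(x) is already requested,
        # do not add it a second time.
        seen, out = set(), []
        for agg in agg_exprs:
            for expr in split_avg(agg):
                if expr not in seen:
                    seen.add(expr)
                    out.append(expr)
        return out
    ```

    For example, loading `avg(age), sum(age)` produces only `sum(age)` and
    `count(age)` — the explicit `sum(age)` is recognized as a duplicate of the
    one the avg rewrite already added.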


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159057341
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -282,6 +136,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             val listFilterColumn = list
               .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn)
               .toList
    +        val isProjectionColumnPresent = (listProjectionColumn.size + listFilterColumn.size) > 0
             // getting all the aggregation columns
             val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty)
    --- End diff --
    
    This code is unused now; please remove all code related to it.


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159459370
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to updated the named expression like aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
    +    namedExpression map {
    +      case attr: AttributeReference =>
    +        val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +        if(childExp.isDefined) {
    +          val newExp = AttributeReference(
    +            childExp.get._2.name,
    +            childExp.get._2.dataType,
    +            childExp.get._2.nullable,
    +            childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +          newExp
    +        } else {
    +          attr
    +        }
    +      case alias@Alias(exp, name) =>
    +        val newExp = exp.transform {
    +          case attr: AttributeReference =>
    +            val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +            if (childExp.isDefined) {
    +              val newExp = AttributeReference(
    +                childExp.get._2.name,
    +                childExp.get._2.dataType,
    +                childExp.get._2.nullable,
    +                childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +              newExp
    +            } else {
    +              attr
    +            }
    +        }
    +        Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to updated condition expression
    +   * @param conditionExp
    +   * any condition expression join condition or filter condition
    +   * @return updated condition expression
    +   */
    +  def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
    +    if (conditionExp.isDefined) {
    +      val filterExp = conditionExp.get
    +      Some(filterExp.transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +          if(childExp.isDefined) {
    +            childExp.get._2
    +          } else {
    +            attr
               }
    -          carbonTable
    -        // case for handling aggregation with order by when only projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +      })
    +    } else {
    +      conditionExp
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to validate and transform the main table plan to child table plan
    +   * rules for transforming is as below.
    +   * 1. Grouping expression rules
    +   *    1.1 Change the parent attribute reference for of group expression
    +   * to child attribute reference
    +   *
    +   * 2. Aggregate expression rules
    +   *    2.1 Change the parent attribute reference for of group expression to
    +   * child attribute reference
    +   *    2.2 Change the count AggregateExpression to Sum as count
    +   * is already calculated so in case of aggregate table
    +   * we need to apply sum to get the count
    +   *    2.2 In case of average aggregate function select 2 columns from aggregate table with
    +   * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)).
    +   * Note: During aggregate table creation for average table will be created with two columns
    +   * one for sum(column) and count(column) to support rollup
    +   * 3. Filter Expression rules.
    +   *    3.1 Updated filter expression attributes with child table attributes
    +   * 4. Update the Parent Logical relation with child Logical relation
    +   * 5. timeseries function
    +   *    5.1 validate parent table has timeseries datamap
    +   *    5.2 timeseries function is valid function or not
    +   *
    +   * @param logicalPlan
    +   * parent logical plan
    +   * @return transformed plan
    +   */
    +  def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
    +    val updatedPlan = logicalPlan.transform {
    +      // case for aggregation query
    +      case agg@Aggregate(grExp,
    +      aggExp,
    +      child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        val carbonTable = getCarbonTable(logicalRelation)
    +        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    +        val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
    +        val isValidPlan = extractQueryColumnsFromAggExpression(
    +          grExp,
    +          aggExp,
    +          carbonTable,
    +          list,
    +          aggregateExpressions)
    +        if(isValidPlan) {
    +          val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
    +            aggregateExpressions,
                 carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
    -              carbonTable = carbonTable,
    -              tableName = tableName)
    +            logicalRelation)
    +          if(null != aggDataMapSchema && null!= childPlan) {
    +            val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
    +            val (updatedGroupExp, updatedAggExp, newChild, None) =
    +              getUpdatedExpressions(grExp,
    +                aggExp,
    +                child,
    +                None,
    +                aggDataMapSchema,
    +                attributes,
    +                childPlan,
    +                carbonTable,
    +                logicalRelation)
    +            Aggregate(updatedGroupExp,
    +              updatedAggExp,
    +              newChild)
    +          } else {
    +            agg
               }
    -          carbonTable
    -        // case for handling aggregation with order by and filter when only projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +        } else {
    +          agg
    +        }
    +      // case of handling aggregation query with filter
    +      case agg@Aggregate(grExp,
    +      aggExp,
    +      Filter(expression, child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        val carbonTable = getCarbonTable(logicalRelation)
    +        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    +        val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
    +        var isValidPlan = extractQueryColumnsFromAggExpression(
    +          grExp,
    +          aggExp,
    +          carbonTable,
    +          list,
    +          aggregateExpressions)
    +        // getting the columns from filter expression
    +        isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression)
    +        if (isValidPlan) {
    +          isValidPlan = extractQueryColumnFromFilterExp(expression, list, carbonTable)
    +        }
    +        if(isValidPlan) {
    +          val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
    +            aggregateExpressions,
                 carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          if(isValidPlan) {
    -            list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
    -              carbonTable = carbonTable,
    -              tableName = tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    -          }
    -          carbonTable
    -        case _ =>
    -          isValidPlan = false
    -          null
    -      }
    -      if (isValidPlan && null != carbonTable) {
    -        isValidPlan = isSpecificSegmentPresent(carbonTable)
    -      }
    -      // if plan is valid then update the plan with child attributes
    -      if (isValidPlan) {
    -        // getting all the projection columns
    -        val listProjectionColumn = list
    -          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn)
    -          .toList
    -        // getting all the filter columns
    -        val listFilterColumn = list
    -          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn)
    -          .toList
    -        // getting all the aggregation columns
    -        val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty)
    -          .toList
    -        // create a query plan object which will be used to select the list of pre aggregate tables
    -        // matches with this plan
    -        val queryPlan = new QueryPlan(listProjectionColumn.asJava,
    -          listAggregationColumn.asJava,
    -          listFilterColumn.asJava)
    -        // create aggregate table selector object
    -        val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
    -        // select the list of valid child tables
    -        val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
    -        // if it does not match with any pre aggregate table return the same plan
    -        if (!selectedDataMapSchemas.isEmpty) {
    -          // filter the selected child schema based on size to select the pre-aggregate tables
    -          // that are nonEmpty
    -          val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
    -          val relationBuffer = selectedDataMapSchemas.asScala.map { selectedDataMapSchema =>
    -            val identifier = TableIdentifier(
    -              selectedDataMapSchema.getRelationIdentifier.getTableName,
    -              Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
    -            val carbonRelation =
    -              catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
    -            val relation = sparkSession.sessionState.catalog.lookupRelation(identifier)
    -            (selectedDataMapSchema, carbonRelation, relation)
    -          }.filter(_._2.sizeInBytes != 0L)
    -          if (relationBuffer.isEmpty) {
    -            // If the size of relation Buffer is 0 then it means that none of the pre-aggregate
    -            // tables have date yet.
    -            // In this case we would return the original plan so that the query hits the parent
    -            // table.
    -            plan
    +            logicalRelation)
    +          if(null != aggDataMapSchema && null!= childPlan) {
    +            val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
    +            val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
    +              getUpdatedExpressions(grExp,
    +                aggExp,
    +                child,
    +                Some(expression),
    +                aggDataMapSchema,
    +                attributes,
    +                childPlan,
    +                carbonTable,
    +                logicalRelation)
    +            Aggregate(updatedGroupExp,
    +              updatedAggExp,
    +              Filter(updatedFilterExpression.get,
    +                newChild))
               } else {
    -            // If the relationBuffer is nonEmpty then find the table with the minimum size.
    -            val (aggDataMapSchema, _, relation) = relationBuffer.minBy(_._2.sizeInBytes)
    -            val newRelation = new FindDataSourceTable(sparkSession).apply(relation)
    -            // transform the query plan based on selected child schema
    -            transformPreAggQueryPlan(plan, aggDataMapSchema, newRelation)
    +            agg
               }
             } else {
    -          plan
    +          agg
             }
    +    }
    +    updatedPlan
    +  }
    +
    +  /**
    +   * Below method will be used to validate query plan and get the proper aggregation data map schema
    +   * and child relation plan object if plan is valid for transformation
    +   * @param queryColumns
    +   * list of query columns from projection and filter
    +   * @param aggregateExpressions
    +   * list of aggregate expression (aggregate function)
    +   * @param carbonTable
    +   * parent carbon table
    +   * @param logicalRelation
    +   * parent logical relation
    +   * @return if plan is valid then aggregation data map schema and its relation plan
    +   */
    +  def getChildDataMapForTransformation(queryColumns: scala.collection.mutable.HashSet[QueryColumn],
    +      aggregateExpressions: scala.collection.mutable.HashSet[AggregateExpression],
    +      carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation): (AggregationDataMapSchema, LogicalPlan) = {
    +    // getting all the projection columns
    +    val listProjectionColumn = queryColumns
    +      .filter(queryColumn => !queryColumn.isFilterColumn)
    +      .toList
    +    // getting all the filter columns
    +    val listFilterColumn = queryColumns
    +      .filter(queryColumn => queryColumn.isFilterColumn)
    +      .toList
    +    val isProjectionColumnPresent = (listProjectionColumn.size + listFilterColumn.size) > 0
    +    // create a query plan object which will be used to select the list of pre aggregate tables
    +    // matches with this plan
    +    val queryPlan = new QueryPlan(listProjectionColumn.asJava, listFilterColumn.asJava)
    +    // create aggregate table selector object
    +    val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
    +    // select the list of valid child tables
    +    val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
    +    // query has only aggregate expression then selected data map will be empty
    +    // the validate all the child data map otherwise validate selected data map
    +    var selectedAggMaps = if (isProjectionColumnPresent) {
    +      selectedDataMapSchemas
    +    } else {
    +      carbonTable.getTableInfo.getDataMapSchemaList
    +    }
    +    val aggExpLogicalPlans = aggregateExpressions.map { queryAggExp =>
    +      PreAggregateUtil.getLogicalPlanFromAggExp(queryAggExp,
    +        carbonTable.getTableName,
    +        carbonTable.getDatabaseName,
    +        logicalRelation,
    +        sparkSession,
    +        parser)
    +    }.toSeq
    +    // if query does not have any aggregate function no need to validate the same
    +    if (aggregateExpressions.size > 0 && selectedAggMaps.size > 0) {
    +      selectedAggMaps = validateAggregateExpression(selectedAggMaps.asScala.toSeq,
    +        carbonTable,
    +        logicalRelation,
    +        aggExpLogicalPlans).asJava
    +    }
    +    // if it does not match with any pre aggregate table return the same plan
    +    if (!selectedAggMaps.isEmpty) {
    +      // filter the selected child schema based on size to select the pre-aggregate tables
    +      // that are nonEmpty
    +      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
    +      val relationBuffer = selectedAggMaps.asScala.map { selectedDataMapSchema =>
    +        val identifier = TableIdentifier(
    +          selectedDataMapSchema.getRelationIdentifier.getTableName,
    +          Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
    +        val carbonRelation =
    +          catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
    +        val relation = sparkSession.sessionState.catalog.lookupRelation(identifier)
    +        (selectedDataMapSchema, carbonRelation, relation)
    +      }.filter(_._2.sizeInBytes != 0L)
    +      if (relationBuffer.isEmpty) {
    +        // If the size of relation Buffer is 0 then it means that none of the pre-aggregate
    +        // tables have date yet.
    +        // In this case we would return the original plan so that the query hits the parent
    +        // table.
    +        (null, null)
           } else {
    -        plan
    +        // If the relationBuffer is nonEmpty then find the table with the minimum size.
    +        val (aggDataMapSchema, _, relation) = relationBuffer.minBy(_._2.sizeInBytes)
    +        val newRelation = new FindDataSourceTable(sparkSession).apply(relation)
    +        (aggDataMapSchema.asInstanceOf[AggregationDataMapSchema], newRelation)
           }
    +    } else {
    +      (null, null)
         }
       }
     
    +  /**
    +   * Below method will be used to validate aggregate expression with the data map
    +   * and will return the selected valid data maps
    +   * @param selectedDataMap
    +   *                        list of data maps
    +   * @param carbonTable
    +   *                    parent carbon table
    +   * @param logicalRelation
    +   *                        parent logical relation
    +   * @param queryAggExpLogicalPlans
    +   *                                query agg expression logical plan
    +   * @return valid data map
    +   */
    +  def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema],
    +      carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation,
    +      queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = {
    +    def validateDataMap(dataMap: DataMapSchema,
    +        aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = {
    +      val mappingModel = getLogicalPlanForAggregateExpression(dataMap,
    +        carbonTable,
    +        logicalRelation)
    +      aggExpLogicalPlans.forall{
    +        p => mappingModel.exists(m => p.sameResult(m.logicalPlan))
    --- End diff --
    
    Move `p =>` up to the previous line.
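
    The rewrite rules described in the doc comment above (rule 2.2: `count` becomes `sum`
    over the stored count column, and `avg` becomes `sum(column_sum) / sum(column_count)`)
    can be checked numerically with a small sketch. This is a toy model, not CarbonData
    code; the column names follow the hypothetical `_sum`/`_count` convention:

    ```python
    # Toy rows of a parent fact table: (key, value)
    rows = [("a", 10.0), ("a", 20.0), ("b", 5.0), ("b", 15.0), ("b", 40.0)]

    # Build a toy pre-aggregate "table": per key, store value_sum and value_count.
    pre_agg = {}
    for key, value in rows:
        s, c = pre_agg.get(key, (0.0, 0))
        pre_agg[key] = (s + value, c + 1)

    # avg(value) against the pre-aggregate table is rewritten to
    # sum(value_sum) / sum(value_count) rather than avg(value).
    def avg_from_pre_agg(keys):
        total = sum(pre_agg[k][0] for k in keys)
        count = sum(pre_agg[k][1] for k in keys)
        return total / count

    # count(value) is rewritten to sum(value_count): counts are already
    # partially aggregated, so summing them recovers the parent-table count.
    def count_from_pre_agg(keys):
        return sum(pre_agg[k][1] for k in keys)

    print(avg_from_pre_agg(pre_agg))    # 18.0 == avg over the raw rows
    print(count_from_pre_agg(pre_agg))  # 5    == number of raw rows
    ```

    The same identity is what makes rollup possible: partial sums and counts can be
    re-summed at any coarser granularity, whereas stored averages could not.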


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2568/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2738/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1343/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159440457
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    --- End diff --
    
    No need to assign this to a variable; return the expression directly.


---
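The `updateSortExpression`, `updateExpression`, and `updateNamedExpression` methods quoted in the diff above all apply the same substitution: each parent-table `AttributeReference` is looked up by `sameRef` among the collected `updatedExpression` pairs and replaced with the corresponding child (aggregate) table attribute, falling back to the original attribute when no mapping exists. A simplified, Spark-free sketch of that lookup (all names here are hypothetical, not CarbonData APIs):

```scala
// Minimal stand-in for AttributeReference: sameRef compares expression ids.
case class Attr(name: String, exprId: Int) {
  def sameRef(other: Attr): Boolean = exprId == other.exprId
}

object AttrRewriter {
  // mapping holds (parent attribute -> child attribute) pairs, like
  // the rule's `updatedExpression` collection.
  def rewrite(attrs: Seq[Attr], mapping: Seq[(Attr, Attr)]): Seq[Attr] =
    attrs.map { attr =>
      mapping.find { case (parent, _) => parent.sameRef(attr) }
        .map(_._2)          // substitute the child-table attribute
        .getOrElse(attr)    // no mapping: keep the parent attribute as-is
    }
}
```

The same fallback behaviour explains the `else` branches in the quoted code: attributes that do not originate from the rewritten relation pass through unchanged.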

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159443443
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to update the named expression like aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
    +    namedExpression map {
    +      case attr: AttributeReference =>
    +        val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +        if(childExp.isDefined) {
    +          val newExp = AttributeReference(
    +            childExp.get._2.name,
    +            childExp.get._2.dataType,
    +            childExp.get._2.nullable,
    +            childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +          newExp
    +        } else {
    +          attr
    +        }
    +      case alias@Alias(exp, name) =>
    +        val newExp = exp.transform {
    +          case attr: AttributeReference =>
    +            val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +            if (childExp.isDefined) {
    +              val newExp = AttributeReference(
    +                childExp.get._2.name,
    +                childExp.get._2.dataType,
    +                childExp.get._2.nullable,
    +                childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +              newExp
    +            } else {
    +              attr
    +            }
    +        }
    +        Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to update the condition expression
    +   * @param conditionExp
    +   * any condition expression join condition or filter condition
    +   * @return updated condition expression
    +   */
    +  def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
    +    if (conditionExp.isDefined) {
    +      val filterExp = conditionExp.get
    +      Some(filterExp.transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +          if(childExp.isDefined) {
    +            childExp.get._2
    +          } else {
    +            attr
               }
    -          carbonTable
    -        // case for handling aggregation with order by when only projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +      })
    +    } else {
    +      conditionExp
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to validate and transform the main table plan to child table plan
    +   * rules for transforming is as below.
    +   * 1. Grouping expression rules
    +   *    1.1 Change the parent attribute reference of the group expression
    +   * to the child attribute reference
    +   *
    +   * 2. Aggregate expression rules
    +   *    2.1 Change the parent attribute reference of the aggregate expression to
    +   * the child attribute reference
    +   *    2.2 Change the count AggregateExpression to Sum as count
    +   * is already calculated so in case of aggregate table
    +   * we need to apply sum to get the count
    +   *    2.3 In case of an average aggregate function, select 2 columns from the aggregate table
    +   * with aggregation sum and count, then add divide(sum(column with sum), sum(column with count)).
    +   * Note: During aggregate table creation, for average the table is created with two columns,
    +   * one for sum(column) and one for count(column), to support rollup
    +   * 3. Filter Expression rules.
    +   *    3.1 Updated filter expression attributes with child table attributes
    +   * 4. Update the Parent Logical relation with child Logical relation
    +   * 5. timeseries function
    +   *    5.1 validate parent table has timeseries datamap
    +   *    5.2 timeseries function is valid function or not
    +   *
    +   * @param logicalPlan
    +   * parent logical plan
    +   * @return transformed plan
    +   */
    +  def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
    +    val updatedPlan = logicalPlan.transform {
    +      // case for aggregation query
    +      case agg@Aggregate(grExp,
    --- End diff --
    
    Move `grExp` down to the next line.


---
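Rule 2.2 in the scaladoc quoted above (count rewritten to sum, average rewritten to a divide of two sums) works because the pre-aggregate table already stores `sum(column)` and `count(column)` per group, so query-time aggregation only needs to roll those partial results up. A small numeric sketch of that arithmetic (the `Rollup` object is illustrative only, not part of CarbonData):

```scala
object Rollup {
  // Each tuple is one pre-aggregated group/segment: (sumOfColumn, rowCount).
  // count over the original rows becomes a SUM of the stored counts ...
  def count(segments: Seq[(Double, Long)]): Long = segments.map(_._2).sum

  // ... and avg becomes sum(stored sums) / sum(stored counts),
  // i.e. divide(sum(sum_col), sum(count_col)) in the transformed plan.
  def avg(segments: Seq[(Double, Long)]): Double =
    segments.map(_._1).sum / segments.map(_._2).sum
}
```

Averaging the stored sums directly would be wrong whenever segments hold different row counts, which is why the datamap keeps both columns.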

[GitHub] carbondata issue #1728: [CARBONDATA-1926][Pre-Aggregate] Expression support ...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2364/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159435962
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -18,28 +18,46 @@
     package org.apache.spark.sql.hive
     
     import scala.collection.JavaConverters._
    -import scala.collection.mutable.ArrayBuffer
    +import scala.collection.mutable
     
    -import org.apache.spark.SPARK_VERSION
     import org.apache.spark.sql._
    -import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast, MatchCastExpression}
     import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCastExpression}
     import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
    -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
    -import org.apache.spark.sql.catalyst.expressions.aggregate._
    +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
    +import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, _}
     import org.apache.spark.sql.catalyst.plans.logical._
     import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
     import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation}
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
     import org.apache.spark.sql.types._
    -import org.apache.spark.sql.util.CarbonException
     import org.apache.spark.util.CarbonReflectionUtils
     
     import org.apache.carbondata.core.constants.CarbonCommonConstants
     import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
     import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
     import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
    -import org.apache.carbondata.spark.util.CarbonScalaUtil
     
    +/**
    + * model class to store aggregate expression logical plan
    + * and its column schema mapping
    + * @param logicalPlan
    + * logical plan of aggregate expression
    + * @param columnSchema
    + * column schema from table
    + */
    +case class AggExpToColumnMappingModel(var logicalPlan: LogicalPlan,
    --- End diff --
    
    Move `var logicalPlan: LogicalPlan` to next line


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2575/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159440196
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    --- End diff --
    
    Better to use match { case } instead of if/else here.
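    A minimal sketch of the suggested rewrite, using simplified hypothetical types in place of Catalyst's AttributeReference (not the actual CarbonData code):

    ```scala
    // Pattern-match on the Option returned by the lookup instead of
    // testing isDefined and then calling .get on it.
    object MatchSketch {
      def resolve(attr: String, updated: Map[String, String]): String =
        updated.get(attr) match {
          case Some(replacement) => replacement // use the mapped child attribute
          case None              => attr        // fall back to the original reference
        }

      def main(args: Array[String]): Unit = {
        val mapping = Map("parentCol" -> "childCol")
        assert(resolve("parentCol", mapping) == "childCol")
        assert(resolve("other", mapping) == "other")
      }
    }
    ```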


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][Pre-Aggregate] Expression support ...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2578/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159052141
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         }
       }
     
    +  /**
    +   * Below method will be used to validate the logical plan
    +   * and get all the details from to select proper aggregate table
    +   * @param logicalPlan
    +   *                    actual query logical plan
    +   * @param list
    +   *             list of projection column present in plan
    +   * @param qAggExprs
    +   *                  list of aggregate expression
    +   * @return if plan is valid for tranformation, parent table, parent logical relaion
    +   */
    +  def validatePlanAndGetFields(logicalPlan: LogicalPlan,
    +      list: scala.collection.mutable.HashSet[QueryColumn],
    +      qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): (Boolean,
    +    CarbonTable, LogicalRelation) = {
    +    var isValidPlan = false
    +    var pTable: CarbonTable = null
    +    var qLRelation: LogicalRelation = null
    +    logicalPlan.transform {
    +      // to handle filter expression
    +      case filter@Filter(filterExp,
    +      CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        // only carbon query plan is supported checking whether logical relation is
    +        // is for carbon
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        qLRelation = logicalRelation
    +        pTable = getCarbonTableAndTableName(logicalRelation)
    +        // getting the columns from filter expression
    +        if (!CarbonReflectionUtils.hasPredicateSubquery(filterExp)) {
    +          isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, pTable)
    +        }
    +        filter
    +        // to handle aggregate expression
    +      case agg@Aggregate(groupingExp,
    +      aggregateExp,
    +      CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        // only carbon query plan is supported checking whether logical relation is
    +        // is for carbon
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        qLRelation = logicalRelation
    +        pTable = getCarbonTableAndTableName(logicalRelation)
    +        isValidPlan = extractQueryColumnsFromAggExpression(
    +          groupingExp,
    +          aggregateExp,
    +          pTable,
    +          list,
    +          qAggExprs)
    +        agg
    +        // to handle aggregate expression with filter
    +      case agg@Aggregate(grExp, aggExp, filter@Filter(_, _)) =>
    +        val out = validatePlanAndGetFields(filter, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if (isValidPlan) {
    +          isValidPlan = extractQueryColumnsFromAggExpression(grExp, aggExp, pTable, list, qAggExprs)
    +        }
    +        agg
    +        // to handle projection with order by
    +      case proj@Project(projectList, sort@Sort(_, _, _)) =>
    +        val out = validatePlanAndGetFields(sort, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable)
    +        }
    +        proj
    +        // to handle only projection
    +      case proj@Project(projectList, agg@Aggregate(_, _, _)) =>
    +        val out = validatePlanAndGetFields(agg, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable)
    +        }
    +        proj
    +      // case for handling aggregation with order by when only projection column exits
    +      case sort@Sort(sortOrders, _, agg@Aggregate(_, _, _)) =>
    +        val out = validatePlanAndGetFields(agg, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++
    +          extractQueryColumnForOrderBy(None, sortOrders, pTable)
    +        }
    +        sort
    +    }
    +    (isValidPlan, pTable, qLRelation)
    +  }
    +
    +  /**
    +   * Below method will be used to validate aggregate expression with the data map
    +   * and will return the selected valid data maps
    +   * @param selectedDataMap
    +   *                        list of data maps
    +   * @param carbonTable
    +   *                    parent carbon table
    +   * @param logicalRelation
    +   *                        parent logical relation
    +   * @param queryAggExpLogicalPlans
    +   *                                query agg expression logical plan
    +   * @return valid data map
    +   */
    +  def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema],
    +      carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation,
    +      queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = {
    +    def validateDataMap(dataMap: DataMapSchema,
    +        aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = {
    +      val schemaAggLogicalPlan = getLogicalPlanForAggregateExpression(dataMap,
    +        carbonTable,
    +        logicalRelation)
    +      aggExpLogicalPlans.forall{
    +        p => schemaAggLogicalPlan.exists(m => p.sameResult(m._1))
    +      }
    +    }
    +    val selectedDataMapSchema = selectedDataMap.collect {
    +      case dataMap if validateDataMap(dataMap, queryAggExpLogicalPlans) =>
    +        dataMap
    +    }
    +    selectedDataMapSchema
    +  }
    +
    +  /**
    +   * Below method will be used to update the logical plan of expression
    +   * with parent table logical relation
    +   * @param logicalPlan
    +   * @param logicalRelation
    +   * @return
    +   */
    +  def updateLogicalRelation(logicalPlan: LogicalPlan,
    +      logicalRelation: LogicalRelation): LogicalPlan = {
    +    logicalPlan transform {
    +      case l: LogicalRelation =>
    +        l.copy(relation = logicalRelation.relation)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to to get the logical plan for each aggregate expression in
    +   * child data map and its column schema mapping if mapping is already present
    +   * then it will use the same otherwise it will generate and stored in aggregation data map
    +   * @param selectedDataMap
    +   *                        child data map
    +   * @param carbonTable
    +   *                    parent table
    +   * @param logicalRelation
    +   *                        logical relation of actual plan
    +   * @return map of logical plan for each aggregate expression in child query and its column mapping
    +   */
    +  def getLogicalPlanForAggregateExpression(selectedDataMap: DataMapSchema, carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation): Map[LogicalPlan, ColumnSchema] = {
    +    val aggDataMapSchema = selectedDataMap.asInstanceOf[AggregationDataMapSchema]
    +    // if column mapping is not present
    +    if (null == aggDataMapSchema.getAggregateExpressionToColumnMapping) {
    +      // add preAGG UDF to avoid all the PreAggregate rule
    +      val childDataMapQueryString = new CarbonSpark2SqlParser()
    +        .addPreAggFunction(aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY"))
    +      // get the logical plan
    +      val aggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan
    +      // getting all aggregate expression from query
    +      val dataMapAggExp = getAggregateExpFromChildDataMap(aggPlan)
    +      // in case of average child table will have two columns which will be stored in sequence
    +      // so for average expression we need to get two columns for mapping
    +      var counter = 0
    +      // sorting the columns based on schema ordinal so search will give proper result
    +      val sortedColumnList = aggDataMapSchema.getChildSchema.getListOfColumns.asScala
    +        .sortBy(_.getSchemaOrdinal)
    +      val logicalPlanToColumnMapping = dataMapAggExp.map { aggExp =>
    +        // for each aggregate expression get logical plan
    +        val expLogicalPlan = getLogicalPlanFromAggExp(aggExp,
    +          carbonTable.getTableName,
    +          carbonTable.getDatabaseName, logicalRelation)
    +        // check if aggregate expression is of type avg
    +        // get the columns
    +        var columnSchema = aggDataMapSchema
    +          .getAggColumnBasedOnIndex(counter, sortedColumnList.asJava)
    +        // increment the counter so when for next expression above code will be
    +        // executed it will search from that schema ordinal
    +        counter = columnSchema.getSchemaOrdinal + 1
    +        (expLogicalPlan, columnSchema)
    +      }.toMap
    +      // store the mapping in data map
    +      aggDataMapSchema.setAggregateExpressionToColumnMapping(logicalPlanToColumnMapping.asJava)
    +      // return the mapping
    +      logicalPlanToColumnMapping
    +    } else {
    +      // if already present in data map then return the same
    +      aggDataMapSchema.getAggregateExpressionToColumnMapping
    +        .asInstanceOf[java.util.Map[LogicalPlan, ColumnSchema]].asScala.toMap
    +    }
    +  }
    +
    +
    +  /**
    +   * Below method will be used to get the logical plan from aggregate expression
    +   * @param aggExp
    +   *               aggregate expression
    +   * @param tableName
    +   *                  parent table name
    +   * @param databaseName
    +   *                     database name
    +   * @param logicalRelation
    +   *                        logical relation
    +   * @return logical plan
    +   */
    +  def getLogicalPlanFromAggExp(aggExp: AggregateExpression,
    +      tableName: String,
    +      databaseName: String,
    +      logicalRelation: LogicalRelation): LogicalPlan = {
    +    // adding the preAGG UDF, so pre aggregate data loading rule and query rule will not
    +    // be applied
    +    val query = new CarbonSpark2SqlParser()
    +      .addPreAggFunction(s"Select ${ aggExp.sql } from $databaseName.$tableName")
    +    // updating the logical relation of logical plan to so when two logical plan
    +    // will be compared it will not consider relation
    +    updateLogicalRelation(sparkSession.sql(query).logicalPlan, logicalRelation)
    +  }
    +
    +  /**
    +   * Below method will be used to get aggregate expression
    +   * @param logicalPlan
    +   *               logical plan
    +   * @return list of aggregate expression
    +   */
    +  def getAggregateExpFromChildDataMap(logicalPlan: LogicalPlan): Seq[AggregateExpression] = {
    +    val list = scala.collection.mutable.HashSet.empty[AggregateExpression]
    --- End diff --
    
    Should use a List or LinkedHashSet, as insertion order is required for the mapping
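    A standalone illustration (not CarbonData code) of why an ordered collection matters here: LinkedHashSet iterates in insertion order, which a plain HashSet does not guarantee, and the avg aggregate relies on its (sum, count) column pair staying adjacent:

    ```scala
    import scala.collection.mutable

    object OrderSketch {
      def main(args: Array[String]): Unit = {
        // avg is stored as a sum column followed by a count column, so the
        // collected aggregate expressions must keep their insertion order.
        val exprs = mutable.LinkedHashSet.empty[String]
        Seq("sum(price)", "count(price)", "max(qty)").foreach(exprs += _)
        assert(exprs.toSeq == Seq("sum(price)", "count(price)", "max(qty)"))
      }
    }
    ```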


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/carbondata/pull/1728


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][Pre-Aggregate] Expression support ...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1152/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159443569
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to updated the named expression like aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
    +    namedExpression map {
    +      case attr: AttributeReference =>
    +        val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +        if(childExp.isDefined) {
    +          val newExp = AttributeReference(
    +            childExp.get._2.name,
    +            childExp.get._2.dataType,
    +            childExp.get._2.nullable,
    +            childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +          newExp
    +        } else {
    +          attr
    +        }
    +      case alias@Alias(exp, name) =>
    +        val newExp = exp.transform {
    +          case attr: AttributeReference =>
    +            val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +            if (childExp.isDefined) {
    +              val newExp = AttributeReference(
    +                childExp.get._2.name,
    +                childExp.get._2.dataType,
    +                childExp.get._2.nullable,
    +                childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +              newExp
    +            } else {
    +              attr
    +            }
    +        }
    +        Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to updated condition expression
    +   * @param conditionExp
    +   * any condition expression join condition or filter condition
    +   * @return updated condition expression
    +   */
    +  def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
    +    if (conditionExp.isDefined) {
    +      val filterExp = conditionExp.get
    +      Some(filterExp.transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +          if(childExp.isDefined) {
    +            childExp.get._2
    +          } else {
    +            attr
               }
    -          carbonTable
    -        // case for handling aggregation with order by when only projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +      })
    +    } else {
    +      conditionExp
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to validate and transform the main table plan to child table plan
    +   * rules for transforming is as below.
    +   * 1. Grouping expression rules
    +   *    1.1 Change the parent attribute reference for of group expression
    +   * to child attribute reference
    +   *
    +   * 2. Aggregate expression rules
    +   *    2.1 Change the parent attribute reference of the group expression to
    +   * child attribute reference
    +   *    2.2 Change the count AggregateExpression to Sum as count
    +   * is already calculated so in case of aggregate table
    +   * we need to apply sum to get the count
    +   *    2.3 In case of average aggregate function select 2 columns from aggregate table with
    +   * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)).
    +   * Note: During aggregate table creation for average table will be created with two columns
    +   * one for sum(column) and count(column) to support rollup
    +   * 3. Filter Expression rules.
    +   *    3.1 Updated filter expression attributes with child table attributes
    +   * 4. Update the Parent Logical relation with child Logical relation
    +   * 5. timeseries function
    +   *    5.1 validate parent table has timeseries datamap
    +   *    5.2 timeseries function is valid function or not
    +   *
    +   * @param logicalPlan
    +   * parent logical plan
    +   * @return transformed plan
    +   */
    +  def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
    +    val updatedPlan = logicalPlan.transform {
    +      // case for aggregation query
    +      case agg@Aggregate(grExp,
    +      aggExp,
    +      child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    --- End diff --
    
    The indentation is wrong here.


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2421/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2625/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159438073
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    --- End diff --
    
    Better to use `transform` instead of `match`; if an `Alias` comes in, a top-level `match` cannot handle it.
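    A minimal sketch of the suggested change (assuming `updatedExpression` is the parent-to-child attribute map held by the rule):

    ```scala
    // transform visits nested nodes too (e.g. an Alias wrapping the
    // AttributeReference), which a top-level match on order.child misses
    def updateSortExpression(sortExp: Seq[SortOrder]): Seq[SortOrder] =
      sortExp.map { order =>
        order.copy(child = order.child.transform {
          case attr: AttributeReference =>
            // fall back to the original attribute when no child mapping exists
            updatedExpression.find(_._1.sameRef(attr)).map(_._2).getOrElse(attr)
        })
      }
    ```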


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2733/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159438250
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    --- End diff --
    
    Better use `match { case }` instead of if else
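    A sketch of the suggested `match { case }` form, with `updatedExpression` and `attr` standing in for the values in the diff above:

    ```scala
    // pattern match on the Option instead of isDefined/get
    updatedExpression.find(_._1.sameRef(attr)) match {
      case Some((_, childAttr)) =>
        // keep the parent attribute's qualifier so table aliases survive
        AttributeReference(childAttr.name, childAttr.dataType,
          childAttr.nullable, childAttr.metadata)(
          childAttr.exprId, attr.qualifier, attr.isGenerated)
      case None => attr
    }
    ```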


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2566/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2735/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159457280
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    --- End diff --
    
    childExp.qualifier will not have the table alias name; in the case of a join we need the table alias name.


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][Pre-Aggregate] Expression s...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r158934363
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -126,16 +127,17 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
                 aggregateExp,
                 carbonTable,
                 tableName,
    -            list)
    -          carbonTable
    +            list,
    +            aggregateExpressions)
    +          (carbonTable, logicalRelation)
     
             // below case for handling filter query
             // When plan has grouping expression, aggregate expression
             // filter expression
             case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    +        aggregateExp,
    --- End diff --
    
    unnecessary change


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159457864
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -806,67 +824,105 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         val updatedAggExp = aggregateExpressions.map {
    --- End diff --
    
    There seems to be a lot of duplicate code; better to do a transform and handle it all with a single `case attr: AttributeReference` here.
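    A sketch of the deduplicated form this comment suggests: one transform with a single attribute rule applied to every aggregate expression (`updatedExpression` is assumed to be the parent-to-child attribute map):

    ```scala
    // one rule replaces the per-case lookups repeated throughout the map body
    val updatedAggExp = aggregateExpressions.map { aggExp =>
      aggExp.transform {
        case attr: AttributeReference =>
          updatedExpression.find(_._1.sameRef(attr)).map(_._2).getOrElse(attr)
      }
    }
    ```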


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1338/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1283/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2451/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2674/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2562/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2508/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159058689
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -294,12 +149,30 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
             // select the list of valid child tables
             val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
    +        // query has only aggregate expression then selected data map will be empty
    +        // then validate all the child data maps, otherwise validate the selected data map
    +        var selectedAggMaps = if (isProjectionColumnPresent) {
    +          selectedDataMapSchemas
    +        } else {
    +          carbonTable.getTableInfo.getDataMapSchemaList
    +        }
    +        val aggExpLogicalPlans = aggregateExpressions.map { queryAggExp =>
    +          getLogicalPlanFromAggExp(queryAggExp,
    +            carbonTable.getTableName,
    +            carbonTable.getDatabaseName, logicalRelation)
    +        }.toSeq
    +        if(aggregateExpressions.size > 0 && selectedAggMaps.size > 0) {
    --- End diff --
    
    add comments


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    retest this please


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1197/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159056348
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         }
       }
     
    +  /**
    +   * Below method will be used to validate the logical plan
    +   * and get all the details from to select proper aggregate table
    +   * @param logicalPlan
    +   *                    actual query logical plan
    +   * @param list
    +   *             list of projection column present in plan
    +   * @param qAggExprs
    +   *                  list of aggregate expression
    +   * @return if plan is valid for transformation, parent table, parent logical relation
    +   */
    +  def validatePlanAndGetFields(logicalPlan: LogicalPlan,
    +      list: scala.collection.mutable.HashSet[QueryColumn],
    +      qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): (Boolean,
    +    CarbonTable, LogicalRelation) = {
    +    var isValidPlan = false
    +    var pTable: CarbonTable = null
    +    var qLRelation: LogicalRelation = null
    +    logicalPlan.transform {
    --- End diff --
    
    It seems the Join condition of two plans is not handled here. Please also handle the Join and Union cases using recursion.
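    An illustrative shape for the missing cases, reusing `validatePlanAndGetFields` from the diff (a hypothetical sketch, not the merged implementation):

    ```scala
    // recurse into both sides of a join; the plan is valid only if both are
    case join @ Join(left, right, _, _) =>
      val leftValid = validatePlanAndGetFields(left, list, qAggExprs)._1
      val rightValid = validatePlanAndGetFields(right, list, qAggExprs)._1
      isValidPlan = leftValid && rightValid
      join
    // every branch of a union must be transformable
    case union @ Union(children) =>
      isValidPlan = children.forall(validatePlanAndGetFields(_, list, qAggExprs)._1)
      union
    ```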


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2612/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    retest this please


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1377/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    LGTM


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2722/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159053493
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         }
       }
     
    +  /**
    +   * Below method will be used to validate the logical plan
    +   * and get all the details from to select proper aggregate table
    +   * @param logicalPlan
    +   *                    actual query logical plan
    +   * @param list
    +   *             list of projection column present in plan
    +   * @param qAggExprs
    +   *                  list of aggregate expression
    +   * @return if plan is valid for transformation, parent table, parent logical relation
    +   */
    +  def validatePlanAndGetFields(logicalPlan: LogicalPlan,
    +      list: scala.collection.mutable.HashSet[QueryColumn],
    +      qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): (Boolean,
    +    CarbonTable, LogicalRelation) = {
    +    var isValidPlan = false
    +    var pTable: CarbonTable = null
    +    var qLRelation: LogicalRelation = null
    +    logicalPlan.transform {
    +      // to handle filter expression
    +      case filter@Filter(filterExp,
    +      CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        // only carbon query plan is supported checking whether logical relation is
    +        // is for carbon
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        qLRelation = logicalRelation
    +        pTable = getCarbonTableAndTableName(logicalRelation)
    +        // getting the columns from filter expression
    +        if (!CarbonReflectionUtils.hasPredicateSubquery(filterExp)) {
    +          isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, pTable)
    +        }
    +        filter
    +        // to handle aggregate expression
    +      case agg@Aggregate(groupingExp,
    +      aggregateExp,
    +      CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        // only carbon query plan is supported checking whether logical relation is
    +        // is for carbon
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        qLRelation = logicalRelation
    +        pTable = getCarbonTableAndTableName(logicalRelation)
    +        isValidPlan = extractQueryColumnsFromAggExpression(
    +          groupingExp,
    +          aggregateExp,
    +          pTable,
    +          list,
    +          qAggExprs)
    +        agg
    +        // to handle aggregate expression with filter
    +      case agg@Aggregate(grExp, aggExp, filter@Filter(_, _)) =>
    +        val out = validatePlanAndGetFields(filter, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if (isValidPlan) {
    +          isValidPlan = extractQueryColumnsFromAggExpression(grExp, aggExp, pTable, list, qAggExprs)
    +        }
    +        agg
    +        // to handle projection with order by
    +      case proj@Project(projectList, sort@Sort(_, _, _)) =>
    +        val out = validatePlanAndGetFields(sort, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable)
    +        }
    +        proj
    +      // to handle projection over aggregation
    +      case proj@Project(projectList, agg@Aggregate(_, _, _)) =>
    +        val out = validatePlanAndGetFields(agg, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable)
    +        }
    +        proj
    +      // case for handling aggregation with order by when only projection column exists
    +      case sort@Sort(sortOrders, _, agg@Aggregate(_, _, _)) =>
    +        val out = validatePlanAndGetFields(agg, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++
    +          extractQueryColumnForOrderBy(None, sortOrders, pTable)
    +        }
    +        sort
    +    }
    +    (isValidPlan, pTable, qLRelation)
    +  }
    +
    +  /**
    +   * Below method will be used to validate aggregate expression with the data map
    +   * and will return the selected valid data maps
    +   * @param selectedDataMap
    +   *                        list of data maps
    +   * @param carbonTable
    +   *                    parent carbon table
    +   * @param logicalRelation
    +   *                        parent logical relation
    +   * @param queryAggExpLogicalPlans
    +   *                                query agg expression logical plan
    +   * @return valid data map
    +   */
    +  def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema],
    +      carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation,
    +      queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = {
    +    def validateDataMap(dataMap: DataMapSchema,
    +        aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = {
    +      val schemaAggLogicalPlan = getLogicalPlanForAggregateExpression(dataMap,
    +        carbonTable,
    +        logicalRelation)
    +      aggExpLogicalPlans.forall{
    +        p => schemaAggLogicalPlan.exists(m => p.sameResult(m._1))
    +      }
    +    }
    +    val selectedDataMapSchema = selectedDataMap.collect {
    +      case dataMap if validateDataMap(dataMap, queryAggExpLogicalPlans) =>
    +        dataMap
    +    }
    +    selectedDataMapSchema
    +  }
    +
    +  /**
    +   * Below method will be used to update the logical plan of expression
    +   * with parent table logical relation
    +   * @param logicalPlan
    +   * @param logicalRelation
    +   * @return
    +   */
    +  def updateLogicalRelation(logicalPlan: LogicalPlan,
    +      logicalRelation: LogicalRelation): LogicalPlan = {
    +    logicalPlan transform {
    +      case l: LogicalRelation =>
    +        l.copy(relation = logicalRelation.relation)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to get the logical plan for each aggregate expression in the
    +   * child data map and its column schema mapping; if the mapping is already present
    +   * it will be reused, otherwise it will be generated and stored in the aggregation data map
    +   * @param selectedDataMap
    +   *                        child data map
    +   * @param carbonTable
    +   *                    parent table
    +   * @param logicalRelation
    +   *                        logical relation of actual plan
    +   * @return map of logical plan for each aggregate expression in child query and its column mapping
    +   */
    +  def getLogicalPlanForAggregateExpression(selectedDataMap: DataMapSchema, carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation): Map[LogicalPlan, ColumnSchema] = {
    +    val aggDataMapSchema = selectedDataMap.asInstanceOf[AggregationDataMapSchema]
    +    // if column mapping is not present
    +    if (null == aggDataMapSchema.getAggregateExpressionToColumnMapping) {
    +      // add preAGG UDF to avoid all the PreAggregate rule
    +      val childDataMapQueryString = new CarbonSpark2SqlParser()
    +        .addPreAggFunction(aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY"))
    +      // get the logical plan
    +      val aggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan
    +      // getting all aggregate expression from query
    +      val dataMapAggExp = getAggregateExpFromChildDataMap(aggPlan)
    +      // in case of average the child table will have two columns which will be stored in sequence
    +      // so for an average expression we need to get two columns for mapping
    +      var counter = 0
    +      // sorting the columns based on schema ordinal so search will give proper result
    +      val sortedColumnList = aggDataMapSchema.getChildSchema.getListOfColumns.asScala
    +        .sortBy(_.getSchemaOrdinal)
    +      val logicalPlanToColumnMapping = dataMapAggExp.map { aggExp =>
    +        // for each aggregate expression get logical plan
    +        val expLogicalPlan = getLogicalPlanFromAggExp(aggExp,
    +          carbonTable.getTableName,
    +          carbonTable.getDatabaseName, logicalRelation)
    +        // check if aggregate expression is of type avg
    +        // get the columns
    +        var columnSchema = aggDataMapSchema
    +          .getAggColumnBasedOnIndex(counter, sortedColumnList.asJava)
    +        // increment the counter so that when the above code is executed for the next
    +        // expression it will search from that schema ordinal
    +        counter = columnSchema.getSchemaOrdinal + 1
    +        (expLogicalPlan, columnSchema)
    +      }.toMap
    +      // store the mapping in data map
    +      aggDataMapSchema.setAggregateExpressionToColumnMapping(logicalPlanToColumnMapping.asJava)
    +      // return the mapping
    +      logicalPlanToColumnMapping
    +    } else {
    +      // if already present in data map then return the same
    +      aggDataMapSchema.getAggregateExpressionToColumnMapping
    +        .asInstanceOf[java.util.Map[LogicalPlan, ColumnSchema]].asScala.toMap
    +    }
    +  }
    +
    +
    +  /**
    +   * Below method will be used to get the logical plan from aggregate expression
    +   * @param aggExp
    +   *               aggregate expression
    +   * @param tableName
    +   *                  parent table name
    +   * @param databaseName
    +   *                     database name
    +   * @param logicalRelation
    +   *                        logical relation
    +   * @return logical plan
    +   */
    +  def getLogicalPlanFromAggExp(aggExp: AggregateExpression,
    +      tableName: String,
    +      databaseName: String,
    +      logicalRelation: LogicalRelation): LogicalPlan = {
    +    // adding the preAGG UDF, so pre aggregate data loading rule and query rule will not
    +    // be applied
    +    val query = new CarbonSpark2SqlParser()
    --- End diff --
    
    Don't create parser every time, pass from caller
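    To make the suggestion concrete, here is a minimal pure-Scala sketch (not CarbonData code; `Parser` below is a stand-in for `CarbonSpark2SqlParser`, and its behaviour is assumed) of hoisting the parser out of the per-expression method and passing it from the caller:

    ```scala
    // Stand-in for CarbonSpark2SqlParser; assume construction is expensive.
    class Parser {
      def addPreAggFunction(sql: String): String = s"select preAgg(), ${sql.trim}"
    }

    object ParserReuse {
      // Before: a new Parser is built on every invocation (what the review flags).
      def planPerCall(sql: String): String =
        new Parser().addPreAggFunction(sql)

      // After: the caller constructs one Parser and threads it through,
      // so N aggregate expressions share a single parser instance.
      def planWithShared(parser: Parser, sql: String): String =
        parser.addPreAggFunction(sql)
    }
    ```

    Both variants return the same string; the second simply avoids one parser allocation and initialisation per aggregate expression.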


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159458812
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expressions like join conditions,
    +   * order by, project lists etc.
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to update named expressions like aggregate expressions
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
    +    namedExpression map {
    +      case attr: AttributeReference =>
    +        val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +        if(childExp.isDefined) {
    +          val newExp = AttributeReference(
    +            childExp.get._2.name,
    +            childExp.get._2.dataType,
    +            childExp.get._2.nullable,
    +            childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +          newExp
    +        } else {
    +          attr
    +        }
    +      case alias@Alias(exp, name) =>
    +        val newExp = exp.transform {
    +          case attr: AttributeReference =>
    +            val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +            if (childExp.isDefined) {
    +              val newExp = AttributeReference(
    +                childExp.get._2.name,
    +                childExp.get._2.dataType,
    +                childExp.get._2.nullable,
    +                childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +              newExp
    +            } else {
    +              attr
    +            }
    +        }
    +        Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to update the condition expression
    +   * @param conditionExp
    +   * any condition expression join condition or filter condition
    +   * @return updated condition expression
    +   */
    +  def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
    +    if (conditionExp.isDefined) {
    +      val filterExp = conditionExp.get
    +      Some(filterExp.transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +          if(childExp.isDefined) {
    +            childExp.get._2
    +          } else {
    +            attr
               }
    -          carbonTable
    -        // case for handling aggregation with order by when only projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +      })
    +    } else {
    +      conditionExp
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to validate and transform the main table plan to child table plan
    +   * The rules for transforming are as below:
    +   * 1. Grouping expression rules
    +   *    1.1 Change the parent attribute reference of the group expression
    +   * to child attribute reference
    +   *
    +   * 2. Aggregate expression rules
    +   *    2.1 Change the parent attribute reference of the group expression to
    +   * child attribute reference
    +   *    2.2 Change the count AggregateExpression to Sum as count
    +   * is already calculated so in case of aggregate table
    +   * we need to apply sum to get the count
    +   *    2.3 In case of average aggregate function select 2 columns from aggregate table with
    +   * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)).
    +   * Note: During aggregate table creation, for average the table will be created with two columns,
    +   * one for sum(column) and count(column) to support rollup
    +   * 3. Filter Expression rules.
    +   *    3.1 Updated filter expression attributes with child table attributes
    +   * 4. Update the Parent Logical relation with child Logical relation
    +   * 5. timeseries function
    +   *    5.1 validate parent table has timeseries datamap
    +   *    5.2 timeseries function is valid function or not
    +   *
    +   * @param logicalPlan
    +   * parent logical plan
    +   * @return transformed plan
    +   */
    +  def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
    +    val updatedPlan = logicalPlan.transform {
    +      // case for aggregation query
    +      case agg@Aggregate(grExp,
    +      aggExp,
    +      child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        val carbonTable = getCarbonTable(logicalRelation)
    +        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    +        val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
    +        val isValidPlan = extractQueryColumnsFromAggExpression(
    +          grExp,
    +          aggExp,
    +          carbonTable,
    +          list,
    +          aggregateExpressions)
    +        if(isValidPlan) {
    +          val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
    +            aggregateExpressions,
                 carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
    -              carbonTable = carbonTable,
    -              tableName = tableName)
    +            logicalRelation)
    +          if(null != aggDataMapSchema && null!= childPlan) {
    +            val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
    +            val (updatedGroupExp, updatedAggExp, newChild, None) =
    +              getUpdatedExpressions(grExp,
    +                aggExp,
    +                child,
    +                None,
    +                aggDataMapSchema,
    +                attributes,
    +                childPlan,
    +                carbonTable,
    +                logicalRelation)
    +            Aggregate(updatedGroupExp,
    +              updatedAggExp,
    +              newChild)
    +          } else {
    +            agg
               }
    -          carbonTable
    -        // case for handling aggregation with order by and filter when only projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +        } else {
    +          agg
    +        }
    +      // case of handling aggregation query with filter
    +      case agg@Aggregate(grExp,
    --- End diff --
    
    How about with Sort?
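    The question is whether `transformPreAggQueryPlan` also rewrites an aggregate that arrives wrapped in a `Sort` node. A toy plan ADT (stand-ins for Spark's logical plan classes, not the real ones) makes the missing case concrete:

    ```scala
    // Toy stand-ins for Spark's logical plan nodes (not the real classes).
    sealed trait Plan
    case class Relation(table: String) extends Plan
    case class Filter(child: Plan) extends Plan
    case class Aggregate(child: Plan) extends Plan
    case class Sort(child: Plan) extends Plan

    object SortCase {
      // Rewrite aggregates to read from the pre-aggregate child table.
      def rewrite(p: Plan): Plan = p match {
        case Aggregate(Relation(_))         => Aggregate(Relation("child_table"))
        case Aggregate(Filter(Relation(_))) => Aggregate(Filter(Relation("child_table")))
        // Without this case a Sort-wrapped aggregate keeps reading the parent table.
        case Sort(agg: Aggregate)           => Sort(rewrite(agg))
        case other                          => other
      }
    }
    ```

    With only the first two cases, `Sort(Aggregate(...))` falls through unchanged; the third case is what the review is asking about.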


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159378009
  
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
    @@ -0,0 +1,105 @@
    +package org.apache.carbondata.integration.spark.testsuite.preaggregate
    +
    +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.hive.CarbonRelation
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
    +
    +  override def beforeAll: Unit = {
    +    sql("drop table if exists mainTable")
    +    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
    +    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
    +    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
    +    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
    +  }
    +
    +  test("test pre agg create table with expression 1") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
    +  }
    +
    +  test("test pre agg create table with expression 2") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 3") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 4") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 5") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum")
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum")
    +  }
    +
    +  test("test pre agg table selection with expression 1") {
    +    val df = sql("select name as NewName, count(age) as sum from mainTable group by name order by name")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
    +  }
    +
    +
    +  test("test pre agg table selection with expression 2") {
    +    val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum from mainTable group by name order by name")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
    +  }
    +
    +  test("test pre agg table selection with expression 3") {
    +    val df = sql("select sum(case when age=35 then id else 0 end) from maintable")
    +    checkAnswer(df, Seq(Row(6.0)))
    +  }
    +
    +  test("test pre agg table selection with expression 4") {
    +    val df = sql("select sum(case when age=27 then id else 0 end) from maintable")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
    +    checkAnswer(df, Seq(Row(2.0)))
    +  }
    +
    +  test("test pre agg table selection with expression 5") {
    +    val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from maintable")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
    +    checkAnswer(df, Seq(Row(2.0,6.0)))
    +  }
    +
    --- End diff --
    
    add TestPreAggregateWithSubQuery
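    A hypothetical sketch of the suggested suite, modeled on the surrounding tests (the datamap definition, the sub-query shape, and the `preAggTableValidator` helper are assumptions borrowed from this file's style, not verified against CarbonData):

    ```scala
    class TestPreAggregateWithSubQuery extends QueryTest with BeforeAndAfterAll {

      override def beforeAll: Unit = {
        sql("drop table if exists mainTable")
        sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
        sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,sum(id) from mainTable group by name")
        sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
      }

      test("pre agg table selection when the aggregate sits inside a sub query") {
        val df = sql("select t.sumid from (select name, sum(id) as sumid from mainTable group by name) t")
        // expect the plan to be rewritten against the child pre-aggregate table
        preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
      }

      override def afterAll: Unit = {
        sql("drop table if exists mainTable")
      }
    }
    ```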


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159438754
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    --- End diff --
    
    why do you use `attr.qualifier` instead of `childExp.qualifier`?


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159442284
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to updated the named expression like aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
    +    namedExpression map {
    +      case attr: AttributeReference =>
    --- End diff --
    
    use `transform` inside `map`; then you don't need to handle `Alias` separately.
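    A sketch of this suggestion, assuming the `updatedExpression` mapping and the Spark 2.x Catalyst APIs used elsewhere in the PR. Because `Expression.transform` visits the whole subtree, a single `AttributeReference` case also rewrites attributes nested under an `Alias`, so no separate `Alias` branch is needed:

    ```scala
    // Sketch only: rewrite every AttributeReference in each named expression via
    // transform, which recurses into Alias children automatically.
    def updateNamedExpression(namedExpression: Seq[NamedExpression]): Seq[NamedExpression] = {
      namedExpression.map { exp =>
        exp.transform {
          case attr: AttributeReference =>
            updatedExpression.find(_._1.sameRef(attr)) match {
              case Some((_, updated)) =>
                AttributeReference(
                  updated.name,
                  updated.dataType,
                  updated.nullable,
                  updated.metadata)(updated.exprId, attr.qualifier, attr.isGenerated)
              case None => attr
            }
        // transform returns Expression, so a cast back to NamedExpression is
        // needed; the top-level node (Alias or AttributeReference) is unchanged
        // in kind, so the cast is safe here.
        }.asInstanceOf[NamedExpression]
      }
    }
    ```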


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1202/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159058395
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -933,121 +1143,122 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
        * child data map schema
        * @param attributes
        * child logical relation
    +   * @param expLogicalPlanToColumnSchemaMapping
    +   * expression logical plan to data map column mapping
    +   * @param parentTable
    +   * parent carbon table
    +   * @param logicalRelation
    +   * logical relation
        * @return updated expression
        */
       def getUpdatedAggregateExpressionForChild(aggExp: AggregateExpression,
    -      dataMapSchema: DataMapSchema,
    -      attributes: Seq[AttributeReference]):
    +      dataMapSchema: AggregationDataMapSchema,
    +      attributes: Seq[AttributeReference],
    +      expLogicalPlanToColumnSchemaMapping: Option[Map[LogicalPlan, ColumnSchema]],
    +      parentTable: CarbonTable,
    +      logicalRelation: LogicalRelation):
       Expression = {
    +    val updatedAggExp = getUpdateAggregateExpressions(aggExp)
    --- End diff --
    
    Please add a code comment here.


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159443128
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to updated the named expression like aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
    +    namedExpression map {
    +      case attr: AttributeReference =>
    +        val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +        if(childExp.isDefined) {
    +          val newExp = AttributeReference(
    +            childExp.get._2.name,
    +            childExp.get._2.dataType,
    +            childExp.get._2.nullable,
    +            childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +          newExp
    +        } else {
    +          attr
    +        }
    +      case alias@Alias(exp, name) =>
    +        val newExp = exp.transform {
    +          case attr: AttributeReference =>
    +            val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +            if (childExp.isDefined) {
    +              val newExp = AttributeReference(
    +                childExp.get._2.name,
    +                childExp.get._2.dataType,
    +                childExp.get._2.nullable,
    +                childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +              newExp
    +            } else {
    +              attr
    +            }
    +        }
    +        Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to updated condition expression
    +   * @param conditionExp
    +   * any condition expression join condition or filter condition
    +   * @return updated condition expression
    +   */
    +  def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
    --- End diff --
    
    Here also you can use the `updateExpression` method; there is no need for this separate method.
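    A minimal sketch of that deduplication, assuming the `updateExpression` signature shown earlier in this diff (`Seq[Expression] => Seq[Expression]`):

    ```scala
    // Sketch only: delegate the condition rewrite to updateExpression instead of
    // repeating the AttributeReference matching logic for filter/join conditions.
    def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] =
      conditionExp.map(cond => updateExpression(Seq(cond)).head)
    ```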


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2734/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2585/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1219/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1349/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Retest this please 


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][Pre-Aggregate] Expression s...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r158934260
  
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
    @@ -0,0 +1,105 @@
    +package org.apache.carbondata.integration.spark.testsuite.preaggregate
    +
    +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.hive.CarbonRelation
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
    +
    +  override def beforeAll: Unit = {
    +    sql("drop table if exists mainTable")
    +    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
    +    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
    +    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
    +    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
    +  }
    +
    +  test("test pre agg create table with expression 1") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
    +  }
    +
    +  test("test pre agg create table with expression 2") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 3") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 4") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 5") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum")
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum")
    +  }
    +
    +  test("test pre agg table selection with expression 1") {
    +    val df = sql("select name as NewName, count(age) as sum from mainTable group by name order by name")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
    +  }
    +
    +
    +  test("test pre agg table selection with expression 2") {
    +    val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum from mainTable group by name order by name")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
    +  }
    +
    +  test("test pre agg table selection with expression 3") {
    +    val df = sql("select sum(case when age=35 then id else 0 end) from maintable")
    +    checkAnswer(df, Seq(Row(6.0)))
    +  }
    +
    +  test("test pre agg table selection with expression 4") {
    +    val df = sql("select sum(case when age=27 then id else 0 end) from maintable")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
    +    checkAnswer(df, Seq(Row(2.0)))
    +  }
    +
    +  test("test pre agg table selection with expression 5") {
    +    val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from maintable")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
    +    checkAnswer(df, Seq(Row(2.0,6.0)))
    +  }
    +
    --- End diff --
    
    add a testcase for subquery also
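
    [Editor's note] A subquery testcase following the conventions of the quoted file might look like the sketch below. The datamap name (`maintable_agg1`) and query shape are assumptions based on the fixtures created in `beforeAll` above; `preAggTableValidator` is the helper already used by the other tests in this file.

    ```scala
    // Hypothetical sketch: checks that a query wrapping the aggregate
    // expression inside a subquery is still rewritten to use the
    // pre-aggregate table (maintable_agg1 from the fixtures above).
    test("test pre agg table selection with expression in subquery") {
      val df = sql(
        "select sumid from (select name, " +
        "sum(case when age=35 then id else 0 end) as sumid " +
        "from mainTable group by name) t")
      preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
    }
    ```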


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2732/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1326/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

Posted by sraghunandan <gi...@git.apache.org>.
Github user sraghunandan commented on the issue:

    https://github.com/apache/carbondata/pull/1728
  
    retest sdv please


---