Posted to issues@carbondata.apache.org by kumarvishal09 <gi...@git.apache.org> on 2017/11/03 13:19:48 UTC

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

GitHub user kumarvishal09 opened a pull request:

    https://github.com/apache/carbondata/pull/1464

    [WIP][CARBONDATA-1523]Pre Aggregate table selection and Query Plan changes

    Adds an API in the carbon layer that selects a suitable aggregation table for a group by query, and updates the query plan in the carbon optimizer so that group by queries can use aggregation tables.
    
    Be sure to do all of the following checklist to help us incorporate 
    your contribution quickly and easily:
    
     - [ ] Any interfaces changed: None
     
     - [ ] Any backward compatibility impacted: None
     
     - [ ] Document update required- Yes
    
     - [ ] Testing done
            Please provide details on 
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. 
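
The class comment in CarbonPreAggregateQueryRules.scala, quoted in the review comments further down this thread, says that a count on the parent table becomes a sum over the stored counts, and that an average becomes sum(sum column) divided by sum(count column). A minimal sketch of that arithmetic follows. `PreAggRow` and `RollupSketch` are hypothetical names used only for illustration; they are not part of CarbonData.

```scala
// Hypothetical model of one row of a pre-aggregate (child) table; not CarbonData's API.
// Each row holds a group key plus the partial sum and count stored when the table was built.
final case class PreAggRow(key: String, sumValue: Double, countValue: Long)

object RollupSketch {
  // count(col) on the parent becomes sum(count column) on the child table.
  def count(rows: Seq[PreAggRow]): Long =
    rows.map(_.countValue).sum

  // avg(col) on the parent becomes sum(sum column) / sum(count column) on the child table.
  def avg(rows: Seq[PreAggRow]): Double =
    rows.map(_.sumValue).sum / rows.map(_.countValue).sum
}
```

With two groups holding (sum 10, count 2) and (sum 20, count 3), the rolled-up average is 30 / 5 = 6. Averaging the two per-group averages (5 and about 6.67) would give a different number, which is presumably why the class comment says the sum and the count are stored as separate columns "to support rollup".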
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kumarvishal09/incubator-carbondata pre-aggregate_Query

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1464.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1464
    
----
commit 4cc40cf6cd68f4e4c1e6cb613b3e28c5bf6024e0
Author: kumarvishal <ku...@gmail.com>
Date:   2017-10-30T07:14:32Z

    aggregation selector

----


---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r149003605
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateQueryRules.scala ---
    @@ -0,0 +1,756 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, SparkSession}
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
    +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide,
    +Expression, NamedExpression}
    +import org.apache.spark.sql.catalyst.expressions.aggregate._
    +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
    +import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +
    +/**
    + * Class for applying Pre Aggregate rules
    + * Responsibility.
    + * 1. Check plan is valid plan for updating the parent table plan with child table
    + * 2. Updated the plan based on child schema
    + *
    + * Rules for Upadating the plan
    + * 1. Grouping expression rules
    + *    1.1 Change the parent attribute reference for of group expression
    + * to child attribute reference
    + *
    + * 2. Aggregate expression rules
    + *    2.1 Change the parent attribute reference for of group expression to
    + * child attribute reference
    + *    2.2 Change the count AggregateExpression to Sum as count
    + * is already calculated so in case of aggregate table
    + * we need to apply sum to get the count
    + *    2.2 In case of average aggregate function select 2 columns from aggregate table with
    + * aggregation
    + * sum and count. Then add divide(sum(column with sum), sum(column with count)).
    + * Note: During aggregate table creation for average table will be created with two columns
    + * one for sum(column) and count(column) to support rollup
    + *
    + * 3. Filter Expression rules.
    + *    3.1 Updated filter expression attributes with child table attributes
    + * 4. Update the Parent Logical relation with child Logical relation
    + *
    + * @param sparkSession
    + * spark session
    + */
    +case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +    var needAnalysis = true
    +    plan.transformExpressions {
    +      // first check if any preAgg scala function is applied it is present is in plan
    +      // then call is from create preaggregate table class so no need to transform the query plan
    +      case al@Alias(_, name) if name.equals("preAgg") =>
    +        needAnalysis = false
    +        al
    +      // in case of query if any unresolve alias is present then wait for plan to be resolved
    +      // return the same plan as we can tranform the plan only when everything is resolved
    +      case unresolveAlias@UnresolvedAlias(_, _) =>
    +        needAnalysis = false
    +        unresolveAlias
    +    }
    +    // if plan is not valid for transformation then return same plan
    +    if (!needAnalysis) {
    +      plan
    +    } else {
    +      // create buffer to collect all the column and its metadata information
    +      val list = scala.collection.mutable.ListBuffer.empty[QueryColumn]
    +      var isValidPlan = true
    +      val carbonTable = plan match {
    +        // matching the plan based on supported plan
    +        // if plan is matches with any case it will validate and get all
    +        // information required for transforming the plan
    +
    +        // When plan has grouping expression, aggregate expression
    +        // subquery
    +        case Aggregate(groupingExp,
    +        aggregateExp,
    +        SubqueryAlias(_, logicalRelation: LogicalRelation, _))
    +          // only carbon query plan is supported checking whether logical relation is
    +          // is for carbon
    +          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    +          // when table has child data map(pre aggregate table) then only plan will be transformed
    +          if (!carbonTable.hasPreAggDataMap) {
    +            isValidPlan = false
    +          }
    +          if (isValidPlan) {
    +            // if it is valid plan then extract the query columns
    +            isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    +              aggregateExp,
    +              carbonTable,
    +              tableName,
    +              list)
    +          }
    +          carbonTable
    +
    +        // below case for handling filter query
    +        // When plan has grouping expression, aggregate expression
    +        // filter expression
    +        case Aggregate(groupingExp, aggregateExp,
    +        Filter(filterExp,
    +        SubqueryAlias(_, logicalRelation: LogicalRelation, _)))
    +          // only carbon query plan is supported checking whether logical relation is
    +          // is for carbon
    +          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    +          // when table has child data map(pre aggregate table) then only plan will be transformed
    +          if (!carbonTable.hasPreAggDataMap) {
    --- End diff --
    
    This check is already made in the guard of the case statement, so there is no need to repeat it here.
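
A small sketch of why the inner check is redundant, assuming simplified stand-in types: `Table`, `Plan`, `Aggregate`, `Other` and `GuardSketch` below are hypothetical and only mimic the control flow of the quoted diff. Returning an `Option` instead of setting an `isValidPlan` flag is a choice made for this sketch, not something the pull request does.

```scala
// Hypothetical stand-ins for the real plan and table types; only the control flow matters.
final case class Table(name: String, hasPreAggDataMap: Boolean)

sealed trait Plan
final case class Aggregate(table: Table) extends Plan
case object Other extends Plan

object GuardSketch {
  // Returns the table to rewrite against, or None when the plan is not eligible.
  def eligibleTable(plan: Plan): Option[Table] = plan match {
    // The guard already rejects tables without a pre-aggregate data map,
    // so the body needs no second hasPreAggDataMap check.
    case Aggregate(table) if table.hasPreAggDataMap => Some(table)
    case _ => None
  }
}
```

Once the guard has passed, a second `if (!table.hasPreAggDataMap)` inside the case body could never be true, so that branch would be dead code.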


---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r148997835
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -49,70 +50,109 @@ object PreAggregateUtil {
     
       def getParentCarbonTable(plan: LogicalPlan): CarbonTable = {
         plan match {
    -      case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
    -        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    -        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable
    +      case Aggregate(_, _, SubqueryAlias(_, logicalRelation: LogicalRelation, _))
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
    +          carbonRelation.metaData.carbonTable
    +      case Aggregate(_, _, logicalRelation: LogicalRelation)
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
    +          carbonRelation.metaData.carbonTable
           case _ => throw new MalformedCarbonCommandException("table does not exist")
    --- End diff --
    
    This is not actually a case of the table not existing; it is that the plan doesn't match.


---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1557/



---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1678/



---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/991/



---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r149001761
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -49,70 +50,109 @@ object PreAggregateUtil {
     
       def getParentCarbonTable(plan: LogicalPlan): CarbonTable = {
         plan match {
    -      case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
    -        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    -        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable
    +      case Aggregate(_, _, SubqueryAlias(_, logicalRelation: LogicalRelation, _))
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
    +          carbonRelation.metaData.carbonTable
    +      case Aggregate(_, _, logicalRelation: LogicalRelation)
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
    +          carbonRelation.metaData.carbonTable
           case _ => throw new MalformedCarbonCommandException("table does not exist")
         }
       }
     
       /**
        * Below method will be used to validate the select plan
        * and get the required fields from select plan
    -   * Currently only aggregate query is support any other type of query will
    -   * fail
    +   * Currently only aggregate query is support any other type of query will fail
    +   *
        * @param plan
        * @param selectStmt
        * @return list of fields
        */
       def validateActualSelectPlanAndGetAttrubites(plan: LogicalPlan,
           selectStmt: String): scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
    -    val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
         plan match {
    -      case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
    -        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    -        val carbonTable = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
    -          .metaData.carbonTable
    -        val parentTableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
    -          .getTableName
    -        val parentDatabaseName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
    -          .getDatabaseName
    -        val parentTableId = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
    -          .getTableId
    -        if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) {
    +      case Aggregate(groupByExp, aggExp, SubqueryAlias(_, logicalRelation: LogicalRelation, _)) =>
    +        getFieldsFromPlan(groupByExp, aggExp, logicalRelation, selectStmt)
    +      case Aggregate(groupByExp, aggExp, logicalRelation: LogicalRelation) =>
    +        getFieldsFromPlan(groupByExp, aggExp, logicalRelation, selectStmt)
    +      case _ =>
    +        throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${ selectStmt } ")
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to get the fields from expressions
    +   * @param groupByExp
    +   *                  grouping expression
    +   * @param aggExp
    +   *               aggregate expression
    +   * @param logicalRelation
    +   *                        logical relation
    +   * @param selectStmt
    +   *                   select statement
    +   * @return fields from expressions
    +   */
    +  def getFieldsFromPlan(groupByExp: Seq[Expression],
    +      aggExp: Seq[NamedExpression], logicalRelation: LogicalRelation, selectStmt: String):
    +  scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
    +    val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
    +    if (!logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
    +      throw new MalformedCarbonCommandException("Un-supported table")
    +    }
    +    val carbonTable = logicalRelation.relation.
    +      asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
    +      .metaData.carbonTable
    +    val parentTableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
    +      .getTableName
    +    val parentDatabaseName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
    +      .getDatabaseName
    +    val parentTableId = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
    +      .getTableId
    +    if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) {
    +      throw new MalformedCarbonCommandException(
    +        "Pre Aggregation is not supported on Pre-Aggregated Table")
    +    }
    +    groupByExp.map {
    +      case attr: AttributeReference =>
    +        fieldToDataMapFieldMap += getField(attr.name,
    +          attr.dataType,
    +          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    +          parentTableName = parentTableName,
    +          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
    +      case _ =>
    +        throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${
    --- End diff --
    
    This is an unsupported function case, so the exception should say that.


---

[GitHub] carbondata pull request #1464: [CARBONDATA-1523]Pre Aggregate table selectio...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r150148019
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -49,70 +50,109 @@ object PreAggregateUtil {
     
       def getParentCarbonTable(plan: LogicalPlan): CarbonTable = {
         plan match {
    -      case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
    -        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    -        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable
    +      case Aggregate(_, _, SubqueryAlias(_, logicalRelation: LogicalRelation, _))
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
    +          carbonRelation.metaData.carbonTable
    +      case Aggregate(_, _, logicalRelation: LogicalRelation)
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
    +          carbonRelation.metaData.carbonTable
           case _ => throw new MalformedCarbonCommandException("table does not exist")
    --- End diff --
    
    fixed


---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r149004253
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateQueryRules.scala ---
    @@ -0,0 +1,756 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, SparkSession}
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
    +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide,
    +Expression, NamedExpression}
    +import org.apache.spark.sql.catalyst.expressions.aggregate._
    +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
    +import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +
    +/**
    + * Class for applying Pre Aggregate rules
    + * Responsibility.
    + * 1. Check plan is valid plan for updating the parent table plan with child table
    + * 2. Updated the plan based on child schema
    + *
    + * Rules for Upadating the plan
    + * 1. Grouping expression rules
    + *    1.1 Change the parent attribute reference for of group expression
    + * to child attribute reference
    + *
    + * 2. Aggregate expression rules
    + *    2.1 Change the parent attribute reference for of group expression to
    + * child attribute reference
    + *    2.2 Change the count AggregateExpression to Sum as count
    + * is already calculated so in case of aggregate table
    + * we need to apply sum to get the count
    + *    2.2 In case of average aggregate function select 2 columns from aggregate table with
    + * aggregation
    + * sum and count. Then add divide(sum(column with sum), sum(column with count)).
    + * Note: During aggregate table creation for average table will be created with two columns
    + * one for sum(column) and count(column) to support rollup
    + *
    + * 3. Filter Expression rules.
    + *    3.1 Updated filter expression attributes with child table attributes
    + * 4. Update the Parent Logical relation with child Logical relation
    + *
    + * @param sparkSession
    + * spark session
    + */
    +case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +    var needAnalysis = true
    +    plan.transformExpressions {
    +      // first check if any preAgg scala function is applied it is present is in plan
    +      // then call is from create preaggregate table class so no need to transform the query plan
    +      case al@Alias(_, name) if name.equals("preAgg") =>
    +        needAnalysis = false
    +        al
    +      // in case of query if any unresolve alias is present then wait for plan to be resolved
    +      // return the same plan as we can tranform the plan only when everything is resolved
    +      case unresolveAlias@UnresolvedAlias(_, _) =>
    +        needAnalysis = false
    +        unresolveAlias
    +    }
    +    // if plan is not valid for transformation then return same plan
    +    if (!needAnalysis) {
    +      plan
    +    } else {
    +      // create buffer to collect all the column and its metadata information
    +      val list = scala.collection.mutable.ListBuffer.empty[QueryColumn]
    +      var isValidPlan = true
    +      val carbonTable = plan match {
    +        // matching the plan based on supported plan
    +        // if plan is matches with any case it will validate and get all
    +        // information required for transforming the plan
    +
    +        // When plan has grouping expression, aggregate expression
    +        // subquery
    +        case Aggregate(groupingExp,
    +        aggregateExp,
    +        SubqueryAlias(_, logicalRelation: LogicalRelation, _))
    +          // only carbon query plan is supported checking whether logical relation is
    +          // is for carbon
    +          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    +          // when table has child data map(pre aggregate table) then only plan will be transformed
    +          if (!carbonTable.hasPreAggDataMap) {
    +            isValidPlan = false
    +          }
    +          if (isValidPlan) {
    +            // if it is valid plan then extract the query columns
    +            isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    +              aggregateExp,
    +              carbonTable,
    +              tableName,
    +              list)
    +          }
    +          carbonTable
    +
    +        // below case for handling filter query
    +        // When plan has grouping expression, aggregate expression
    +        // filter expression
    +        case Aggregate(groupingExp, aggregateExp,
    +        Filter(filterExp,
    +        SubqueryAlias(_, logicalRelation: LogicalRelation, _)))
    +          // only carbon query plan is supported checking whether logical relation is
    +          // is for carbon
    +          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    +          // when table has child data map(pre aggregate table) then only plan will be transformed
    +          if (!carbonTable.hasPreAggDataMap) {
    +            isValidPlan = false
    +          }
    +          if (isValidPlan) {
    +            // if it is valid plan then extract the query columns
    +            isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    +              aggregateExp,
    +              carbonTable,
    +              tableName,
    +              list)
    +            // getting the columns from filter expression
    +            filterExp.transform {
    +              case attr: AttributeReference =>
    +                list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true)
    +                attr
    +            }
    +          }
    +          carbonTable
    +
    +        // When plan has grouping expression, aggregate expression
    +        // logical relation
    +        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    +          // only carbon query plan is supported checking whether logical relation is
    +          // is for carbon
    +          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    +          // when table has child data map(pre aggregate table) then only plan will be transformed
    +          if (!carbonTable.hasPreAggDataMap) {
    +            isValidPlan = false
    +          }
    +          if (isValidPlan) {
    +            // if it is valid plan then extract the query columns
    +            isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    +              aggregateExp,
    +              carbonTable,
    +              tableName,
    +              list)
    +          }
    +          carbonTable
    +        case _ =>
    +          isValidPlan = false
    +          null
    +      }
    +      // if plan is valid then update the plan with child attributes
    +      if (isValidPlan) {
    +        // getting all the projection columns
    +        val listProjectionColumn = list
    +          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn)
    +        // getting all the filter columns
    +        val listFilterColumn = list
    +          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn)
    +        // getting all the aggregation columns
    +        val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty)
    +        // create a query plan object which will be used to select the list of pre aggregate tables
    +        // matches with this plan
    +        val queryPlan = new QueryPlan(listProjectionColumn.asJava,
    +          listAggregationColumn.asJava,
    +          listFilterColumn.asJava)
    +        // create aggregate table selector object
    +        val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
    +        // select the list of valid child tables
    +        val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
    +        // if it doesnot match with any pre aggregate table return the same plan
    +        if (!selectedDataMapSchemas.isEmpty) {
    +          // sort the selected child schema based on size to select smallest pre aggregate table
    +          val (aggDataMapSchema, carbonRelation) =
    +            selectedDataMapSchemas.asScala.map { selectedDataMapSchema =>
    +              val catalog = sparkSession.sessionState.catalog
    +              val carbonRelation = catalog
    +                .lookupRelation(TableIdentifier(selectedDataMapSchema.getRelationIdentifier
    +                  .getTableName,
    +                  Some(selectedDataMapSchema.getRelationIdentifier
    +                    .getDatabaseName))).asInstanceOf[SubqueryAlias].child
    +                .asInstanceOf[LogicalRelation]
    +              (selectedDataMapSchema, carbonRelation)
    +            }.sortBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes)
    +              .head
    --- End diff --
    
    Instead of `.sortBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes).head`,
    use `.minBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes)`.
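
The difference between the two forms can be sketched with a simplified stand-in for the candidate tables. `Candidate` and `SmallestSketch` are hypothetical names for illustration; in the real code the size comes from `CarbonDatasourceHadoopRelation.sizeInBytes`.

```scala
// Hypothetical stand-in for a candidate pre-aggregate table and its size.
final case class Candidate(name: String, sizeInBytes: Long)

object SmallestSketch {
  // Sorts every candidate and then keeps only the first one: O(n log n).
  def viaSort(candidates: Seq[Candidate]): Candidate =
    candidates.sortBy(_.sizeInBytes).head

  // Single pass that tracks the smallest element seen so far: O(n).
  def viaMinBy(candidates: Seq[Candidate]): Candidate =
    candidates.minBy(_.sizeInBytes)
}
```

Both forms throw on an empty collection, so either one relies on the emptiness check that the quoted diff performs before selecting a table.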


---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r148995569
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ---
    @@ -116,6 +218,83 @@ public void setProperties(Map<String, String> properties) {
           String value = in.readUTF();
           this.properties.put(key, value);
         }
    +  }
     
    +  /**
    +   * Below method will be used to get the columns on which aggregate function is not applied
    +   * @param columnName
    +   *                parent column name
    +   * @return child column schema
    +   */
    +  public ColumnSchema getChildColBasedByParentForNonAggF(String columnName) {
    +    Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName);
    +    if (null != columnSchemas) {
    +      Iterator<ColumnSchema> iterator = columnSchemas.iterator();
    +      while (iterator.hasNext()) {
    +        ColumnSchema next = iterator.next();
    +        if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) {
    +          return next;
    +        }
    +      }
    +    }
    +    return null;
    +  }
    +
    +  /**
    +   * Below method will be used to get the column schema based on parent column name
    +   * @param columName
    +   *                parent column name
    +   * @return child column schmea
    +   */
    +  public ColumnSchema getChildColumnByParentName(String columName) {
    --- End diff --
    
    Better name: `getChildColumnByParentColName`


---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r148995547
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ---
    @@ -116,6 +218,83 @@ public void setProperties(Map<String, String> properties) {
           String value = in.readUTF();
           this.properties.put(key, value);
         }
    +  }
     
    +  /**
    +   * Below method will be used to get the columns on which aggregate function is not applied
    +   * @param columnName
    +   *                parent column name
    +   * @return child column schema
    +   */
    +  public ColumnSchema getChildColBasedByParentForNonAggF(String columnName) {
    --- End diff --
    
    Better name as `getNonAggChildColumnByParentColName`


---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    retest this please


---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1002/



---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    retest this please



---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r148992553
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ---
    @@ -67,6 +92,83 @@ public void setRelationIdentifier(RelationIdentifier relationIdentifier) {
     
       public void setChildSchema(TableSchema childSchema) {
         this.childSchema = childSchema;
    +    List<ColumnSchema> listOfColumns = this.childSchema.getListOfColumns();
    +    fillNonAggFunctionColumns(listOfColumns);
    +    fillAggFunctionColumns(listOfColumns);
    +    fillParentNameToAggregationMapping(listOfColumns);
    +  }
    +
    +  /**
    +   * Method to prepare mapping of parent to list of aggregation function applied on that column
    +   * @param listOfColumns
    +   *        child column schema list
    +   */
    +  private void fillParentNameToAggregationMapping(List<ColumnSchema> listOfColumns) {
    +    parentColumnToAggregationsMapping = new HashMap<>();
    +    for (ColumnSchema column : listOfColumns) {
    +      if (null != column.getAggFunction() && !column.getAggFunction().isEmpty()) {
    +        List<ParentColumnTableRelation> parentColumnTableRelations =
    +            column.getParentColumnTableRelations();
    +        if (null != parentColumnTableRelations) {
    --- End diff --
    
    Please check the size of this list as well, or iterate over the list instead of always reading element 0.
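
A minimal sketch of the iteration variant, assuming simplified stand-in types: `ParentRelation`, `ChildColumn` and `ParentAggregationIndex` are hypothetical and only mimic the shape of `ParentColumnTableRelation` and `ColumnSchema` from the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ParentColumnTableRelation.
final class ParentRelation {
  final String columnName;

  ParentRelation(String columnName) {
    this.columnName = columnName;
  }
}

// Hypothetical stand-in for ColumnSchema.
final class ChildColumn {
  final String aggFunction;           // null or empty when no aggregate is applied
  final List<ParentRelation> parents; // may be null or empty

  ChildColumn(String aggFunction, List<ParentRelation> parents) {
    this.aggFunction = aggFunction;
    this.parents = parents;
  }
}

final class ParentAggregationIndex {
  // Maps each parent column name to the aggregate functions applied on it.
  static Map<String, List<String>> build(List<ChildColumn> columns) {
    Map<String, List<String>> result = new HashMap<>();
    for (ChildColumn column : columns) {
      if (column.aggFunction == null || column.aggFunction.isEmpty()) {
        continue;
      }
      if (column.parents == null) {
        continue;
      }
      // Iterating handles the empty list (the body never runs) and
      // also covers columns with more than one parent relation.
      for (ParentRelation parent : column.parents) {
        result.computeIfAbsent(parent.columnName, k -> new ArrayList<>())
            .add(column.aggFunction);
      }
    }
    return result;
  }
}
```

Compared with `get(0)`, the loop cannot throw `IndexOutOfBoundsException` on an empty list.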


---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r149002531
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -16,17 +16,18 @@
      */
     package org.apache.spark.sql.execution.command.preaaggregate
     
    -import scala.collection.mutable.ListBuffer
    +import scala.collection.mutable.{ArrayBuffer, ListBuffer}
    --- End diff --
    
    What is the use of the method `prepareSchemaJson`?


---

[GitHub] carbondata pull request #1464: [CARBONDATA-1523]Pre Aggregate table selectio...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r150147994
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -16,17 +16,18 @@
      */
     package org.apache.spark.sql.execution.command.preaaggregate
     
    -import scala.collection.mutable.ListBuffer
    +import scala.collection.mutable.{ArrayBuffer, ListBuffer}
    --- End diff --
    
    removed


---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1662/



---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1619/



---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r148995599
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ---
    @@ -116,6 +218,83 @@ public void setProperties(Map<String, String> properties) {
           String value = in.readUTF();
           this.properties.put(key, value);
         }
    +  }
     
    +  /**
    +   * Below method will be used to get the columns on which aggregate function is not applied
    +   * @param columnName
    +   *                parent column name
    +   * @return child column schema
    +   */
    +  public ColumnSchema getChildColBasedByParentForNonAggF(String columnName) {
    +    Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName);
    +    if (null != columnSchemas) {
    +      Iterator<ColumnSchema> iterator = columnSchemas.iterator();
    +      while (iterator.hasNext()) {
    +        ColumnSchema next = iterator.next();
    +        if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) {
    +          return next;
    +        }
    +      }
    +    }
    +    return null;
    +  }
    +
    +  /**
    +   * Below method will be used to get the column schema based on parent column name
    +   * @param columName
    +   *                parent column name
    +   * @return child column schmea
    +   */
    +  public ColumnSchema getChildColumnByParentName(String columName) {
    +    List<ColumnSchema> listOfColumns = childSchema.getListOfColumns();
    +    for (ColumnSchema columnSchema : listOfColumns) {
    +      List<ParentColumnTableRelation> parentColumnTableRelations =
    +          columnSchema.getParentColumnTableRelations();
    +      if (parentColumnTableRelations.get(0).getColumnName().equals(columName)) {
    +        return columnSchema;
    +      }
    +    }
    +    return null;
    +  }
    +
    +  /**
    +   * Below method will be used to get the child column schema based on parent name and aggregate
    +   * function applied on column
    +   * @param columnName
    +   *                  parent column name
    +   * @param aggFunction
    +   *                  aggregate function applied
    +   * @return child column schema
    +   */
    +  public ColumnSchema getChildColByParentWithAggFun(String columnName,
    --- End diff --
    
    Better name as `getAggChildColumnByParentColName`


---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1075/



---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r148997655
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
    @@ -61,7 +66,7 @@ case class CreatePreAggregateTableCommand(
         val dbName = cm.databaseName
         LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
         // getting the parent table
    -    val parentTable = PreAggregateUtil.getParentCarbonTable(dataFrame.logicalPlan)
    +    val parentTable = PreAggregateUtil.getParentCarbonTable(logicalPlan)
    --- End diff --
    
    Most of the content of this class is the same as `CreateTableCommand`, so it is better to call that command from here.


---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r148992649
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ---
    @@ -67,6 +92,83 @@ public void setRelationIdentifier(RelationIdentifier relationIdentifier) {
     
       public void setChildSchema(TableSchema childSchema) {
         this.childSchema = childSchema;
    +    List<ColumnSchema> listOfColumns = this.childSchema.getListOfColumns();
    +    fillNonAggFunctionColumns(listOfColumns);
    +    fillAggFunctionColumns(listOfColumns);
    +    fillParentNameToAggregationMapping(listOfColumns);
    --- End diff --
    
    I feel the above 3 functions are doing almost the same job; why not combine them?
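
One way the three fill steps could be merged into a single pass over the column list is sketched below. The types and map shapes are assumptions made for illustration: `ColumnInfo` and `ChildSchemaIndex` are hypothetical, and the real fields in `DataMapSchema` may be structured differently.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simplified column: one parent column, optional aggregate function.
final class ColumnInfo {
  final String name;
  final String parentName;
  final String aggFunction; // null or empty when no aggregate is applied

  ColumnInfo(String name, String parentName, String aggFunction) {
    this.name = name;
    this.parentName = parentName;
    this.aggFunction = aggFunction;
  }
}

final class ChildSchemaIndex {
  final Map<String, Set<ColumnInfo>> parentToNonAggChild = new HashMap<>();
  final Map<String, Set<ColumnInfo>> parentToAggChild = new HashMap<>();
  final Map<String, Set<String>> parentToAggFunctions = new HashMap<>();

  // Fills all three mappings while walking the column list once.
  ChildSchemaIndex(List<ColumnInfo> columns) {
    for (ColumnInfo column : columns) {
      boolean hasAgg = column.aggFunction != null && !column.aggFunction.isEmpty();
      if (hasAgg) {
        parentToAggChild
            .computeIfAbsent(column.parentName, k -> new HashSet<>()).add(column);
        parentToAggFunctions
            .computeIfAbsent(column.parentName, k -> new HashSet<>()).add(column.aggFunction);
      } else {
        parentToNonAggChild
            .computeIfAbsent(column.parentName, k -> new HashSet<>()).add(column);
      }
    }
  }
}
```

Keeping the aggregate / non-aggregate decision in one place also means the three mappings cannot drift apart if the rule for "has an aggregate function" changes later.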


---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1560/



---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1042/



---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1077/



---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1680/



---

[GitHub] carbondata issue #1464: [WIP][CARBONDATA-1523]Pre Aggregate table selection ...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1485/



---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r148997142
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---
    @@ -51,6 +51,7 @@ class CarbonEnv {
     
       def init(sparkSession: SparkSession): Unit = {
         sparkSession.udf.register("getTupleId", () => "")
    +    sparkSession.udf.register("preAgg", () => "")
    --- End diff --
    
    Add a comment describing the usage.


---

[GitHub] carbondata issue #1464: [WIP][CARBONDATA-1523]Pre Aggregate table selection ...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/859/



---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r148996693
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.preagg;
    +
    +import java.util.List;
    +
    +/**
    + * class to maintain the query plan to select the data map tables
    + */
    +public class QueryPlan {
    +
    +  /**
    +   * List of projection columns
    +   */
    +  private List<QueryColumn> projectionColumn;
    +
    +  /**
    +   * list of aggregation columns
    +   */
    +  private List<QueryColumn> aggregationColumns;
    --- End diff --
    
    I think it is not required to separate out `aggregationColumns`; all of them should be part of `projectionColumn`. Just add one method `hasAggFunc` to `QueryColumn`.
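
A rough sketch of that shape, with hypothetical class names (`QueryColumnSketch`, `QueryPlanSketch`) so as not to suggest this is the actual `QueryColumn` or `QueryPlan` code; the field names are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified version of QueryColumn.
final class QueryColumnSketch {
  final String columnName;
  final String aggFunction; // null or empty when no aggregate function is applied

  QueryColumnSketch(String columnName, String aggFunction) {
    this.columnName = columnName;
    this.aggFunction = aggFunction;
  }

  // The single method the review asks for.
  boolean hasAggFunc() {
    return aggFunction != null && !aggFunction.isEmpty();
  }
}

// Hypothetical plan holding one list; aggregation columns are derived on demand.
final class QueryPlanSketch {
  private final List<QueryColumnSketch> projectionColumns;

  QueryPlanSketch(List<QueryColumnSketch> projectionColumns) {
    this.projectionColumns = projectionColumns;
  }

  List<QueryColumnSketch> aggregationColumns() {
    List<QueryColumnSketch> result = new ArrayList<>();
    for (QueryColumnSketch column : projectionColumns) {
      if (column.hasAggFunc()) {
        result.add(column);
      }
    }
    return result;
  }
}
```

With a single list there is one source of truth for the projected columns, and callers that only need the aggregated ones can filter on `hasAggFunc()`.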


---

[GitHub] carbondata pull request #1464: [CARBONDATA-1523]Pre Aggregate table selectio...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r150454937
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.metadata.schema.table;
    +
    +import java.util.Map;
    +
    +import static org.apache.carbondata.core.constants.CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA;
    +
    +public class DataMapSchemaFactory {
    +  public static final DataMapSchemaFactory INSTANCE = new DataMapSchemaFactory();
    +
    +  /**
    +   * Below class will be used to get data map schema object
    +   * based on class name
    +   * @param className
    +   * @return data map schema
    +   */
    +  public DataMapSchema getDataMapSchema(String className, Map<String, String> properties,
    +      TableSchema tableSchema, RelationIdentifier relationIdentifier) {
    +    switch (className) {
    +      case AGGREGATIONDATAMAPSCHEMA:
    +        return new AggregationDataMapSchema(className, properties, tableSchema, relationIdentifier);
    +      default:
    +        throw new IllegalArgumentException("Invalid Class Name");
    --- End diff --
    
    Don't throw an exception; return a new `DataMapSchema` by default.
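
A minimal sketch of a factory that falls back to the base type, under stated assumptions: all class names here are hypothetical, and the string value of `AGGREGATION_SCHEMA` is invented for the example (the real value of `AGGREGATIONDATAMAPSCHEMA` is not shown in the diff).

```java
// Hypothetical base schema type.
class BaseSchemaSketch {
  final String className;

  BaseSchemaSketch(String className) {
    this.className = className;
  }
}

// Hypothetical specialised schema for pre-aggregate data maps.
final class AggregationSchemaSketch extends BaseSchemaSketch {
  AggregationSchemaSketch(String className) {
    super(className);
  }
}

final class SchemaFactorySketch {
  // Assumed constant value, for illustration only.
  static final String AGGREGATION_SCHEMA = "AggregationDataMapSchema";

  static BaseSchemaSketch create(String className) {
    // Note: a switch on a null String throws NullPointerException,
    // so callers are assumed to pass a non-null class name.
    switch (className) {
      case AGGREGATION_SCHEMA:
        return new AggregationSchemaSketch(className);
      default:
        // Unknown names fall back to the generic schema instead of failing.
        return new BaseSchemaSketch(className);
    }
  }
}
```

The fallback keeps schemas written with other data map class names readable, at the cost of not failing fast on a misspelled name.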


---

[GitHub] carbondata issue #1464: [WIP][CARBONDATA-1523]Pre Aggregate table selection ...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1435/



---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    LGTM


---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1044/



---

[GitHub] carbondata pull request #1464: [CARBONDATA-1523]Pre Aggregate table selectio...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/carbondata/pull/1464


---

[GitHub] carbondata issue #1464: [WIP][CARBONDATA-1523]Pre Aggregate table selection ...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/802/



---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r148998207
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -49,70 +50,109 @@ object PreAggregateUtil {
     
       def getParentCarbonTable(plan: LogicalPlan): CarbonTable = {
         plan match {
    -      case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
    -        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    -        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable
    +      case Aggregate(_, _, SubqueryAlias(_, logicalRelation: LogicalRelation, _))
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
    +          carbonRelation.metaData.carbonTable
    +      case Aggregate(_, _, logicalRelation: LogicalRelation)
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
    +          carbonRelation.metaData.carbonTable
           case _ => throw new MalformedCarbonCommandException("table does not exist")
         }
       }
     
       /**
        * Below method will be used to validate the select plan
        * and get the required fields from select plan
    -   * Currently only aggregate query is support any other type of query will
    -   * fail
    +   * Currently only aggregate query is support any other type of query will fail
    +   *
        * @param plan
        * @param selectStmt
        * @return list of fields
        */
       def validateActualSelectPlanAndGetAttrubites(plan: LogicalPlan,
    --- End diff --
    
    Typo: should be `Attributes`.


---

[GitHub] carbondata issue #1464: [WIP][CARBONDATA-1523]Pre Aggregate table selection ...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1434/



---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1562/



---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r149012346
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ---
    @@ -21,7 +21,14 @@
     import java.io.IOException;
     import java.io.Serializable;
     import java.util.HashMap;
    +import java.util.HashSet;
    --- End diff --
    
    There is already a class named `DataMapSchema.java`, so it is better to rename the older class to `CarbonRowSchema`.


---

[GitHub] carbondata issue #1464: [WIP][CARBONDATA-1523]Pre Aggregate table selection ...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/800/



---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r148991769
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---
    @@ -124,6 +124,8 @@
        */
       private int numberOfNoDictSortColumns;
     
    +  private boolean hasPreAggDataMap;
    --- End diff --
    
    Better to name it `hasChildDataMap`.


---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/947/



---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r149004422
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateQueryRules.scala ---
    @@ -0,0 +1,756 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, SparkSession}
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
    +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide,
    +Expression, NamedExpression}
    +import org.apache.spark.sql.catalyst.expressions.aggregate._
    +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
    +import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +
    +/**
    + * Class for applying Pre Aggregate rules
    + * Responsibility.
    + * 1. Check plan is valid plan for updating the parent table plan with child table
    + * 2. Updated the plan based on child schema
    + *
    + * Rules for Upadating the plan
    + * 1. Grouping expression rules
    + *    1.1 Change the parent attribute reference for of group expression
    + * to child attribute reference
    + *
    + * 2. Aggregate expression rules
    + *    2.1 Change the parent attribute reference for of group expression to
    + * child attribute reference
    + *    2.2 Change the count AggregateExpression to Sum as count
    + * is already calculated so in case of aggregate table
    + * we need to apply sum to get the count
    + *    2.2 In case of average aggregate function select 2 columns from aggregate table with
    + * aggregation
    + * sum and count. Then add divide(sum(column with sum), sum(column with count)).
    + * Note: During aggregate table creation for average table will be created with two columns
    + * one for sum(column) and count(column) to support rollup
    + *
    + * 3. Filter Expression rules.
    + *    3.1 Updated filter expression attributes with child table attributes
    + * 4. Update the Parent Logical relation with child Logical relation
    + *
    + * @param sparkSession
    + * spark session
    + */
    +case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +    var needAnalysis = true
    +    plan.transformExpressions {
    +      // first check if any preAgg scala function is applied it is present is in plan
    +      // then call is from create preaggregate table class so no need to transform the query plan
    +      case al@Alias(_, name) if name.equals("preAgg") =>
    +        needAnalysis = false
    +        al
    +      // in case of query if any unresolve alias is present then wait for plan to be resolved
    +      // return the same plan as we can tranform the plan only when everything is resolved
    +      case unresolveAlias@UnresolvedAlias(_, _) =>
    +        needAnalysis = false
    +        unresolveAlias
    +    }
    +    // if plan is not valid for transformation then return same plan
    +    if (!needAnalysis) {
    +      plan
    +    } else {
    +      // create buffer to collect all the column and its metadata information
    +      val list = scala.collection.mutable.ListBuffer.empty[QueryColumn]
    +      var isValidPlan = true
    +      val carbonTable = plan match {
    +        // matching the plan based on supported plan
    +        // if plan is matches with any case it will validate and get all
    +        // information required for transforming the plan
    +
    +        // When plan has grouping expression, aggregate expression
    +        // subquery
    +        case Aggregate(groupingExp,
    +        aggregateExp,
    +        SubqueryAlias(_, logicalRelation: LogicalRelation, _))
    +          // only carbon query plan is supported checking whether logical relation is
    +          // is for carbon
    +          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    +          // when table has child data map(pre aggregate table) then only plan will be transformed
    +          if (!carbonTable.hasPreAggDataMap) {
    +            isValidPlan = false
    +          }
    +          if (isValidPlan) {
    +            // if it is valid plan then extract the query columns
    +            isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    +              aggregateExp,
    +              carbonTable,
    +              tableName,
    +              list)
    +          }
    +          carbonTable
    +
    +        // below case for handling filter query
    +        // When plan has grouping expression, aggregate expression
    +        // filter expression
    +        case Aggregate(groupingExp, aggregateExp,
    +        Filter(filterExp,
    +        SubqueryAlias(_, logicalRelation: LogicalRelation, _)))
    +          // only carbon query plan is supported checking whether logical relation is
    +          // is for carbon
    +          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    --- End diff --
    
    Put `&&` on the line above. Please do this for all case statements.
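
Separately from the formatting remark, the rollup rules in the quoted Scaladoc (a count over the aggregate table becomes a sum of stored counts, and an average becomes sum of sums divided by sum of counts) can be checked with a small arithmetic sketch. `PartialAggregate` and `AvgRollup` are hypothetical names used only for illustration.

```java
import java.util.List;

// Hypothetical row of a pre-aggregate table: stored sum(column) and count(column).
final class PartialAggregate {
  final double sum;
  final long count;

  PartialAggregate(double sum, long count) {
    this.sum = sum;
    this.count = count;
  }
}

final class AvgRollup {
  // count over the parent table == sum of the stored counts
  static long totalCount(List<PartialAggregate> rows) {
    long total = 0;
    for (PartialAggregate row : rows) {
      total += row.count;
    }
    return total;
  }

  // avg over the parent table == sum(stored sums) / sum(stored counts)
  static double average(List<PartialAggregate> rows) {
    double totalSum = 0;
    for (PartialAggregate row : rows) {
      totalSum += row.sum;
    }
    long count = totalCount(rows);
    // This sketch returns NaN for an empty input; SQL avg would yield NULL.
    return count == 0 ? Double.NaN : totalSum / count;
  }
}
```

For two stored rows (sum 10, count 2) and (sum 50, count 8), the rolled-up average is 60 / 10 = 6.0, whereas averaging the per-row averages (5.0 and 6.25) would give 5.625. This is why the note in the Scaladoc says the aggregate table stores sum and count separately for average.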


---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1660/



---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r149005119
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateQueryRules.scala ---
    @@ -0,0 +1,756 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, SparkSession}
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
    +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide,
    +Expression, NamedExpression}
    +import org.apache.spark.sql.catalyst.expressions.aggregate._
    +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
    +import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +
    +/**
    + * Class for applying pre-aggregate rules.
    + * Responsibilities:
    + * 1. Check whether the plan is valid for replacing the parent table plan with a child table
    + * 2. Update the plan based on the child schema
    + *
    + * Rules for updating the plan
    + * 1. Grouping expression rules
    + *    1.1 Change the parent attribute reference of the grouping expression
    + * to the child attribute reference
    + *
    + * 2. Aggregate expression rules
    + *    2.1 Change the parent attribute reference of the aggregate expression to
    + * the child attribute reference
    + *    2.2 Change the count AggregateExpression to sum: the count
    + * is already calculated in the aggregate table, so
    + * we need to apply sum to get the count
    + *    2.3 In case of the average aggregate function, select 2 columns from the aggregate table,
    + * one with aggregation sum and one with count.
    + * Then add divide(sum(column with sum), sum(column with count)).
    + * Note: During aggregate table creation, for average the table is created with two columns,
    + * one for sum(column) and one for count(column), to support rollup
    + *
    + * 3. Filter expression rules
    + *    3.1 Update the filter expression attributes with child table attributes
    + * 4. Update the parent logical relation with the child logical relation
    + *
    + * @param sparkSession
    + * spark session
    + */
    +case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +    var needAnalysis = true
    +    plan.transformExpressions {
    +      // first check whether the preAgg scala function is present in the plan; if it is, the
    +      // call is from the create preaggregate table class, so no need to transform the query plan
    +      case al@Alias(_, name) if name.equals("preAgg") =>
    +        needAnalysis = false
    +        al
    +      // if the query has any unresolved alias, wait for the plan to be resolved and
    +      // return the same plan, as we can transform the plan only when everything is resolved
    +      case unresolveAlias@UnresolvedAlias(_, _) =>
    +        needAnalysis = false
    +        unresolveAlias
    +    }
    +    // if plan is not valid for transformation then return same plan
    +    if (!needAnalysis) {
    +      plan
    +    } else {
    +      // create buffer to collect all the column and its metadata information
    +      val list = scala.collection.mutable.ListBuffer.empty[QueryColumn]
    +      var isValidPlan = true
    +      val carbonTable = plan match {
    +        // matching the plan based on supported plan
    +        // if plan is matches with any case it will validate and get all
    +        // information required for transforming the plan
    +
    +        // When plan has grouping expression, aggregate expression
    +        // subquery
    +        case Aggregate(groupingExp,
    +        aggregateExp,
    +        SubqueryAlias(_, logicalRelation: LogicalRelation, _))
    +          // only carbon query plans are supported, so check whether the logical relation
    +          // is for carbon
    +          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    +          // when table has child data map(pre aggregate table) then only plan will be transformed
    +          if (!carbonTable.hasPreAggDataMap) {
    +            isValidPlan = false
    +          }
    +          if (isValidPlan) {
    +            // if it is valid plan then extract the query columns
    +            isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    +              aggregateExp,
    +              carbonTable,
    +              tableName,
    +              list)
    +          }
    +          carbonTable
    +
    +        // below case for handling filter query
    +        // When plan has grouping expression, aggregate expression
    +        // filter expression
    +        case Aggregate(groupingExp, aggregateExp,
    +        Filter(filterExp,
    +        SubqueryAlias(_, logicalRelation: LogicalRelation, _)))
    +          // only carbon query plans are supported, so check whether the logical relation
    +          // is for carbon
    +          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    +          // when table has child data map(pre aggregate table) then only plan will be transformed
    +          if (!carbonTable.hasPreAggDataMap) {
    +            isValidPlan = false
    +          }
    +          if (isValidPlan) {
    +            // if it is valid plan then extract the query columns
    +            isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    +              aggregateExp,
    +              carbonTable,
    +              tableName,
    +              list)
    +            // getting the columns from filter expression
    +            filterExp.transform {
    +              case attr: AttributeReference =>
    +                list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true)
    +                attr
    +            }
    +          }
    +          carbonTable
    +
    +        // When plan has grouping expression, aggregate expression
    +        // logical relation
    +        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    +          // only carbon query plans are supported, so check whether the logical relation
    +          // is for carbon
    +          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    +          // when table has child data map(pre aggregate table) then only plan will be transformed
    +          if (!carbonTable.hasPreAggDataMap) {
    +            isValidPlan = false
    +          }
    +          if (isValidPlan) {
    +            // if it is valid plan then extract the query columns
    +            isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    +              aggregateExp,
    +              carbonTable,
    +              tableName,
    +              list)
    +          }
    +          carbonTable
    +        case _ =>
    +          isValidPlan = false
    +          null
    +      }
    +      // if plan is valid then update the plan with child attributes
    +      if (isValidPlan) {
    +        // getting all the projection columns
    +        val listProjectionColumn = list
    +          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn)
    +        // getting all the filter columns
    +        val listFilterColumn = list
    +          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn)
    +        // getting all the aggregation columns
    +        val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty)
    +        // create a query plan object which will be used to select the list of pre aggregate tables
    +        // matches with this plan
    +        val queryPlan = new QueryPlan(listProjectionColumn.asJava,
    +          listAggregationColumn.asJava,
    +          listFilterColumn.asJava)
    +        // create aggregate table selector object
    +        val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
    +        // select the list of valid child tables
    +        val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
    +        // if it does not match any pre aggregate table, return the same plan
    +        if (!selectedDataMapSchemas.isEmpty) {
    +          // sort the selected child schema based on size to select smallest pre aggregate table
    +          val (aggDataMapSchema, carbonRelation) =
    +            selectedDataMapSchemas.asScala.map { selectedDataMapSchema =>
    +              val catalog = sparkSession.sessionState.catalog
    +              val carbonRelation = catalog
    +                .lookupRelation(TableIdentifier(selectedDataMapSchema.getRelationIdentifier
    +                  .getTableName,
    +                  Some(selectedDataMapSchema.getRelationIdentifier
    +                    .getDatabaseName))).asInstanceOf[SubqueryAlias].child
    +                .asInstanceOf[LogicalRelation]
    +              (selectedDataMapSchema, carbonRelation)
    +            }.sortBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes)
    +              .head
    +          // transform the query plan based on selected child schema
    +          transformPreAggQueryPlan(plan, aggDataMapSchema, carbonRelation)
    +        } else {
    +          plan
    +        }
    +      } else {
    +        plan
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to get the child attribute reference
    +   * based on parent name
    +   *
    +   * @param dataMapSchema
    +   * child schema
    +   * @param attributeReference
    +   * parent attribute reference
    +   * @param childCarbonRelation
    +   * child logical relation
    +   * @param aggFunction
    +   * aggregation function applied on child
    +   * @return child attribute reference
    +   */
    +  def getChildAttributeReference(dataMapSchema: DataMapSchema,
    +      attributeReference: AttributeReference,
    +      childCarbonRelation: LogicalRelation,
    +      aggFunction: String = ""): AttributeReference = {
    +    val columnSchema = if (aggFunction.isEmpty) {
    +      dataMapSchema.getChildColumnByParentName(attributeReference.name)
    +    } else {
    +      dataMapSchema.getChildColByParentWithAggFun(attributeReference.name, aggFunction)
    +    }
    +    // here column schema cannot be null, if it is null then aggregate table selection
    +    // logic has some problem
    +    if (null == columnSchema) {
    +      throw new AnalysisException("Column doesnot exists in Pre Aggregate table")
    +    }
    +    // finding the child attribute from child logical relation
    +    childCarbonRelation.attributeMap.find(p => p._2.name.equals(columnSchema.getColumnName)).get._2
    +  }
    +
    +  /**
    +   * The method below transforms the main table plan into the child table plan.
    +   * The rules for transforming are as below.
    +   * 1. Grouping expression rules
    +   *    1.1 Change the parent attribute reference of the grouping expression
    +   * to the child attribute reference
    +   *
    +   * 2. Aggregate expression rules
    +   *    2.1 Change the parent attribute reference of the aggregate expression to
    +   * the child attribute reference
    +   *    2.2 Change the count AggregateExpression to sum: the count
    +   * is already calculated in the aggregate table, so
    +   * we need to apply sum to get the count
    +   *    2.3 In case of the average aggregate function, select 2 columns from the aggregate table,
    +   * one with aggregation sum and one with count.
    +   * Then add divide(sum(column with sum), sum(column with count)).
    +   * Note: During aggregate table creation, for average the table is created with two columns,
    +   * one for sum(column) and one for count(column), to support rollup
    +   * 3. Filter expression rules
    +   *    3.1 Update the filter expression attributes with child table attributes
    +   * 4. Update the parent logical relation with the child logical relation
    +   *
    +   * @param logicalPlan
    +   * parent logical plan
    +   * @param aggDataMapSchema
    +   * select data map schema
    +   * @param childCarbonRelation
    +   * child carbon table relation
    +   * @return transformed plan
    +   */
    +  def transformPreAggQueryPlan(logicalPlan: LogicalPlan,
    +      aggDataMapSchema: DataMapSchema, childCarbonRelation: LogicalRelation): LogicalPlan = {
    +    logicalPlan.transform {
    +      case Aggregate(grExp, aggExp, child@SubqueryAlias(_, l: LogicalRelation, _))
    +        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +           &&
    +           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasPreAggDataMap =>
    +        val (updatedGroupExp, updatedAggExp, newChild, None) =
    +          getUpdatedExpressions(grExp,
    +            aggExp,
    +            child,
    +            None,
    +            aggDataMapSchema,
    +            childCarbonRelation)
    +        Aggregate(updatedGroupExp,
    +          updatedAggExp,
    +          newChild)
    +      case Aggregate(grExp,
    +      aggExp,
    +      Filter(expression, child@SubqueryAlias(_, l: LogicalRelation, _)))
    +        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +           &&
    +           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasPreAggDataMap =>
    +        val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
    +          getUpdatedExpressions(grExp,
    +            aggExp,
    +            child,
    +            Some(expression),
    +            aggDataMapSchema,
    +            childCarbonRelation)
    +        Aggregate(updatedGroupExp,
    +          updatedAggExp,
    +          Filter(updatedFilterExpression.get,
    +            newChild))
    +      case Aggregate(grExp, aggExp, l: LogicalRelation)
    +        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +           &&
    +           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasPreAggDataMap =>
    +        val (updatedGroupExp, updatedAggExp, newChild, None) =
    +          getUpdatedExpressions(grExp,
    +            aggExp,
    +            l,
    +            None,
    +            aggDataMapSchema,
    +            childCarbonRelation)
    +        Aggregate(updatedGroupExp,
    +          updatedAggExp,
    +          newChild)
    +    }
    +  }
    +
    +  /**
    +   * The method below returns the updated expressions for the pre-aggregate table.
    +   * It replaces the attributes of the actual plan with child table attributes.
    +   * The following are updated:
    +   * 1. Grouping expression
    +   * 2. aggregate expression
    +   * 3. child logical plan
    +   * 4. filter expression if present
    +   *
    +   * @param groupingExpressions
    +   * actual plan grouping expression
    +   * @param aggregateExpressions
    +   * actual plan aggregate expression
    +   * @param child
    +   * child logical plan
    +   * @param filterExpression
    +   * filter expression
    +   * @param aggDataMapSchema
    +   * pre aggregate table schema
    +   * @param childCarbonRelation
    +   * pre aggregate table logical relation
    +   * @return tuple of(updated grouping expression,
    +   *         updated aggregate expression,
    +   *         updated child logical plan,
    +   *         updated filter expression if present in actual plan)
    +   */
    +  def getUpdatedExpressions(groupingExpressions: Seq[Expression],
    +      aggregateExpressions: Seq[NamedExpression],
    +      child: LogicalPlan, filterExpression: Option[Expression] = None,
    +      aggDataMapSchema: DataMapSchema,
    +      childCarbonRelation: LogicalRelation): (Seq[Expression], Seq[NamedExpression], LogicalPlan,
    +    Option[Expression]) = {
    +    // transforming the group by expression attributes with child attributes
    +    val updatedGroupExp = groupingExpressions.map { exp =>
    +      exp.transform {
    +        case attr: AttributeReference =>
    +          getChildAttributeReference(aggDataMapSchema, attr, childCarbonRelation)
    +      }
    +    }
    +    // The code below updates the aggregate expressions.
    +    // Note: when updating an aggregate expression we need to return an alias, because
    +    //       the final result must be shown based on the actual query.
    +    //       For example, if the query is "select name from table group by name" and
    +    //       we only update the attributes, the final output shows the child table column name.
    +    //       To handle this, if an attribute does not have an alias we return an alias with the
    +    //       parent table column name.
    +    // Rules for updating an aggregate expression:
    +    // 1. If it matches an attribute reference, return an alias of the child attribute reference
    +    // 2. If it matches an alias, return the same alias with the child attribute reference
    +    // 3. If it matches an alias of any supported aggregate function, return the aggregate function
    +    //    with the child attribute reference. See the class-level documentation for how the
    +    //    aggregate function is updated
    +
    +    val updatedAggExp = aggregateExpressions.map {
    +      exp => exp match {
    --- End diff --
    
    No need for this match; use case directly inside the map.
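
    As a hedged illustration of this suggestion, the sketch below shows both forms side by side. Expr, Attr, and Alias are hypothetical stand-ins, not the Catalyst expression classes.

```scala
object CaseInsideMap {
  // Hypothetical stand-ins for the expressions handled in the diff.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr

  // Verbose form: an explicit lambda parameter followed by a match on it.
  def namesWithMatch(exprs: Seq[Expr]): Seq[String] = exprs.map {
    exp => exp match {
      case Attr(name) => name
      case Alias(_, name) => name
    }
  }

  // Equivalent form: the block of case clauses is itself the function passed to map.
  def namesWithCase(exprs: Seq[Expr]): Seq[String] = exprs.map {
    case Attr(name) => name
    case Alias(_, name) => name
  }
}
```

    The second form relies on Scala's pattern-matching anonymous functions, so the intermediate name and the extra level of nesting go away without changing behaviour.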


---

[GitHub] carbondata issue #1464: [WIP][CARBONDATA-1523]Pre Aggregate table selection ...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/801/



---

[GitHub] carbondata issue #1464: [WIP][CARBONDATA-1523]Pre Aggregate table selection ...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1433/



---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r148995479
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ---
    @@ -41,6 +48,24 @@
        */
       private Map<String, String> properties;
     
    +  /**
    +   * map of parent column name to set of child columns without
    +   * aggregation function
    +   */
    +  private Map<String, Set<ColumnSchema>> parentToNonAggChildMapping;
    +
    +  /**
    +   * map of parent column name to set of child columns with
    +   * aggregation function
    +   */
    +  private Map<String, Set<ColumnSchema>> parentToAggChildMapping;
    +
    +  /**
    +   * map of parent column name to set of aggregation functions applied
    +   * on the parent column
    +   */
    +  private Map<String, Set<String>> parentColumnToAggregationsMapping;
    +
       public DataMapSchema(String className) {
    --- End diff --
    
    Create a factory that returns a subclass of this DataMapSchema chosen by the class name. Everything related to aggregation datamaps should go into an AggregationDataMapSchema class. DataMapSchema should be the generic class and should contain only the generic attributes.
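
    One possible shape for that split, sketched in Scala to match the rest of this thread even though the real classes are Java. The factory object, the "preaggregate" class name, and the field layout are assumptions made for illustration; only the names DataMapSchema and AggregationDataMapSchema come from the review.

```scala
object DataMapSchemaFactorySketch {
  // Generic schema: only the attributes common to every datamap.
  class DataMapSchema(val className: String) {
    var properties: Map[String, String] = Map.empty
  }

  // Aggregation-specific schema: the parent-to-child mappings would live here.
  class AggregationDataMapSchema(className: String) extends DataMapSchema(className) {
    var parentColumnToAggregationsMapping: Map[String, Set[String]] = Map.empty
  }

  // Hypothetical class name used to pick the subclass; the real value is not given in the thread.
  val PreAggregateClassName = "preaggregate"

  // Factory method: callers ask for a schema by class name and get the matching subclass.
  def create(className: String): DataMapSchema =
    if (className == PreAggregateClassName) new AggregationDataMapSchema(className)
    else new DataMapSchema(className)
}
```

    With this layout, code that only needs generic properties depends on DataMapSchema alone, while the aggregate table selector can work against the subclass.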


---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1608/



---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    retest this please


---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1682/



---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/942/



---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1066/



---

[GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r149003521
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateQueryRules.scala ---
    @@ -0,0 +1,756 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, SparkSession}
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
    +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide,
    +Expression, NamedExpression}
    +import org.apache.spark.sql.catalyst.expressions.aggregate._
    +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
    +import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +
    +/**
    + * Class for applying pre-aggregate rules.
    + * Responsibilities:
    + * 1. Check whether the plan is valid for replacing the parent table plan with a child table
    + * 2. Update the plan based on the child schema
    + *
    + * Rules for updating the plan
    + * 1. Grouping expression rules
    + *    1.1 Change the parent attribute reference of the grouping expression
    + * to the child attribute reference
    + *
    + * 2. Aggregate expression rules
    + *    2.1 Change the parent attribute reference of the aggregate expression to
    + * the child attribute reference
    + *    2.2 Change the count AggregateExpression to sum: the count
    + * is already calculated in the aggregate table, so
    + * we need to apply sum to get the count
    + *    2.3 In case of the average aggregate function, select 2 columns from the aggregate table,
    + * one with aggregation sum and one with count.
    + * Then add divide(sum(column with sum), sum(column with count)).
    + * Note: During aggregate table creation, for average the table is created with two columns,
    + * one for sum(column) and one for count(column), to support rollup
    + *
    + * 3. Filter expression rules
    + *    3.1 Update the filter expression attributes with child table attributes
    + * 4. Update the parent logical relation with the child logical relation
    + *
    + * @param sparkSession
    + * spark session
    + */
    +case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +    var needAnalysis = true
    +    plan.transformExpressions {
    +      // first check whether the preAgg scala function is present in the plan; if it is, the
    +      // call is from the create preaggregate table class, so no need to transform the query plan
    +      case al@Alias(_, name) if name.equals("preAgg") =>
    +        needAnalysis = false
    +        al
    +      // if the query has any unresolved alias, wait for the plan to be resolved and
    +      // return the same plan, as we can transform the plan only when everything is resolved
    +      case unresolveAlias@UnresolvedAlias(_, _) =>
    +        needAnalysis = false
    +        unresolveAlias
    +    }
    +    // if plan is not valid for transformation then return same plan
    +    if (!needAnalysis) {
    +      plan
    +    } else {
    +      // create buffer to collect all the column and its metadata information
    +      val list = scala.collection.mutable.ListBuffer.empty[QueryColumn]
    +      var isValidPlan = true
    +      val carbonTable = plan match {
    +        // matching the plan based on supported plan
    +        // if plan is matches with any case it will validate and get all
    +        // information required for transforming the plan
    +
    +        // When plan has grouping expression, aggregate expression
    +        // subquery
    +        case Aggregate(groupingExp,
    +        aggregateExp,
    +        SubqueryAlias(_, logicalRelation: LogicalRelation, _))
    +          // only carbon query plans are supported, so check whether the logical relation
    +          // is for carbon
    +          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    +          // when table has child data map(pre aggregate table) then only plan will be transformed
    +          if (!carbonTable.hasPreAggDataMap) {
    --- End diff --
    
    This check is already done in the guard of the case statement, so there is no need to repeat it here.
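
    A small sketch of why the inner check is redundant, using a hypothetical Table stand-in rather than CarbonTable: once the guard has tested the flag, the body of that case can rely on it.

```scala
object GuardOnlyCheck {
  // Hypothetical stand-in for a carbon table.
  final case class Table(name: String, hasPreAggDataMap: Boolean)

  // Returns the table name when the plan may be rewritten, None otherwise.
  def selectTable(table: Table): Option[String] = table match {
    // The guard already ensures hasPreAggDataMap is true,
    // so the body does not test it a second time.
    case Table(name, hasPreAgg) if hasPreAgg => Some(name)
    case _ => None
  }
}
```

    Tables without a pre-aggregate datamap fall through to the default case, which is where the "not a valid plan" handling belongs.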


---

[GitHub] carbondata issue #1464: [CARBONDATA-1523]Pre Aggregate table selection and Q...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1464
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/945/



---