Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 19:16:41 UTC

[1/2] carbondata git commit: [CARBONDATA-1523]Pre Aggregate table selection and Query Plan changes

Repository: carbondata
Updated Branches:
  refs/heads/pre-aggregate c1eefee7c -> dda2573a1


http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
new file mode 100644
index 0000000..3fb0db0
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -0,0 +1,829 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, InsertIntoCarbonTable, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, NamedExpression, PredicateSubquery, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * Class for applying pre-aggregate rules.
+ * Responsibilities:
+ * 1. Check whether the plan is a valid plan for rewriting the parent table plan
+ * with the child (pre-aggregate) table
+ * 2. Update the plan based on the child schema
+ *
+ * Rules for updating the plan:
+ * 1. Grouping expression rules
+ *    1.1 Change the parent attribute reference of the grouping expression
+ * to the child attribute reference
+ *
+ * 2. Aggregate expression rules
+ *    2.1 Change the parent attribute reference of the aggregate expression to
+ * the child attribute reference
+ *    2.2 Change a count AggregateExpression to Sum, as the count is already
+ * calculated, so for the aggregate table we need to apply sum to get the count
+ *    2.3 For the average aggregate function, select two columns from the aggregate
+ * table, one with aggregation sum and one with count, then add
+ * divide(sum(column with sum), sum(column with count)).
+ * Note: during aggregate table creation, for average the table is created with two
+ * columns, one for sum(column) and one for count(column), to support rollup
+ *
+ * 3. Filter expression rules
+ *    3.1 Update filter expression attributes with child table attributes
+ * 4. Update the parent LogicalRelation with the child LogicalRelation
+ *
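+ * Example (illustrative; the table and column names are hypothetical): a query such as
+ *   SELECT name, avg(salary) FROM sales GROUP BY name
+ * against a parent table with a matching pre-aggregate table is rewritten as
+ *   SELECT name, sum(salary_sum) / sum(salary_count) FROM sales_agg GROUP BY name
+ *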
+ * @param sparkSession
+ * spark session
+ */
+case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    var needAnalysis = true
+    plan.transformExpressions {
+      // first check whether the preAgg ScalaUDF is present in the plan; if so, the
+      // call is from the create pre-aggregate table flow, so the plan need not be transformed
+      case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase("preAgg") =>
+        needAnalysis = false
+        al
+      // if any unresolved alias is present then wait for the plan to be resolved;
+      // return the same plan, as we can transform the plan only when everything is resolved
+      case unresolveAlias@UnresolvedAlias(_, _) =>
+        needAnalysis = false
+        unresolveAlias
+      case attr@UnresolvedAttribute(_) =>
+        needAnalysis = false
+        attr
+    }
+    // if the plan is not valid for transformation then return the same plan
+    if (!needAnalysis) {
+      plan
+    } else {
+      // create a buffer to collect all the columns and their metadata information
+      val list = scala.collection.mutable.ListBuffer.empty[QueryColumn]
+      var isValidPlan = true
+      val carbonTable = plan match {
+        // match the plan against the supported plan shapes;
+        // if the plan matches any case it will be validated, and all the
+        // information required for transforming the plan is collected
+
+        // When plan has grouping expression, aggregate expression
+        // subquery
+        case Aggregate(groupingExp,
+        aggregateExp,
+        SubqueryAlias(_, logicalRelation: LogicalRelation, _))
+          // only carbon query plans are supported, so check whether the
+          // logical relation is for carbon
+          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+               .hasDataMapSchema =>
+          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
+          // if it is a valid plan then extract the query columns
+          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
+            aggregateExp,
+            carbonTable,
+            tableName,
+            list)
+          carbonTable
+
+        // below case for handling filter query
+        // When plan has grouping expression, aggregate expression
+        // filter expression
+        case Aggregate(groupingExp, aggregateExp,
+        Filter(filterExp,
+        SubqueryAlias(_, logicalRelation: LogicalRelation, _)))
+          // only carbon query plans are supported, so check whether the
+          // logical relation is for carbon
+          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+               .hasDataMapSchema =>
+          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
+          // if it is a valid plan then extract the query columns
+          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
+            aggregateExp,
+            carbonTable,
+            tableName,
+            list)
+          // TODO need to handle the filter predicate subquery scenario
+          isValidPlan = isValidPlan && !PredicateSubquery.hasPredicateSubquery(filterExp)
+          // getting the columns from filter expression
+          if (isValidPlan) {
+            filterExp.transform {
+              case attr: AttributeReference =>
+                list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true)
+                attr
+            }
+          }
+          carbonTable
+
+        // When plan has grouping expression, aggregate expression
+        // logical relation
+        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
+          // only carbon query plans are supported, so check whether the
+          // logical relation is for carbon
+          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+               .hasDataMapSchema =>
+          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
+          // if it is a valid plan then extract the query columns
+          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
+            aggregateExp,
+            carbonTable,
+            tableName,
+            list)
+          carbonTable
+        case _ =>
+          isValidPlan = false
+          null
+      }
+      // if the plan is valid then update it with the child attributes
+      if (isValidPlan) {
+        // getting all the projection columns
+        val listProjectionColumn = list
+          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn)
+        // getting all the filter columns
+        val listFilterColumn = list
+          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn)
+        // getting all the aggregation columns
+        val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty)
+        // create a query plan object which will be used to select the list of
+        // pre-aggregate tables that match this plan
+        val queryPlan = new QueryPlan(listProjectionColumn.asJava,
+          listAggregationColumn.asJava,
+          listFilterColumn.asJava)
+        // create aggregate table selector object
+        val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
+        // select the list of valid child tables
+        val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
+        // if it does not match any pre-aggregate table then return the same plan
+        if (!selectedDataMapSchemas.isEmpty) {
+          // select the smallest pre-aggregate table by comparing child table sizes
+          val (aggDataMapSchema, carbonRelation) =
+            selectedDataMapSchemas.asScala.map { selectedDataMapSchema =>
+              val catalog = sparkSession.sessionState.catalog
+              val carbonRelation = catalog
+                .lookupRelation(TableIdentifier(selectedDataMapSchema.getRelationIdentifier
+                  .getTableName,
+                  Some(selectedDataMapSchema.getRelationIdentifier
+                    .getDatabaseName))).asInstanceOf[SubqueryAlias].child
+                .asInstanceOf[LogicalRelation]
+              (selectedDataMapSchema, carbonRelation)
+            }.minBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes)
+          // transform the query plan based on selected child schema
+          transformPreAggQueryPlan(plan, aggDataMapSchema, carbonRelation)
+        } else {
+          plan
+        }
+      } else {
+        plan
+      }
+    }
+  }
+
+  /**
+   * Below method will be used to get the child attribute reference
+   * based on parent name
+   *
+   * @param dataMapSchema
+   * child schema
+   * @param attributeReference
+   * parent attribute reference
+   * @param childCarbonRelation
+   * child logical relation
+   * @param aggFunction
+   * aggregation function applied on child
+   * @return child attribute reference
+   */
+  def getChildAttributeReference(dataMapSchema: DataMapSchema,
+      attributeReference: AttributeReference,
+      childCarbonRelation: LogicalRelation,
+      aggFunction: String = ""): AttributeReference = {
+    val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema]
+    val columnSchema = if (aggFunction.isEmpty) {
+      aggregationDataMapSchema.getChildColByParentColName(attributeReference.name)
+    } else {
+      aggregationDataMapSchema.getAggChildColByParent(attributeReference.name, aggFunction)
+    }
+    // here the column schema cannot be null; if it is null then the aggregate
+    // table selection logic has some problem
+    if (null == columnSchema) {
+      throw new AnalysisException("Column does not exist in the pre-aggregate table")
+    }
+    // finding the child attribute from child logical relation
+    childCarbonRelation.attributeMap.find(p => p._2.name.equals(columnSchema.getColumnName)).get._2
+  }
+
+  /**
+   * Below method will be used to transform the main table plan to the child table plan.
+   * The rules for transforming are as below:
+   * 1. Grouping expression rules
+   *    1.1 Change the parent attribute reference of the grouping expression
+   * to the child attribute reference
+   *
+   * 2. Aggregate expression rules
+   *    2.1 Change the parent attribute reference of the aggregate expression to
+   * the child attribute reference
+   *    2.2 Change a count AggregateExpression to Sum, as the count is already
+   * calculated, so for the aggregate table we need to apply sum to get the count
+   *    2.3 For the average aggregate function, select two columns from the aggregate
+   * table with aggregation sum and count, then add
+   * divide(sum(column with sum), sum(column with count)).
+   * Note: during aggregate table creation, for average the table is created with two
+   * columns, one for sum(column) and one for count(column), to support rollup
+   * 3. Filter expression rules
+   *    3.1 Update filter expression attributes with child table attributes
+   * 4. Update the parent LogicalRelation with the child LogicalRelation
+   *
+   * @param logicalPlan
+   * parent logical plan
+   * @param aggDataMapSchema
+   * selected data map schema
+   * @param childCarbonRelation
+   * child carbon table relation
+   * @return transformed plan
+   */
+  def transformPreAggQueryPlan(logicalPlan: LogicalPlan,
+      aggDataMapSchema: DataMapSchema, childCarbonRelation: LogicalRelation): LogicalPlan = {
+    logicalPlan.transform {
+      case Aggregate(grExp, aggExp, child@SubqueryAlias(_, l: LogicalRelation, _))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+        val (updatedGroupExp, updatedAggExp, newChild, None) =
+          getUpdatedExpressions(grExp,
+            aggExp,
+            child,
+            None,
+            aggDataMapSchema,
+            childCarbonRelation)
+        Aggregate(updatedGroupExp,
+          updatedAggExp,
+          newChild)
+      case Aggregate(grExp,
+      aggExp,
+      Filter(expression, child@SubqueryAlias(_, l: LogicalRelation, _)))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+        val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
+          getUpdatedExpressions(grExp,
+            aggExp,
+            child,
+            Some(expression),
+            aggDataMapSchema,
+            childCarbonRelation)
+        Aggregate(updatedGroupExp,
+          updatedAggExp,
+          Filter(updatedFilterExpression.get,
+            newChild))
+      case Aggregate(grExp, aggExp, l: LogicalRelation)
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+        val (updatedGroupExp, updatedAggExp, newChild, None) =
+          getUpdatedExpressions(grExp,
+            aggExp,
+            l,
+            None,
+            aggDataMapSchema,
+            childCarbonRelation)
+        Aggregate(updatedGroupExp,
+          updatedAggExp,
+          newChild)
+    }
+  }
+
+  /**
+   * Below method will be used to get the updated expressions for the pre-aggregate table.
+   * It will replace the attributes of the actual plan with the child table attributes.
+   * The update is done for the below expressions:
+   * 1. Grouping expression
+   * 2. aggregate expression
+   * 3. child logical plan
+   * 4. filter expression if present
+   *
+   * @param groupingExpressions
+   * actual plan grouping expression
+   * @param aggregateExpressions
+   * actual plan aggregate expression
+   * @param child
+   * child logical plan
+   * @param filterExpression
+   * filter expression
+   * @param aggDataMapSchema
+   * pre aggregate table schema
+   * @param childCarbonRelation
+   * pre aggregate table logical relation
+   * @return tuple of(updated grouping expression,
+   *         updated aggregate expression,
+   *         updated child logical plan,
+   *         updated filter expression if present in actual plan)
+   */
+  def getUpdatedExpressions(groupingExpressions: Seq[Expression],
+      aggregateExpressions: Seq[NamedExpression],
+      child: LogicalPlan, filterExpression: Option[Expression] = None,
+      aggDataMapSchema: DataMapSchema,
+      childCarbonRelation: LogicalRelation): (Seq[Expression], Seq[NamedExpression], LogicalPlan,
+    Option[Expression]) = {
+    // transforming the group by expression attributes with child attributes
+    val updatedGroupExp = groupingExpressions.map { exp =>
+      exp.transform {
+        case attr: AttributeReference =>
+          getChildAttributeReference(aggDataMapSchema, attr, childCarbonRelation)
+      }
+    }
+    // below code is for updating the aggregate expressions.
+    // Note: when updating an aggregate expression we need to return an alias, because
+    //       the final result must be shown based on the actual query.
+    //       For example, if the query is "select name from table group by name" and
+    //       we only update the attributes, the child table column name would show up
+    //       in the final output; so if an attribute does not have an alias we return
+    //       an alias carrying the parent table column name.
+    // Rules for updating an aggregate expression:
+    // 1. If it matches an attribute reference, return an alias of the child attribute reference
+    // 2. If it matches an alias, return the same alias with the child attribute reference
+    // 3. If it matches an alias of any supported aggregate function, return the aggregate
+    //    function with the child attribute reference. See the class-level documentation
+    //    for how and when the aggregate function is updated.
+
+    val updatedAggExp = aggregateExpressions.map {
+      // case for attribute reference
+      case attr: AttributeReference =>
+        val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
+          attr,
+          childCarbonRelation)
+        // returning the alias to show proper column name in output
+        Alias(childAttributeReference,
+          attr.name)(NamedExpression.newExprId,
+          childAttributeReference.qualifier).asInstanceOf[NamedExpression]
+      // case for alias
+      case Alias(attr: AttributeReference, name) =>
+        val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
+          attr,
+          childCarbonRelation)
+        // returning alias with child attribute reference
+        Alias(childAttributeReference,
+          name)(NamedExpression.newExprId,
+          childAttributeReference.qualifier).asInstanceOf[NamedExpression]
+      // for aggregate function case
+      case alias@Alias(attr: AggregateExpression, name) =>
+        // get the updated aggregate function
+        val aggExp = getUpdatedAggregateExpressionForChild(attr,
+          aggDataMapSchema,
+          childCarbonRelation)
+        // returning alias with child attribute reference
+        Alias(aggExp,
+          name)(NamedExpression.newExprId,
+          alias.qualifier).asInstanceOf[NamedExpression]
+    }
+    // transforming the logical relation
+    val newChild = child.transform {
+      case _: LogicalRelation =>
+        childCarbonRelation
+      case _: SubqueryAlias =>
+        childCarbonRelation
+    }
+    // updating the filter expression if present
+    val updatedFilterExpression = if (filterExpression.isDefined) {
+      val filterExp = filterExpression.get
+      Some(filterExp.transform {
+        case attr: AttributeReference =>
+          getChildAttributeReference(aggDataMapSchema, attr, childCarbonRelation)
+      })
+    } else {
+      None
+    }
+    (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression)
+  }
+
+  /**
+   * Below method will be used to get the updated aggregate expression based on the match.
+   * Aggregate expression update rules:
+   * 1. Change a count AggregateExpression to Sum, as the count is already
+   * calculated, so for the aggregate table we need to apply sum to get the count
+   * 2. For the average aggregate function, select two columns from the aggregate table
+   * with aggregation sum and count,
+   * then add divide(sum(column with sum), sum(column with count)).
+   * Note: during aggregate table creation, for the average aggregation function
+   * the table is created with two columns, one for sum(column) and one for
+   * count(column), to support rollup
+   *
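+   * Example (illustrative, hypothetical column names): count(price) is rewritten to
+   * sum(price_count) against the child table, and avg(price) is rewritten to
+   * Divide(sum(price_sum), sum(price_count)).
+   *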
+   * @param aggExp
+   * aggregate expression
+   * @param dataMapSchema
+   * child data map schema
+   * @param childCarbonRelation
+   * child logical relation
+   * @return updated expression
+   */
+  def getUpdatedAggregateExpressionForChild(aggExp: AggregateExpression,
+      dataMapSchema: DataMapSchema,
+      childCarbonRelation: LogicalRelation): Expression = {
+    aggExp.aggregateFunction match {
+      // change a count AggregateExpression to Sum, as the count is already
+      // calculated, so for the aggregate table we need to
+      // apply sum to get the count
+      case count@Count(Seq(attr: AttributeReference)) =>
+        AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
+          attr,
+          childCarbonRelation,
+          count.prettyName),
+          LongType)),
+          aggExp.mode,
+          isDistinct = false)
+      case sum@Sum(attr: AttributeReference) =>
+        AggregateExpression(Sum(getChildAttributeReference(dataMapSchema,
+          attr,
+          childCarbonRelation,
+          sum.prettyName)),
+          aggExp.mode,
+          isDistinct = false)
+      case max@Max(attr: AttributeReference) =>
+        AggregateExpression(Max(getChildAttributeReference(dataMapSchema,
+          attr,
+          childCarbonRelation,
+          max.prettyName)),
+          aggExp.mode,
+          isDistinct = false)
+      case min@Min(attr: AttributeReference) =>
+        AggregateExpression(Min(getChildAttributeReference(dataMapSchema,
+          attr,
+          childCarbonRelation,
+          min.prettyName)),
+          aggExp.mode,
+          isDistinct = false)
+      case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
+          attr,
+          childCarbonRelation,
+          sum.prettyName),
+          changeDataType)),
+          aggExp.mode,
+          isDistinct = false)
+      case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        AggregateExpression(Min(Cast(getChildAttributeReference(dataMapSchema,
+          attr,
+          childCarbonRelation,
+          min.prettyName),
+          changeDataType)),
+          aggExp.mode,
+          isDistinct = false)
+      case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        AggregateExpression(Max(Cast(getChildAttributeReference(dataMapSchema,
+          attr,
+          childCarbonRelation,
+          max.prettyName),
+          changeDataType)),
+          aggExp.mode,
+          isDistinct = false)
+
+      // for the average aggregate function select two columns from the aggregate
+      // table with aggregation sum and count,
+      // then add divide(sum(column with sum), sum(column with count))
+      case Average(attr: AttributeReference) =>
+        Divide(AggregateExpression(Sum(getChildAttributeReference(dataMapSchema,
+          attr,
+          childCarbonRelation,
+          "sum")),
+          aggExp.mode,
+          isDistinct = false),
+          AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
+            attr,
+            childCarbonRelation,
+            "count"),
+            LongType)),
+            aggExp.mode,
+            isDistinct = false))
+      // for the average aggregate function select two columns from the aggregate
+      // table with aggregation sum and count,
+      // then add divide(sum(column with sum), sum(column with count))
+      case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        Divide(AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
+          attr,
+          childCarbonRelation,
+          "sum"),
+          changeDataType)),
+          aggExp.mode,
+          isDistinct = false),
+          AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
+            attr,
+            childCarbonRelation,
+            "count"),
+            LongType)),
+            aggExp.mode,
+            isDistinct = false))
+    }
+  }
+
+  /**
+   * Method to get the carbon table and table name
+   *
+   * @param parentLogicalRelation
+   * parent table relation
+   * @return tuple of carbon table and table name
+   */
+  def getCarbonTableAndTableName(parentLogicalRelation: LogicalRelation): (CarbonTable, String) = {
+    val carbonTable = parentLogicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+      .carbonRelation
+      .metaData.carbonTable
+    val tableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+      .getTableName
+    (carbonTable, tableName)
+  }
+
+  /**
+   * Below method will be used to get the query columns from plan
+   *
+   * @param groupByExpression
+   * group by expression
+   * @param aggregateExpressions
+   * aggregate expression
+   * @param carbonTable
+   * parent carbon table
+   * @param tableName
+   * parent table name
+   * @param list
+   * buffer collecting the query columns
+   * @return whether the plan is valid
+   */
+  def extractQueryColumnsFromAggExpression(groupByExpression: Seq[Expression],
+      aggregateExpressions: Seq[NamedExpression],
+      carbonTable: CarbonTable, tableName: String,
+      list: scala.collection.mutable.ListBuffer[QueryColumn]): Boolean = {
+    aggregateExpressions.foreach {
+      case attr: AttributeReference =>
+        list += getQueryColumn(attr.name,
+          carbonTable,
+          tableName)
+      case Alias(attr: AttributeReference, _) =>
+        list += getQueryColumn(attr.name,
+          carbonTable,
+          tableName)
+      case Alias(attr: AggregateExpression, _) =>
+        if (attr.isDistinct) {
+          return false
+        }
+        val queryColumn = validateAggregateFunctionAndGetFields(carbonTable,
+          attr.aggregateFunction,
+          tableName)
+        if (queryColumn.nonEmpty) {
+          list ++= queryColumn
+        } else {
+          return false
+        }
+    }
+    true
+  }
+
+  /**
+   * Below method will be used to validate the aggregate function and get the column
+   * information on which it is applied in the select query.
+   * Currently sum, max, min, count and avg are supported;
+   * for any other aggregate function an empty sequence is returned.
+   * For avg it returns two fields, one for the count
+   * and one for the sum of that column, to support rollup.
+   *
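+   * Example (illustrative): for avg(salary) this returns two QueryColumns for the
+   * salary column, one tagged "sum" and one tagged "count"; an unsupported function
+   * such as variance yields Seq.empty and the plan is left untransformed.
+   *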
+   * @param carbonTable
+   * parent table
+   * @param aggFunctions
+   * aggregation function
+   * @param tableName
+   * parent table name
+   * @return list of fields
+   */
+  def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable,
+      aggFunctions: AggregateFunction,
+      tableName: String
+  ): Seq[QueryColumn] = {
+    val changedDataType = true
+    aggFunctions match {
+      case sum@Sum(attr: AttributeReference) =>
+        Seq(getQueryColumn(attr.name,
+          carbonTable,
+          tableName,
+          sum.prettyName))
+      case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        Seq(getQueryColumn(attr.name,
+          carbonTable,
+          tableName,
+          sum.prettyName,
+          changeDataType.typeName,
+          changedDataType))
+      case count@Count(Seq(attr: AttributeReference)) =>
+        Seq(getQueryColumn(attr.name,
+          carbonTable,
+          tableName,
+          count.prettyName))
+      case min@Min(attr: AttributeReference) =>
+        Seq(getQueryColumn(attr.name,
+          carbonTable,
+          tableName,
+          min.prettyName))
+      case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        Seq(getQueryColumn(attr.name,
+          carbonTable,
+          tableName,
+          min.prettyName,
+          changeDataType.typeName,
+          changedDataType))
+      case max@Max(attr: AttributeReference) =>
+        Seq(getQueryColumn(attr.name,
+          carbonTable,
+          tableName,
+          max.prettyName))
+      case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        Seq(getQueryColumn(attr.name,
+          carbonTable,
+          tableName,
+          max.prettyName,
+          changeDataType.typeName,
+          changedDataType))
+      // for average we need to return two columns, the sum and count columns
+      // added during table creation, to support rollup
+      case Average(attr: AttributeReference) =>
+        Seq(getQueryColumn(attr.name,
+          carbonTable,
+          tableName,
+          "sum"
+        ), getQueryColumn(attr.name,
+          carbonTable,
+          tableName,
+          "count"
+        ))
+      // for average we need to return two columns, the sum and count columns
+      // added during table creation, to support rollup
+      case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        Seq(getQueryColumn(attr.name,
+          carbonTable,
+          tableName,
+          "sum",
+          changeDataType.typeName,
+          changedDataType), getQueryColumn(attr.name,
+          carbonTable,
+          tableName,
+          "count",
+          changeDataType.typeName,
+          changedDataType))
+      case _ =>
+        Seq.empty
+    }
+  }
+
+  /**
+   * Below method will be used to get the query column object which
+   * will have details of the column and its property
+   *
+   * @param columnName
+   * parent column name
+   * @param carbonTable
+   * parent carbon table
+   * @param tableName
+   * parent table name
+   * @param aggFunction
+   * aggregate function applied
+   * @param dataType
+   * data type of the column
+   * @param isChangedDataType
+   * whether a cast is applied on the column
+   * @param isFilterColumn
+   * whether a filter is applied on the column
+   * @return query column
+   */
+  def getQueryColumn(columnName: String,
+      carbonTable: CarbonTable,
+      tableName: String,
+      aggFunction: String = "",
+      dataType: String = "",
+      isChangedDataType: Boolean = false,
+      isFilterColumn: Boolean = false): QueryColumn = {
+    val columnSchema = carbonTable.getColumnByName(tableName, columnName).getColumnSchema
+    if (isChangedDataType) {
+      new QueryColumn(columnSchema, columnSchema.getDataType.getName, aggFunction, isFilterColumn)
+    } else {
+      new QueryColumn(columnSchema,
+        CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType),
+        aggFunction, isFilterColumn)
+    }
+  }
+}
+
+/**
+ * Rule to cast the child plan output when inserting into a carbon table from another source
+ */
+object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transform {
+      // Wait until children are resolved.
+      case p: LogicalPlan if !p.childrenResolved => p
+
+      case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
+        if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation], child)
+    }
+  }
+
+  def castChildOutput(p: InsertIntoTable,
+      relation: CarbonDatasourceHadoopRelation,
+      child: LogicalPlan)
+  : LogicalPlan = {
+    if (relation.carbonRelation.output.size > CarbonCommonConstants
+      .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
+      sys.error("Maximum number of columns supported by carbon is: " +
+        CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS)
+    }
+    val isAggregateTable = !relation.carbonRelation.tableMeta.carbonTable.getTableInfo
+      .getParentRelationIdentifiers.isEmpty
+    // transform logical plan if the load is for aggregate table.
+    val childPlan = if (isAggregateTable) {
+      transformAggregatePlan(child)
+    } else {
+      child
+    }
+    if (childPlan.output.size >= relation.carbonRelation.output.size) {
+      val newChildOutput = childPlan.output.zipWithIndex.map { columnWithIndex =>
+        columnWithIndex._1 match {
+          case attr: Alias =>
+            Alias(attr.child, s"col${ columnWithIndex._2 }")(attr.exprId)
+          case attr: Attribute =>
+            Alias(attr, s"col${ columnWithIndex._2 }")(NamedExpression.newExprId)
+          case attr => attr
+        }
+      }
+      val newChild: LogicalPlan = if (newChildOutput == childPlan.output) {
+        p.child
+      } else {
+        Project(newChildOutput, childPlan)
+      }
+      InsertIntoCarbonTable(relation, p.partition, newChild, p.overwrite, p.ifNotExists)
+    } else {
+      sys.error("Cannot insert into target table because column number are different")
+    }
+  }
+
+  /**
+   * Transform the logical plan with average(col1) aggregation type to sum(col1) and count(col1).
+   *
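+   * Example (illustrative; names are hypothetical): for a load into the aggregate
+   * table, "SELECT name, avg(salary) FROM parent GROUP BY name" is rewritten to
+   * "SELECT name, sum(salary) AS salary_sum, count(salary) AS salary_count
+   * FROM parent GROUP BY name", so both columns needed for rollup are produced.
+   *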
+   * @param logicalPlan plan to be transformed
+   * @return transformed plan
+   */
+  private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
+    logicalPlan transform {
+      case aggregate@Aggregate(_, aExp, _) =>
+        val newExpressions = aExp.flatMap {
+          case alias@Alias(attrExpression: AggregateExpression, _) =>
+            attrExpression.aggregateFunction match {
+              case Average(attr: AttributeReference) =>
+                Seq(Alias(attrExpression
+                  .copy(aggregateFunction = Sum(attr),
+                    resultId = NamedExpression.newExprId), attr.name + "_sum")(),
+                  Alias(attrExpression
+                    .copy(aggregateFunction = Count(attr),
+                      resultId = NamedExpression.newExprId), attr.name + "_count")())
+              case Average(cast@Cast(attr: AttributeReference, _)) =>
+                Seq(Alias(attrExpression
+                  .copy(aggregateFunction = Sum(cast),
+                    resultId = NamedExpression.newExprId),
+                  attr.name + "_sum")(),
+                  Alias(attrExpression
+                    .copy(aggregateFunction = Count(cast),
+                      resultId = NamedExpression.newExprId), attr.name + "_count")())
+              case _ => Seq(alias)
+            }
+          case namedExpr: NamedExpression => Seq(namedExpr)
+        }
+        aggregate.copy(aggregateExpressions = newExpressions.asInstanceOf[Seq[NamedExpression]])
+      case plan: LogicalPlan => plan
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index f698dd4..205b716 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -169,15 +169,15 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
         catalog.ParquetConversions ::
         catalog.OrcConversions ::
         CarbonPreInsertionCasts ::
+        CarbonPreAggregateQueryRules(sparkSession) ::
         CarbonIUDAnalysisRule(sparkSession) ::
         AnalyzeCreateTable(sparkSession) ::
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::
         (if (conf.runSQLonFile) {
           new ResolveDataSource(sparkSession) :: Nil
-        } else {
-          Nil
-        })
+        } else { Nil })
 
       override val extendedCheckRules = Seq(
         PreWriteCheck(conf, catalog))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 46a2515..3bed9d1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -476,6 +476,14 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     }
   }
 
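+  /**
+   * Parses the given select statement with the addPreAgg grammar rule from
+   * CarbonDDLSqlParser and returns the rewritten query (which, judging by
+   * CarbonPreAggregateQueryRules, carries the preAgg marker used to skip plan
+   * transformation); throws MalformedCarbonCommandException on parse failure.
+   */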
+  def addPreAggFunction(sql: String): String = {
+    addPreAgg(new lexical.Scanner(sql.toLowerCase)) match {
+      case Success(query, _) => query
+      case failureOrError => throw new MalformedCarbonCommandException(
+        s"Unsupported query: $failureOrError")
+    }
+  }
+
   def getBucketFields(
       properties: mutable.Map[String, String],
       fields: Seq[Field],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 5c51156..401b149 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.parser
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.{CarbonSession, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
@@ -74,6 +74,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
   val parser = new CarbonSpark2SqlParser
 
   override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
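+    // visit the embedded query, if present (e.g. for CTAS), so it is parsed up front
+    // (rationale assumed; presumably to surface errors in the query part early)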
+    Option(ctx.query()).map(plan)
     val fileStorage = Option(ctx.createFileFormat) match {
       case Some(value) =>
         if (value.children.get(1).getText.equalsIgnoreCase("by")) {


[2/2] carbondata git commit: [CARBONDATA-1523]Pre Aggregate table selection and Query Plan changes

Posted by ra...@apache.org.
[CARBONDATA-1523]Pre Aggregate table selection and Query Plan changes

This closes #1464


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/dda2573a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/dda2573a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/dda2573a

Branch: refs/heads/pre-aggregate
Commit: dda2573a1160ba0d97047693c4b094bc6da06d95
Parents: c1eefee
Author: kumarvishal <ku...@gmail.com>
Authored: Mon Oct 30 12:44:32 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Nov 14 00:46:24 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   2 +
 .../ThriftWrapperSchemaConverterImpl.java       |   7 +-
 .../schema/table/AggregationDataMapSchema.java  | 212 +++++
 .../core/metadata/schema/table/CarbonTable.java |  16 +-
 .../metadata/schema/table/DataMapSchema.java    |  19 +-
 .../schema/table/DataMapSchemaFactory.java      |  38 +
 .../core/metadata/schema/table/TableInfo.java   |   7 +-
 .../core/metadata/schema/table/TableSchema.java |   5 +-
 .../core/preagg/AggregateTableSelector.java     | 135 +++
 .../carbondata/core/preagg/QueryColumn.java     |  70 ++
 .../carbondata/core/preagg/QueryPlan.java       |  59 ++
 .../TestPreAggregateTableSelection.scala        | 175 ++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   7 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   3 +
 .../CreatePreAggregateTableCommand.scala        |  12 +-
 .../preaaggregate/PreAggregateListeners.scala   |  24 +-
 .../preaaggregate/PreAggregateUtil.scala        | 184 ++--
 .../spark/sql/hive/CarbonAnalysisRules.scala    | 101 +--
 .../sql/hive/CarbonPreAggregateRules.scala      | 829 +++++++++++++++++++
 .../spark/sql/hive/CarbonSessionState.scala     |   6 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |   8 +
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   3 +-
 23 files changed, 1709 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index cb0b5c3..f17a9e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1411,6 +1411,8 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true";
 
+  public static final String AGGREGATIONDATAMAPSCHEMA = "AggregateDataMapHandler";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index b914e06..fef2e0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaFactory;
 import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
@@ -628,10 +629,10 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
 
   @Override public DataMapSchema fromExternalToWrapperDataMapSchema(
       org.apache.carbondata.format.DataMapSchema thriftDataMapSchema) {
-    DataMapSchema childSchema =
-        new DataMapSchema(thriftDataMapSchema.getDataMapName(), thriftDataMapSchema.getClassName());
+    DataMapSchema childSchema = DataMapSchemaFactory.INSTANCE
+        .getDataMapSchema(thriftDataMapSchema.getDataMapName(), thriftDataMapSchema.getClassName());
     childSchema.setProperties(thriftDataMapSchema.getProperties());
-    if (thriftDataMapSchema.getRelationIdentifire() != null) {
+    if (null != thriftDataMapSchema.getRelationIdentifire()) {
       RelationIdentifier relationIdentifier =
           new RelationIdentifier(thriftDataMapSchema.getRelationIdentifire().getDatabaseName(),
               thriftDataMapSchema.getRelationIdentifire().getTableName(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
new file mode 100644
index 0000000..87c07f4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+
+/**
+ * Data map schema class for pre-aggregation.
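+ * For example (illustrative column names), given a child table with columns
+ * name, salary_sum (aggFunction "sum") and salary_count (aggFunction "count"),
+ * parentToNonAggChildMapping is {name -> {name}}, parentToAggChildMapping is
+ * {salary -> {salary_sum, salary_count}} and parentColumnToAggregationsMapping
+ * is {salary -> {"sum", "count"}}.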
+ */
+public class AggregationDataMapSchema extends DataMapSchema {
+
+  /**
+   * map of parent column name to the set of child columns on which no
+   * aggregation function is applied
+   */
+  private Map<String, Set<ColumnSchema>> parentToNonAggChildMapping;
+
+  /**
+   * map of parent column name to the set of child columns on which an
+   * aggregation function is applied
+   */
+  private Map<String, Set<ColumnSchema>> parentToAggChildMapping;
+
+  /**
+   * map of parent column name to the set of aggregation functions applied
+   * on that column
+   */
+  private Map<String, Set<String>> parentColumnToAggregationsMapping;
+
+  public AggregationDataMapSchema(String dataMapName, String className) {
+    super(dataMapName, className);
+  }
+
+  public void setChildSchema(TableSchema childSchema) {
+    super.setChildSchema(childSchema);
+    List<ColumnSchema> listOfColumns = getChildSchema().getListOfColumns();
+    fillNonAggFunctionColumns(listOfColumns);
+    fillAggFunctionColumns(listOfColumns);
+    fillParentNameToAggregationMapping(listOfColumns);
+  }
+
+  /**
+   * Below method will be used to get the child column on which no aggregate
+   * function is applied
+   * @param columnName
+   *                parent column name
+   * @return child column schema
+   */
+  public ColumnSchema getNonAggChildColBasedByParent(String columnName) {
+    Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName);
+    if (null != columnSchemas) {
+      Iterator<ColumnSchema> iterator = columnSchemas.iterator();
+      while (iterator.hasNext()) {
+        ColumnSchema next = iterator.next();
+        if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) {
+          return next;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Below method will be used to get the column schema based on parent column name
+   * @param columnName
+   *                parent column name
+   * @return child column schema
+   */
+  public ColumnSchema getChildColByParentColName(String columnName) {
+    List<ColumnSchema> listOfColumns = childSchema.getListOfColumns();
+    for (ColumnSchema columnSchema : listOfColumns) {
+      List<ParentColumnTableRelation> parentColumnTableRelations =
+          columnSchema.getParentColumnTableRelations();
+      if (parentColumnTableRelations.get(0).getColumnName().equals(columnName)) {
+        return columnSchema;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Below method will be used to get the child column schema based on parent name and aggregate
+   * function applied on column
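+   * (e.g. getAggChildColByParent("salary", "sum") returns the child column that
+   * stores sum(salary); the column names here are illustrative)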
+   * @param columnName
+   *                  parent column name
+   * @param aggFunction
+   *                  aggregate function applied
+   * @return child column schema
+   */
+  public ColumnSchema getAggChildColByParent(String columnName,
+      String aggFunction) {
+    Set<ColumnSchema> columnSchemas = parentToAggChildMapping.get(columnName);
+    if (null != columnSchemas) {
+      Iterator<ColumnSchema> iterator = columnSchemas.iterator();
+      while (iterator.hasNext()) {
+        ColumnSchema next = iterator.next();
+        if (null != next.getAggFunction() && next.getAggFunction().equalsIgnoreCase(aggFunction)) {
+          return next;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Below method checks whether a child column exists for the given parent column
+   * name and aggregate function
+   * @param parentColumnName
+   *                    parent column name
+   * @param aggFunction
+   *                    aggregate function
+   * @return is matching
+   */
+  public boolean isColumnWithAggFunctionExists(String parentColumnName, String aggFunction) {
+    Set<String> aggFunctions = parentColumnToAggregationsMapping.get(parentColumnName);
+    return null != aggFunctions && aggFunctions.contains(aggFunction);
+  }
+
+  /**
+   * Method to prepare the mapping of parent column to the set of aggregation
+   * functions applied on that column
+   * @param listOfColumns
+   *        child column schema list
+   */
+  private void fillParentNameToAggregationMapping(List<ColumnSchema> listOfColumns) {
+    parentColumnToAggregationsMapping = new HashMap<>();
+    for (ColumnSchema column : listOfColumns) {
+      if (null != column.getAggFunction() && !column.getAggFunction().isEmpty()) {
+        List<ParentColumnTableRelation> parentColumnTableRelations =
+            column.getParentColumnTableRelations();
+        if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1) {
+          String columnName = column.getParentColumnTableRelations().get(0).getColumnName();
+          Set<String> aggFunctions = parentColumnToAggregationsMapping.get(columnName);
+          if (null == aggFunctions) {
+            aggFunctions = new HashSet<>();
+            parentColumnToAggregationsMapping.put(columnName, aggFunctions);
+          }
+          aggFunctions.add(column.getAggFunction());
+        }
+      }
+    }
+  }
+
+  /**
+   * Below method will be used to prepare the mapping between parent columns and
+   * child columns without aggregation functions
+   * @param listOfColumns
+   *                    list of child columns
+   */
+  private void fillNonAggFunctionColumns(List<ColumnSchema> listOfColumns) {
+    parentToNonAggChildMapping = new HashMap<>();
+    for (ColumnSchema column : listOfColumns) {
+      if (null == column.getAggFunction() || column.getAggFunction().isEmpty()) {
+        fillMappingDetails(column, parentToNonAggChildMapping);
+      }
+    }
+  }
+
+  private void fillMappingDetails(ColumnSchema column,
+      Map<String, Set<ColumnSchema>> map) {
+    List<ParentColumnTableRelation> parentColumnTableRelations =
+        column.getParentColumnTableRelations();
+    if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1) {
+      String columnName = column.getParentColumnTableRelations().get(0).getColumnName();
+      Set<ColumnSchema> columnSchemas = map.get(columnName);
+      if (null == columnSchemas) {
+        columnSchemas = new HashSet<>();
+        map.put(columnName, columnSchemas);
+      }
+      columnSchemas.add(column);
+    }
+  }
+
+  /**
+   * Below method will be used to fill the mapping of parent column to the list of
+   * aggregation columns
+   * @param listOfColumns
+   *        list of child columns
+   */
+  private void fillAggFunctionColumns(List<ColumnSchema> listOfColumns) {
+    parentToAggChildMapping = new HashMap<>();
+    for (ColumnSchema column : listOfColumns) {
+      if (null != column.getAggFunction() && !column.getAggFunction().isEmpty()) {
+        fillMappingDetails(column, parentToAggChildMapping);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index ca0952d..0fd9fbf 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -126,6 +126,8 @@ public class CarbonTable implements Serializable {
 
   private int dimensionOrdinalMax;
 
+  private boolean hasDataMapSchema;
+
   private CarbonTable() {
     this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
     this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>();
@@ -158,6 +160,8 @@ public class CarbonTable implements Serializable {
       table.tablePartitionMap.put(tableInfo.getFactTable().getTableName(),
           tableInfo.getFactTable().getPartitionInfo());
     }
+    table.hasDataMapSchema =
+        null != tableInfo.getDataMapSchemaList() && tableInfo.getDataMapSchemaList().size() > 0;
     return table;
   }
 
@@ -702,13 +706,13 @@ public class CarbonTable implements Serializable {
     this.dimensionOrdinalMax = dimensionOrdinalMax;
   }
 
-  public boolean isPreAggregateTable() {
-    return tableInfo.getParentRelationIdentifiers() != null && !tableInfo
-        .getParentRelationIdentifiers().isEmpty();
+
+  public boolean hasDataMapSchema() {
+    return hasDataMapSchema;
   }
 
-  public boolean hasPreAggregateTables() {
-    return tableInfo.getDataMapSchemaList() != null && !tableInfo
-        .getDataMapSchemaList().isEmpty();
+  public boolean isChildDataMap() {
+    return null != tableInfo.getParentRelationIdentifiers()
+        && !tableInfo.getParentRelationIdentifiers().isEmpty();
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index e0632d9..5a9017b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -30,20 +30,20 @@ public class DataMapSchema implements Serializable, Writable {
 
   private static final long serialVersionUID = 6577149126264181553L;
 
-  private String dataMapName;
+  protected String dataMapName;
 
   private String className;
 
-  private RelationIdentifier relationIdentifier;
+  protected RelationIdentifier relationIdentifier;
   /**
    * child table schema
    */
-  private TableSchema childSchema;
+  protected TableSchema childSchema;
 
   /**
    * relation properties
    */
-  private Map<String, String> properties;
+  protected Map<String, String> properties;
 
   public DataMapSchema() {
   }
@@ -69,6 +69,10 @@ public class DataMapSchema implements Serializable, Writable {
     return properties;
   }
 
+  public String getDataMapName() {
+    return dataMapName;
+  }
+
   public void setRelationIdentifier(RelationIdentifier relationIdentifier) {
     this.relationIdentifier = relationIdentifier;
   }
@@ -81,10 +85,6 @@ public class DataMapSchema implements Serializable, Writable {
     this.properties = properties;
   }
 
-  public String getDataMapName() {
-    return dataMapName;
-  }
-
   @Override public void write(DataOutput out) throws IOException {
     out.writeUTF(dataMapName);
     out.writeUTF(className);
@@ -114,7 +114,7 @@ public class DataMapSchema implements Serializable, Writable {
     this.className = in.readUTF();
     boolean isRelationIdentifierExists = in.readBoolean();
     if (isRelationIdentifierExists) {
-      this.relationIdentifier = new RelationIdentifier();
+      this.relationIdentifier = new RelationIdentifier(null, null, null);
       this.relationIdentifier.readFields(in);
     }
     boolean isChildSchemaExists = in.readBoolean();
@@ -130,6 +130,5 @@ public class DataMapSchema implements Serializable, Writable {
       String value = in.readUTF();
       this.properties.put(key, value);
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
new file mode 100644
index 0000000..5729959
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata.schema.table;
+
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA;
+
+public class DataMapSchemaFactory {
+  public static final DataMapSchemaFactory INSTANCE = new DataMapSchemaFactory();
+
+  /**
+   * Below method will be used to get the data map schema object
+   * based on the data map class name
+   * @param dataMapName data map name
+   * @param className data map schema class name
+   * @return data map schema
+   */
+  public DataMapSchema getDataMapSchema(String dataMapName, String className) {
+    switch (className) {
+      case AGGREGATIONDATAMAPSCHEMA:
+        return new AggregationDataMapSchema(dataMapName, className);
+      default:
+        return new DataMapSchema(dataMapName, className);
+    }
+  }
+}
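
A minimal usage sketch for the factory (the datamap names are made up): the aggregation
class-name constant selects the AggregationDataMapSchema subtype, and any other class name
falls through to the base type.

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, DataMapSchemaFactory}

    val aggSchema = DataMapSchemaFactory.INSTANCE
      .getDataMapSchema("agg0", CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA)
    assert(aggSchema.isInstanceOf[AggregationDataMapSchema])

    // an unrecognized class name yields the plain base schema
    val plainSchema = DataMapSchemaFactory.INSTANCE
      .getDataMapSchema("dm1", "com.example.SomeDataMap")
    assert(!plainSchema.isInstanceOf[AggregationDataMapSchema])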

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 1d9e2ec..65878bc 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -295,7 +295,12 @@ public class TableInfo implements Serializable, Writable {
       for (int i = 0; i < numberOfChildTable; i++) {
         DataMapSchema childSchema = new DataMapSchema();
         childSchema.readFields(in);
-        dataMapSchemaList.add(childSchema);
+        DataMapSchema dataMapSchema = DataMapSchemaFactory.INSTANCE
+            .getDataMapSchema(childSchema.getDataMapName(), childSchema.getClassName());
+        dataMapSchema.setChildSchema(childSchema.getChildSchema());
+        dataMapSchema.setRelationIdentifier(childSchema.getRelationIdentifier());
+        dataMapSchema.setProperties(childSchema.getProperties());
+        dataMapSchemaList.add(dataMapSchema);
       }
     }
     boolean isParentTableRelationIndentifierExists = in.readBoolean();
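
The wire format carries no subtype tag, so readFields deserializes into the base type first and
then re-wraps through the factory to restore the proper subtype. The copy step above, condensed
into a sketch:

    import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, DataMapSchemaFactory}

    // rebuild a freshly-deserialized base schema as its factory subtype,
    // carrying over the three payload fields read from the stream
    def upgrade(raw: DataMapSchema): DataMapSchema = {
      val typed = DataMapSchemaFactory.INSTANCE
        .getDataMapSchema(raw.getDataMapName, raw.getClassName)
      typed.setChildSchema(raw.getChildSchema)
      typed.setRelationIdentifier(raw.getRelationIdentifier)
      typed.setProperties(raw.getProperties)
      typed
    }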

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
index 714e0d8..03848d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
@@ -263,9 +263,10 @@ public class TableSchema implements Serializable, Writable {
     Map<String, String> properties = new HashMap<>();
     properties.put("CHILD_SELECT QUERY", queryString);
     properties.put("QUERYTYPE", queryType);
-    DataMapSchema dataMapSchema = new DataMapSchema(dataMapName, className);
-    dataMapSchema.setChildSchema(this);
+    DataMapSchema dataMapSchema =
+        new DataMapSchema(dataMapName, className);
     dataMapSchema.setProperties(properties);
+    dataMapSchema.setChildSchema(this);
     dataMapSchema.setRelationIdentifier(relationIdentifier);
     return dataMapSchema;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
new file mode 100644
index 0000000..8b87a1a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.preagg;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+
+/**
+ * Below class will be used to select the aggregate table based on the
+ * query plan. Rules for selecting the aggregate table are:
+ * 1. Select the aggregate tables that cover all projection columns
+ * 2. Narrow the selection to tables that also cover all filter columns
+ * 3. Narrow the selection to tables that support all aggregate columns
+ */
+public class AggregateTableSelector {
+
+  /**
+   * current query plan
+   */
+  private QueryPlan queryPlan;
+
+  /**
+   * parent table
+   */
+  private CarbonTable parentTable;
+
+  public AggregateTableSelector(QueryPlan queryPlan, CarbonTable parentTable) {
+    this.queryPlan = queryPlan;
+    this.parentTable = parentTable;
+  }
+
+  /**
+   * Below method will be used to select pre-aggregate tables based on the query plan.
+   * Rules for selecting the aggregate table are:
+   * 1. Select the aggregate tables that cover all projection columns
+   * 2. Narrow the selection to tables that also cover all filter columns
+   * 3. Narrow the selection to tables that support all aggregate columns
+   *
+   * @return selected pre aggregate table schema
+   */
+  public List<DataMapSchema> selectPreAggDataMapSchema() {
+    List<QueryColumn> projectionColumn = queryPlan.getProjectionColumn();
+    List<QueryColumn> aggColumns = queryPlan.getAggregationColumns();
+    List<QueryColumn> filterColumns = queryPlan.getFilterColumns();
+    List<DataMapSchema> dataMapSchemaList = parentTable.getTableInfo().getDataMapSchemaList();
+    List<DataMapSchema> selectedDataMapSchema = new ArrayList<>();
+    boolean isMatch;
+    // match projection columns
+    if (null != projectionColumn && !projectionColumn.isEmpty()) {
+      for (DataMapSchema dmSchema : dataMapSchemaList) {
+        AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
+        isMatch = true;
+        for (QueryColumn queryColumn : projectionColumn) {
+          ColumnSchema columnSchemaByParentName = aggregationDataMapSchema
+              .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName());
+          if (null == columnSchemaByParentName) {
+            isMatch = false;
+          }
+        }
+        if (isMatch) {
+          selectedDataMapSchema.add(dmSchema);
+        }
+      }
+      // if projection columns are present but no aggregate table matched, return the empty list
+      if (selectedDataMapSchema.size() == 0) {
+        return selectedDataMapSchema;
+      }
+    }
+
+    // match filter columns
+    if (null != filterColumns && !filterColumns.isEmpty()) {
+      List<DataMapSchema> dmSchemaToIterate =
+          selectedDataMapSchema.isEmpty() ? dataMapSchemaList : selectedDataMapSchema;
+      selectedDataMapSchema = new ArrayList<>();
+      for (DataMapSchema dmSchema : dmSchemaToIterate) {
+        isMatch = true;
+        for (QueryColumn queryColumn : filterColumns) {
+          AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
+          ColumnSchema columnSchemaByParentName = aggregationDataMapSchema
+              .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName());
+          if (null == columnSchemaByParentName) {
+            isMatch = false;
+          }
+        }
+        if (isMatch) {
+          selectedDataMapSchema.add(dmSchema);
+        }
+      }
+      // if filter columns are present but no aggregate table matched, return the empty list
+      if (selectedDataMapSchema.size() == 0) {
+        return selectedDataMapSchema;
+      }
+    }
+    // match aggregation columns
+    if (null != aggColumns && !aggColumns.isEmpty()) {
+      List<DataMapSchema> dmSchemaToIterate =
+          selectedDataMapSchema.isEmpty() ? dataMapSchemaList : selectedDataMapSchema;
+      selectedDataMapSchema = new ArrayList<>();
+      for (DataMapSchema dmSchema : dmSchemaToIterate) {
+        isMatch = true;
+        for (QueryColumn queryColumn : aggColumns) {
+          AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
+          if (!aggregationDataMapSchema
+              .isColumnWithAggFunctionExists(queryColumn.getColumnSchema().getColumnName(),
+                  queryColumn.getAggFunction())) {
+            isMatch = false;
+          }
+        }
+        if (isMatch) {
+          selectedDataMapSchema.add(dmSchema);
+        }
+      }
+    }
+    return selectedDataMapSchema;
+  }
+}
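
To see the narrowing behaviour in isolation, here is a simplified model of the three-stage match
as a sketch: plain string sets stand in for the schema objects, and the aggregate-function check
that the real third stage performs via isColumnWithAggFunctionExists is reduced to column
coverage.

    // candidates: datamap name -> parent columns it covers
    def select(candidates: Map[String, Set[String]],
        projection: Set[String],
        filters: Set[String],
        aggregates: Set[String]): Seq[String] = {
      val all = candidates.keys.toSeq
      // keep only candidates covering every column needed at this stage;
      // an empty "needed" set leaves the current selection untouched
      def stage(current: Seq[String], needed: Set[String]): Seq[String] =
        if (needed.isEmpty) current
        else (if (current.isEmpty) all else current)
          .filter(name => needed.subsetOf(candidates(name)))
      val afterProjection = stage(Seq.empty, projection)
      if (projection.nonEmpty && afterProjection.isEmpty) return Nil
      val afterFilters = stage(afterProjection, filters)
      if (filters.nonEmpty && afterFilters.isEmpty) return Nil
      stage(afterFilters, aggregates)
    }

An empty result means no aggregate table qualifies and the query stays on the parent table;
compare tests 1 and 13 in the selection test suite below, where a filter on city (covered by no
datamap) forces the fall-back.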

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
new file mode 100644
index 0000000..a62d556
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.preagg;
+
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+
+/**
+ * column present in query
+ */
+public class QueryColumn {
+
+  /**
+   * parent column schema
+   */
+  private ColumnSchema columnSchema;
+
+  /**
+   * to store the changed data type in case of a cast
+   */
+  private String changedDataType;
+
+  /**
+   * aggregation function applied
+   */
+  private String aggFunction;
+
+  /**
+   * is filter column
+   */
+  private boolean isFilterColumn;
+
+  public QueryColumn(ColumnSchema columnSchema, String changedDataType, String aggFunction,
+      boolean isFilterColumn) {
+    this.columnSchema = columnSchema;
+    this.changedDataType = changedDataType;
+    this.aggFunction = aggFunction;
+    this.isFilterColumn = isFilterColumn;
+  }
+
+  public ColumnSchema getColumnSchema() {
+    return columnSchema;
+  }
+
+  public String getChangedDataType() {
+    return changedDataType;
+  }
+
+  public String getAggFunction() {
+    return aggFunction;
+  }
+
+  public boolean isFilterColumn() {
+    return isFilterColumn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
new file mode 100644
index 0000000..21a34fa
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.preagg;
+
+import java.util.List;
+
+/**
+ * class to maintain the query plan used to select the data map tables
+ */
+public class QueryPlan {
+
+  /**
+   * List of projection columns
+   */
+  private List<QueryColumn> projectionColumn;
+
+  /**
+   * list of aggregation columns
+   */
+  private List<QueryColumn> aggregationColumns;
+
+  /**
+   * list of filter columns
+   */
+  private List<QueryColumn> filterColumns;
+
+  public QueryPlan(List<QueryColumn> projectionColumn, List<QueryColumn> aggregationColumns,
+      List<QueryColumn> filterColumns) {
+    this.projectionColumn = projectionColumn;
+    this.aggregationColumns = aggregationColumns;
+    this.filterColumns = filterColumns;
+  }
+
+  public List<QueryColumn> getProjectionColumn() {
+    return projectionColumn;
+  }
+
+  public List<QueryColumn> getAggregationColumns() {
+    return aggregationColumns;
+  }
+
+  public List<QueryColumn> getFilterColumns() {
+    return filterColumns;
+  }
+}
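
A minimal construction sketch tying the two value classes together. The ColumnSchema setup is
reduced to the single field the selector reads, the column name (assuming the usual bean-style
setter on ColumnSchema):

    import java.util.Arrays

    import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
    import org.apache.carbondata.core.preagg.{QueryColumn, QueryPlan}

    def column(name: String, aggFunction: String = null,
        isFilter: Boolean = false): QueryColumn = {
      val schema = new ColumnSchema()
      schema.setColumnName(name)
      new QueryColumn(schema, null, aggFunction, isFilter)
    }

    // models: select name, sum(age) from mainTable where city = 'x' group by name
    val plan = new QueryPlan(
      Arrays.asList(column("name")),
      Arrays.asList(column("age", aggFunction = "sum")),
      Arrays.asList(column("city", isFilter = true)))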

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
new file mode 100644
index 0000000..6b435c6
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, DataFrame}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("drop table if exists agg0")
+    sql("drop table if exists agg1")
+    sql("drop table if exists agg2")
+    sql("drop table if exists agg3")
+    sql("drop table if exists agg4")
+    sql("drop table if exists agg5")
+    sql("drop table if exists agg6")
+    sql("drop table if exists agg7")
+    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name from mainTable group by name")
+    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(age) from mainTable group by name")
+    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(id) from mainTable group by name")
+    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,count(id) from mainTable group by name")
+    sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(age),count(id) from mainTable group by name")
+    sql("create datamap agg5 on table mainTable using 'preaggregate' as select name,avg(age) from mainTable group by name")
+    sql("create datamap agg6 on table mainTable using 'preaggregate' as select name,min(age) from mainTable group by name")
+    sql("create datamap agg7 on table mainTable using 'preaggregate' as select name,max(age) from mainTable group by name")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+  }
+
+
+  test("test PreAggregate table selection 1") {
+    val df = sql("select name from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+  }
+
+  test("test PreAggregate table selection 2") {
+    val df = sql("select name from mainTable where name in (select name from mainTable) group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "mainTable")
+  }
+
+  test("test PreAggregate table selection 3") {
+    val df = sql("select name from mainTable where name in (select name from mainTable group by name) group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "mainTable")
+  }
+
+  test("test PreAggregate table selection 4") {
+    val df = sql("select name from mainTable where name in('vishal') group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+  }
+
+  test("test PreAggregate table selection 5") {
+    val df = sql("select name, sum(age) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+  }
+
+  test("test PreAggregate table selection 6") {
+    val df = sql("select sum(age) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+  }
+
+  test("test PreAggregate table selection 7") {
+    val df = sql("select sum(id) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg2")
+  }
+
+  test("test PreAggregate table selection 8") {
+    val df = sql("select count(id) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
+  }
+
+  test("test PreAggregate table selection 9") {
+    val df = sql("select sum(age), count(id) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
+  }
+
+  test("test PreAggregate table selection 10") {
+    val df = sql("select avg(age) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg5")
+  }
+
+  test("test PreAggregate table selection 11") {
+    val df = sql("select max(age) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg7")
+  }
+
+  test("test PreAggregate table selection 12") {
+    val df = sql("select min(age) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg6")
+  }
+
+  test("test PreAggregate table selection 13") {
+    val df = sql("select name, sum(age) from mainTable where city = 'Bangalore' group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "mainTable")
+  }
+
+  test("test PreAggregate table selection 14") {
+    val df = sql("select sum(age) from mainTable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+  }
+
+  test("test PreAggregate table selection 15") {
+    val df = sql("select avg(age) from mainTable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg5")
+  }
+
+  test("test PreAggregate table selection 16") {
+    val df = sql("select max(age) from mainTable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg7")
+  }
+
+  test("test PreAggregate table selection 17") {
+    val df = sql("select min(age) from mainTable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg6")
+  }
+
+  test("test PreAggregate table selection 18") {
+    val df = sql("select count(id) from mainTable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
+  }
+
+  def preAggTableValidator(plan: LogicalPlan, actualTableName: String): Unit = {
+    var isValidPlan = false
+    plan.transform {
+      // walk the analyzed plan and check whether the carbon relation it scans
+      // resolves to the expected table (parent table or selected pre-aggregate table)
+      case logicalRelation:LogicalRelation =>
+        if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+          val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+          if(relation.carbonTable.getFactTableName.equalsIgnoreCase(actualTableName)) {
+            isValidPlan = true
+          }
+        }
+        logicalRelation
+    }
+    assert(isValidPlan)
+  }
+
+  override def afterAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("drop table if exists agg0")
+    sql("drop table if exists agg1")
+    sql("drop table if exists agg2")
+    sql("drop table if exists agg3")
+    sql("drop table if exists agg4")
+    sql("drop table if exists agg5")
+    sql("drop table if exists agg6")
+    sql("drop table if exists agg7")
+  }
+
+}
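
The expected names asserted above follow the child-table naming convention: datamap agg0 on
mainTable materializes as maintable_agg0. A sketch of the convention as the tests exercise it:

    // child table name derived from the parent table and datamap names,
    // lower-cased by the metastore
    def childTableName(parentTable: String, dataMapName: String): String =
      s"${parentTable}_$dataMapName".toLowerCase
    assert(childTableName("mainTable", "agg0") == "maintable_agg0")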

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 42447da..e83d96a 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -173,6 +173,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val DATAMAP = carbonKeyWord("DATAMAP")
   protected val ON = carbonKeyWord("ON")
   protected val DMPROPERTIES = carbonKeyWord("DMPROPERTIES")
+  protected val SELECT = carbonKeyWord("SELECT")
 
   protected val doubleQuotedString = "\"([^\"]+)\"".r
   protected val singleQuotedString = "'([^']+)'".r
@@ -989,6 +990,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         Field(e1, e2.dataType, Some(e1), e2.children, null, e3)
     }
 
+  lazy val addPreAgg: Parser[String] =
+    SELECT ~> restInput <~ opt(";") ^^ {
+      case query =>
+        "select preAGG() as preAgg, " + query
+    }
+
   protected lazy val primitiveFieldType: Parser[Field] =
     primitiveTypes ^^ {
       case e1 =>
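
The addPreAgg rule above strips the leading SELECT and re-emits the query with the marker
projection prepended. CreatePreAggregateTableCommand below reaches it through the parser's
addPreAggFunction wrapper; an illustrative call:

    import org.apache.spark.sql.parser.CarbonSpark2SqlParser

    val rewritten = new CarbonSpark2SqlParser()
      .addPreAggFunction("select name, sum(age) from mainTable group by name")
    // yields: select preAGG() as preAgg, name, sum(age) from mainTable group by name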

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 9899be1..28dcbf2 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -463,7 +463,7 @@ object CarbonDataRDDFactory {
         throw new Exception(status(0)._2._2.errorMsg)
       }
       // if segment is empty then fail the data load
-      if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isPreAggregateTable &&
+      if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
           !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
         // update the load entry in table status file for changing the status to failure
         CommonUtil.updateTableStatusForFailure(carbonLoadModel)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index a37b55b..b69ef2f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -52,6 +52,9 @@ class CarbonEnv {
 
   def init(sparkSession: SparkSession): Unit = {
     sparkSession.udf.register("getTupleId", () => "")
+    // added for handling pre-aggregate table creation. When the user fires the create
+    // DDL, this UDF is added to the query so the PreAggregate rules are not applied.
+    sparkSession.udf.register("preAgg", () => "")
     if (!initialized) {
       // update carbon session parameters , preserve thread parameters
       val currentThreadSesssionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
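
The marker is a zero-argument no-op UDF returning an empty string; its only job is to tag plans
produced by the create-datamap flow so the pre-aggregate rules skip them. A minimal sketch of the
mechanism in isolation:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local").getOrCreate()
    spark.udf.register("preAgg", () => "")
    // the extra column is inert; it merely marks the analyzed plan
    spark.sql("select preAgg() as preAgg, 1 as x").show()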

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index ebf6273..3a78968 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
 /**
  * Below command class will be used to create pre-aggregate table
  * and update the parent table with the child table information
@@ -47,9 +49,10 @@ case class CreatePreAggregateTableCommand(
   }
 
   override def processSchema(sparkSession: SparkSession): Seq[Row] = {
-    val df = sparkSession.sql(queryString)
-    val fieldRelationMap = PreAggregateUtil
-      .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, queryString)
+    val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
+    val df = sparkSession.sql(updatedQuery)
+    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
+      df.logicalPlan, queryString)
     val fields = fieldRelationMap.keySet.toSeq
     val tableProperties = mutable.Map[String, String]()
     dmproperties.foreach(t => tableProperties.put(t._1, t._2))
@@ -87,7 +90,8 @@ case class CreatePreAggregateTableCommand(
       val tableInfo = relation.tableMeta.carbonTable.getTableInfo
       // child schema object which will be registered on the parent table
       val childSchema = tableInfo.getFactTable.buildChildSchema(
-        dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION")
+        dataMapName, CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
+        tableInfo.getDatabaseName, queryString, "AGGREGATION")
       dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
       // updating the parent table about child table
       PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 8271e57..7a66e88 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -105,7 +105,7 @@ object PreAggregateDataTypeChangePreListener extends OperationEventListener {
           }
       }
 
-      if (carbonTable.isPreAggregateTable) {
+      if (carbonTable.isChildDataMap) {
         throw new UnsupportedOperationException(s"Cannot change data type for columns in " +
                                                 s"pre-aggreagate table ${
                                                   carbonTable.getDatabaseName
@@ -126,12 +126,12 @@ object PreAggregateDeleteSegmentByDatePreListener extends OperationEventListener
     val deleteSegmentByDatePreEvent = event.asInstanceOf[DeleteSegmentByDatePreEvent]
     val carbonTable = deleteSegmentByDatePreEvent.carbonTable
     if (carbonTable != null) {
-      if (carbonTable.hasPreAggregateTables) {
+      if (carbonTable.hasDataMapSchema) {
         throw new UnsupportedOperationException(
           "Delete segment operation is not supported on tables which have a pre-aggregate table. " +
           "Drop pre-aggregation table to continue")
       }
-      if (carbonTable.isPreAggregateTable) {
+      if (carbonTable.isChildDataMap) {
         throw new UnsupportedOperationException(
           "Delete segment operation is not supported on pre-aggregate table")
       }
@@ -150,11 +150,11 @@ object PreAggregateDeleteSegmentByIdPreListener extends OperationEventListener {
     val tableEvent = event.asInstanceOf[DeleteSegmentByIdPreEvent]
     val carbonTable = tableEvent.carbonTable
     if (carbonTable != null) {
-      if (carbonTable.hasPreAggregateTables) {
+      if (carbonTable.hasDataMapSchema) {
         throw new UnsupportedOperationException(
           "Delete segment operation is not supported on tables which have a pre-aggregate table")
       }
-      if (carbonTable.isPreAggregateTable) {
+      if (carbonTable.isChildDataMap) {
         throw new UnsupportedOperationException(
           "Delete segment operation is not supported on pre-aggregate table")
       }
@@ -190,7 +190,7 @@ object PreAggregateDropColumnPreListener extends OperationEventListener {
               s"pre-aggregate table ${ dataMapSchema.getRelationIdentifier.toString}")
           }
       }
-      if (carbonTable.isPreAggregateTable) {
+      if (carbonTable.isChildDataMap) {
         throw new UnsupportedOperationException(s"Cannot drop columns in pre-aggregate table ${
           carbonTable.getDatabaseName}.${ carbonTable.getFactTableName }")
       }
@@ -209,11 +209,11 @@ object PreAggregateRenameTablePreListener extends OperationEventListener {
       operationContext: OperationContext): Unit = {
     val renameTablePostListener = event.asInstanceOf[AlterTableRenamePreEvent]
     val carbonTable = renameTablePostListener.carbonTable
-    if (carbonTable.isPreAggregateTable) {
+    if (carbonTable.isChildDataMap) {
       throw new UnsupportedOperationException(
         "Rename operation for pre-aggregate table is not supported.")
     }
-    if (carbonTable.hasPreAggregateTables) {
+    if (carbonTable.hasDataMapSchema) {
       throw new UnsupportedOperationException(
         "Rename operation is not supported for table with pre-aggregate tables")
     }
@@ -231,12 +231,12 @@ object UpdatePreAggregatePreListener extends OperationEventListener {
     val tableEvent = event.asInstanceOf[UpdateTablePreEvent]
     val carbonTable = tableEvent.carbonTable
     if (carbonTable != null) {
-      if (carbonTable.hasPreAggregateTables) {
+      if (carbonTable.hasDataMapSchema) {
         throw new UnsupportedOperationException(
           "Update operation is not supported for tables which have a pre-aggregate table. Drop " +
           "pre-aggregate tables to continue.")
       }
-      if (carbonTable.isPreAggregateTable) {
+      if (carbonTable.isChildDataMap) {
         throw new UnsupportedOperationException(
           "Update operation is not supported for pre-aggregate table")
       }
@@ -255,12 +255,12 @@ object DeletePreAggregatePreListener extends OperationEventListener {
     val tableEvent = event.asInstanceOf[DeleteFromTablePreEvent]
     val carbonTable = tableEvent.carbonTable
     if (carbonTable != null) {
-      if (carbonTable.hasPreAggregateTables) {
+      if (carbonTable.hasDataMapSchema) {
         throw new UnsupportedOperationException(
           "Delete operation is not supported for tables which have a pre-aggregate table. Drop " +
           "pre-aggregate tables to continue.")
       }
-      if (carbonTable.isPreAggregateTable) {
+      if (carbonTable.isChildDataMap) {
         throw new UnsupportedOperationException(
           "Delete operation is not supported for pre-aggregate table")
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index b926705..62e7623 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -16,13 +16,14 @@
  */
 package org.apache.spark.sql.execution.command.preaaggregate
 
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Expression, NamedExpression, ScalaUDF}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
@@ -51,9 +52,14 @@ object PreAggregateUtil {
 
   def getParentCarbonTable(plan: LogicalPlan): CarbonTable = {
     plan match {
-      case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
-        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable
+      case Aggregate(_, _, SubqueryAlias(_, logicalRelation: LogicalRelation, _))
+        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
+          carbonRelation.metaData.carbonTable
+      case Aggregate(_, _, logicalRelation: LogicalRelation)
+        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
+          carbonRelation.metaData.carbonTable
       case _ => throw new MalformedCarbonCommandException("table does not exist")
     }
   }
@@ -67,54 +73,86 @@ object PreAggregateUtil {
    * @param selectStmt
    * @return list of fields
    */
-  def validateActualSelectPlanAndGetAttrubites(plan: LogicalPlan,
+  def validateActualSelectPlanAndGetAttributes(plan: LogicalPlan,
       selectStmt: String): scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
-    val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
     plan match {
-      case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
-        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        val carbonTable = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
-          .metaData.carbonTable
-        val parentTableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
-          .getTableName
-        val parentDatabaseName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
-          .getDatabaseName
-        val parentTableId = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
-          .getTableId
-        if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) {
+      case Aggregate(groupByExp, aggExp, SubqueryAlias(_, logicalRelation: LogicalRelation, _)) =>
+        getFieldsFromPlan(groupByExp, aggExp, logicalRelation, selectStmt)
+      case Aggregate(groupByExp, aggExp, logicalRelation: LogicalRelation) =>
+        getFieldsFromPlan(groupByExp, aggExp, logicalRelation, selectStmt)
+    }
+  }
+
+  /**
+   * Below method will be used to get the fields from expressions
+   * @param groupByExp
+   *                  grouping expression
+   * @param aggExp
+   *               aggregate expression
+   * @param logicalRelation
+   *                        logical relation
+   * @param selectStmt
+   *                   select statement
+   * @return fields from expressions
+   */
+  def getFieldsFromPlan(groupByExp: Seq[Expression],
+      aggExp: Seq[NamedExpression], logicalRelation: LogicalRelation, selectStmt: String):
+  scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
+    val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+    if (!logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+      throw new MalformedCarbonCommandException("Un-supported table")
+    }
+    val carbonTable = logicalRelation.relation.
+      asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
+      .metaData.carbonTable
+    val parentTableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+      .getTableName
+    val parentDatabaseName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+      .getDatabaseName
+    val parentTableId = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+      .getTableId
+    if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) {
+      throw new MalformedCarbonCommandException(
+        "Pre Aggregation is not supported on Pre-Aggregated Table")
+    }
+    groupByExp.map {
+      case attr: AttributeReference =>
+        fieldToDataMapFieldMap += getField(attr.name,
+          attr.dataType,
+          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName = parentTableName,
+          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+      case _ =>
+        throw new MalformedCarbonCommandException(s"Unsupported Function in select Statement:${
+          selectStmt } ")
+    }
+    aggExp.map {
+      case Alias(attr: AggregateExpression, _) =>
+        if (attr.isDistinct) {
           throw new MalformedCarbonCommandException(
-            "Pre Aggregation is not supported on Pre-Aggregated Table")
-        }
-        aExp.map {
-          case Alias(attr: AggregateExpression, _) =>
-            if (attr.isDistinct) {
-              throw new MalformedCarbonCommandException(
-                "Distinct is not supported On Pre Aggregation")
-            }
-            fieldToDataMapFieldMap ++= (validateAggregateFunctionAndGetFields(carbonTable,
-              attr.aggregateFunction,
-              parentTableName,
-              parentDatabaseName,
-              parentTableId))
-          case attr: AttributeReference =>
-            fieldToDataMapFieldMap += getField(attr.name,
-              attr.dataType,
-              parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-              parentTableName = parentTableName,
-              parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
-          case Alias(attr: AttributeReference, _) =>
-            fieldToDataMapFieldMap += getField(attr.name,
-              attr.dataType,
-              parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-              parentTableName = parentTableName,
-              parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
-          case _ =>
-            throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${
-              selectStmt } ")
+            "Distinct is not supported On Pre Aggregation")
         }
-        Some(carbonTable)
+        fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(carbonTable,
+          attr.aggregateFunction,
+          parentTableName,
+          parentDatabaseName,
+          parentTableId)
+      case attr: AttributeReference =>
+        fieldToDataMapFieldMap += getField(attr.name,
+          attr.dataType,
+          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName = parentTableName,
+          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+      case Alias(attr: AttributeReference, _) =>
+        fieldToDataMapFieldMap += getField(attr.name,
+          attr.dataType,
+          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName = parentTableName,
+          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
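+      // skip the preAgg marker column injected during pre-aggregate table creation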
+      case _@Alias(s: ScalaUDF, name) if name.equals("preAgg") =>
       case _ =>
-        throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${ selectStmt } ")
+        throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${
+          selectStmt } ")
     }
     fieldToDataMapFieldMap
   }
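
For orientation, a few illustrative child selects against this validation (the datamap names are
made up; the first is accepted, the others hit the exceptions above):

    // accepted: plain columns in the group by, non-distinct aggregates in the select
    sql("create datamap agg_ok on table mainTable using 'preaggregate' as " +
      "select name, sum(age) from mainTable group by name")
    // rejected: "Distinct is not supported On Pre Aggregation"
    sql("create datamap agg_bad1 on table mainTable using 'preaggregate' as " +
      "select name, sum(distinct age) from mainTable group by name")
    // rejected: grouping on an expression raises "Unsupported Function in select Statement"
    sql("create datamap agg_bad2 on table mainTable using 'preaggregate' as " +
      "select substring(name, 0, 2) from mainTable group by substring(name, 0, 2)")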
@@ -347,30 +385,6 @@ object PreAggregateUtil {
   }
 
   /**
-   * This method will split schema string into multiple parts of configured size and
-   * registers the parts as keys in tableProperties which will be read by spark to prepare
-   * Carbon Table fields
-   *
-   * @param sparkConf
-   * @param schemaJsonString
-   * @return
-   */
-  private def prepareSchemaJson(sparkConf: SparkConf,
-      schemaJsonString: String): String = {
-    val threshold = sparkConf
-      .getInt(CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD,
-        CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT)
-    // Split the JSON string.
-    val parts = schemaJsonString.grouped(threshold).toSeq
-    var schemaParts: Seq[String] = Seq.empty
-    schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_NUMPARTS'='${ parts.size }'"
-    parts.zipWithIndex.foreach { case (part, index) =>
-      schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_PART_PREFIX$index'='$part'"
-    }
-    schemaParts.mkString(",")
-  }
-
-  /**
    * Validates that the table exists and acquires meta lock on it.
    *
    * @param dbName
@@ -453,4 +467,30 @@ object PreAggregateUtil {
   def checkMainTableLoad(carbonTable: CarbonTable): Boolean = {
     SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).nonEmpty
   }
+
+  /**
+   * Below method will be used to update the logical plan.
+   * This is required while creating pre-aggregate tables,
+   * so that CarbonPreAggregateRules are not applied during creation.
+   * @param logicalPlan
+   *                    actual logical plan
+   * @return updated plan
+   */
+  def updatePreAggQueyPlan(logicalPlan: LogicalPlan): LogicalPlan = {
+    val updatedPlan = logicalPlan.transform {
+      case _@Project(projectList, child) =>
+        val buffer = new ArrayBuffer[NamedExpression]()
+        buffer ++= projectList
+        buffer += UnresolvedAlias(Alias(UnresolvedFunction("preAgg",
+          Seq.empty, isDistinct = false), "preAgg")())
+        Project(buffer, child)
+      case Aggregate(groupByExp, aggExp, l: UnresolvedRelation) =>
+        val buffer = new ArrayBuffer[NamedExpression]()
+        buffer ++= aggExp
+        buffer += UnresolvedAlias(Alias(UnresolvedFunction("preAgg",
+          Seq.empty, isDistinct = false), "preAgg")())
+        Aggregate(groupByExp, buffer, l)
+    }
+    updatedPlan
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index ba7e1eb..7bd0fad 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -20,111 +20,12 @@ package org.apache.spark.sql.hive
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, ExprId, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{DeclarativeAggregate, _}
+import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.execution.command.mutation.ProjectForDeleteCommand
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-
-
-/**
- * Insert into carbon table from other source
- */
-object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transform {
-      // Wait until children are resolved.
-      case p: LogicalPlan if !p.childrenResolved => p
-
-      case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
-        if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation], child)
-    }
-  }
-
-  def castChildOutput(p: InsertIntoTable,
-      relation: CarbonDatasourceHadoopRelation,
-      child: LogicalPlan)
-  : LogicalPlan = {
-    if (relation.carbonRelation.output.size > CarbonCommonConstants
-      .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
-      sys
-        .error("Maximum supported column by carbon is:" + CarbonCommonConstants
-          .DEFAULT_MAX_NUMBER_OF_COLUMNS
-        )
-    }
-    val isAggregateTable = !relation.carbonRelation.tableMeta.carbonTable.getTableInfo
-      .getParentRelationIdentifiers.isEmpty
-    // transform logical plan if the load is for aggregate table.
-    val childPlan = if (isAggregateTable) {
-      transformAggregatePlan(child)
-    } else {
-      child
-    }
-    if (childPlan.output.size >= relation.carbonRelation.output.size) {
-      val newChildOutput = childPlan.output.zipWithIndex.map { columnWithIndex =>
-        columnWithIndex._1 match {
-          case attr: Alias =>
-            Alias(attr.child, s"col${ columnWithIndex._2 }")(attr.exprId)
-          case attr: Attribute =>
-            Alias(attr, s"col${ columnWithIndex._2 }")(NamedExpression.newExprId)
-          case attr => attr
-        }
-      }
-      val newChild: LogicalPlan = if (newChildOutput == childPlan.output) {
-        p.child
-      } else {
-        Project(newChildOutput, childPlan)
-      }
-      InsertIntoCarbonTable(relation, p.partition, newChild, p.overwrite, p.ifNotExists)
-    } else {
-      sys.error("Cannot insert into target table because column number are different")
-    }
-  }
-
-  /**
-   * Transform the logical plan with average(col1) aggregation type to sum(col1) and count(col1).
-   *
-   * @param logicalPlan
-   * @return
-   */
-  private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
-    logicalPlan transform {
-      case aggregate@Aggregate(_, aExp, _) =>
-        val newExpressions = aExp flatMap {
-          case alias@Alias(attrExpression: AggregateExpression, _) =>
-            attrExpression.aggregateFunction flatMap {
-              case Average(attr: AttributeReference) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(attr.withName(attr.name)),
-                    resultId = NamedExpression.newExprId),
-                  attr.name)(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(attr.withName(attr.name)),
-                      resultId = NamedExpression.newExprId), attr.name)())
-              case Average(Cast(attr: AttributeReference, _)) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(attr.withName(attr.name)),
-                    resultId = NamedExpression.newExprId),
-                  attr.name)(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(attr.withName(attr.name)),
-                      resultId = NamedExpression.newExprId), attr.name)())
-              case _: DeclarativeAggregate => Seq(alias)
-              case _ => Nil
-            }
-          case namedExpr: NamedExpression => Seq(namedExpr)
-        }
-        aggregate.copy(aggregateExpressions = newExpressions)
-      case plan: LogicalPlan => plan
-    }
-  }
-}
 
 case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {