You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/29 14:39:57 UTC

carbondata git commit: [CARBONDATA-1925][Pre-Aggregate]Added code to support case expression

Repository: carbondata
Updated Branches:
  refs/heads/master c5e72a4c9 -> adb8c1356


[CARBONDATA-1925][Pre-Aggregate]Added code to support case expression

Added code to support expression inside aggregation function for pre-aggregate table

This closes #1694


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/adb8c135
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/adb8c135
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/adb8c135

Branch: refs/heads/master
Commit: adb8c1356d0b753ecefe132cf5193ea3f2f92dea
Parents: c5e72a4
Author: kumarvishal <ku...@gmail.com>
Authored: Wed Dec 20 15:46:02 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Dec 29 22:39:44 2017 +0800

----------------------------------------------------------------------
 .../schema/table/AggregationDataMapSchema.java  |   4 +
 .../TestPreAggregateExpressions.scala           | 102 ++++++
 .../command/carbonTableSchemaCommon.scala       |  50 ++-
 .../apache/spark/sql/CarbonExpressions.scala    |  13 +
 .../preaaggregate/PreAggregateUtil.scala        | 313 ++++++++++++-------
 .../command/timeseries/TimeseriesUtil.scala     |   8 +-
 .../sql/hive/CarbonPreAggregateRules.scala      |   8 +-
 7 files changed, 364 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
index 8f6a2d3..4b2d492 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
@@ -329,6 +329,10 @@ public class AggregationDataMapSchema extends DataMapSchema {
             return false;
           }
         }
+      } else {
+        // in case of any expression, one column can be derived from multiple columns;
+        // in that case we cannot do rollup, so hit the main table
+        return false;
       }
     }
     return true;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
new file mode 100644
index 0000000..4171690
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    sql("DROP TABLE IF EXISTS mainTable")
+    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+  }
+  test("test pre agg create table with expression 1") {
+    sql(
+      s"""
+         | CREATE DATAMAP agg0 ON TABLE mainTable USING 'preaggregate' AS
+         | SELECT name,
+         | count(age)
+         | FROM mainTable GROUP BY name
+         | """.stripMargin)
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
+  }
+
+  test("test pre agg create table with expression 2") {
+    sql(
+      s"""
+         | CREATE DATAMAP agg1 ON TABLE mainTable USING 'preaggregate' AS
+         | SELECT name,
+         | sum(CASE WHEN age=35 THEN id ELSE 0 END)
+         | FROM mainTable GROUP BY name
+         | """.stripMargin)
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
+  }
+
+  test("test pre agg create table with expression 3") {
+    sql(
+      s"""
+         | CREATE DATAMAP agg2 ON TABLE mainTable USING 'preaggregate' AS
+         | SELECT name,
+         | sum(CASE WHEN age=35 THEN id ELSE 0 END),
+         | city
+         | FROM mainTable GROUP BY name,city
+         | """.stripMargin)
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
+  }
+
+  test("test pre agg create table with expression 4") {
+    sql(
+      s"""
+         | CREATE DATAMAP agg3 ON TABLE mainTable USING 'preaggregate' AS
+         | SELECT name,
+         | sum(CASE WHEN age=27 THEN id ELSE 0 END)
+         | FROM mainTable GROUP BY name
+         | """.stripMargin)
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
+  }
+
+  test("test pre agg create table with expression 5") {
+    sql(
+      s"""
+         | CREATE DATAMAP agg4 ON TABLE mainTable USING 'preaggregate' AS
+         | SELECT name,
+         | sum(CASE WHEN age=27 THEN id ELSE 0 END),
+         | SUM(CASE WHEN age=35 THEN id ELSE 0 END)
+         | FROM mainTable GROUP BY name
+         | """.stripMargin)
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum")
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum")
+  }
+
+  test("test pre agg create table with expression 6") {
+    sql(
+      s"""
+         | CREATE DATAMAP agg5 ON TABLE mainTable USING 'preaggregate' AS
+         | SELECT name,
+         | COUNT(CASE WHEN age=27 THEN(CASE WHEN name='eason' THEN id ELSE 0 END) ELSE 0 END)
+         | FROM mainTable GROUP BY name
+         | """.stripMargin)
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg5"), true, "maintable_column_0_count")
+  }
+
+  override def afterAll: Unit = {
+    sql("DROP TABLE IF EXISTS mainTable")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index c7a7b69..1e368cf 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -79,7 +79,7 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri
 }
 
 case class DataMapField(var aggregateFunction: String = "",
-    columnTableRelation: Option[ColumnTableRelation] = None) {
+    columnTableRelationList: Option[Seq[ColumnTableRelation]] = None) {
 }
 
 case class ColumnTableRelation(parentColumnName: String, parentColumnId: String,
@@ -435,14 +435,21 @@ class TableNewProcessor(cm: TableModel) {
     if(isParentColumnRelation) {
       val dataMapField = map.get.get(field).get
       columnSchema.setFunction(dataMapField.aggregateFunction)
-        val relation = dataMapField.columnTableRelation.get
-        val parentColumnTableRelationList = new util.ArrayList[ParentColumnTableRelation]
-        val relationIdentifier = new RelationIdentifier(
-          relation.parentDatabaseName, relation.parentTableName, relation.parentTableId)
-        val parentColumnTableRelation = new ParentColumnTableRelation(
-          relationIdentifier, relation.parentColumnId, relation.parentColumnName)
-        parentColumnTableRelationList.add(parentColumnTableRelation)
-        columnSchema.setParentColumnTableRelations(parentColumnTableRelationList)
+      val columnRelationList = dataMapField.columnTableRelationList.get
+      val parentColumnTableRelationList = new util.ArrayList[ParentColumnTableRelation]
+      columnRelationList.foreach {
+        columnRelation =>
+          val relationIdentifier = new RelationIdentifier(
+            columnRelation.parentDatabaseName,
+            columnRelation.parentTableName,
+            columnRelation.parentTableId)
+          val parentColumnTableRelation = new ParentColumnTableRelation(
+            relationIdentifier,
+            columnRelation.parentColumnId,
+            columnRelation.parentColumnName)
+          parentColumnTableRelationList.add(parentColumnTableRelation)
+      }
+      columnSchema.setParentColumnTableRelations(parentColumnTableRelationList)
     }
     // TODO: Need to fill RowGroupID, converted type
     // & Number of Children after DDL finalization
@@ -467,10 +474,11 @@ class TableNewProcessor(cm: TableModel) {
     // Sort columns should be at the begin of all columns
     cm.sortKeyDims.get.foreach { keyDim =>
       val field = cm.dimCols.find(keyDim equals _.column).get
-      val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) {
+      val encoders = if (getEncoderFromParent(field)) {
         cm.parentTable.get.getColumnByName(
           cm.parentTable.get.getTableName,
-          cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName).getEncoder
+          cm.dataMapRelation.get.get(field).get.columnTableRelationList.
+            get(0).parentColumnName).getEncoder
       } else {
         val encoders = new java.util.ArrayList[Encoding]()
         encoders.add(Encoding.DICTIONARY)
@@ -491,12 +499,11 @@ class TableNewProcessor(cm: TableModel) {
     cm.dimCols.foreach { field =>
       val sortField = cm.sortKeyDims.get.find(field.column equals _)
       if (sortField.isEmpty) {
-        val encoders = if (cm.parentTable.isDefined &&
-                           cm.dataMapRelation.get.get(field).isDefined) {
+        val encoders = if (getEncoderFromParent(field)) {
           cm.parentTable.get.getColumnByName(
             cm.parentTable.get.getTableName,
             cm.dataMapRelation.get.get(field).get.
-              columnTableRelation.get.parentColumnName).getEncoder
+              columnTableRelationList.get(0).parentColumnName).getEncoder
         } else {
           val encoders = new java.util.ArrayList[Encoding]()
           encoders.add(Encoding.DICTIONARY)
@@ -524,14 +531,14 @@ class TableNewProcessor(cm: TableModel) {
       var isAggFunPresent = false
       // getting the encoder from maintable so whatever encoding is applied in maintable
       // same encoder can be applied on aggregate table
-      val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) {
+      val encoders = if (getEncoderFromParent(field)) {
         isAggFunPresent =
           cm.dataMapRelation.get.get(field).get.aggregateFunction.equalsIgnoreCase("sum") ||
           cm.dataMapRelation.get.get(field).get.aggregateFunction.equals("avg")
         if(!isAggFunPresent) {
           cm.parentTable.get.getColumnByName(
             cm.parentTable.get.getTableName,
-            cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName)
+            cm.dataMapRelation.get.get(field).get.columnTableRelationList.get(0).parentColumnName)
             .getEncoder
         } else {
           new java.util.ArrayList[Encoding]()
@@ -668,6 +675,17 @@ class TableNewProcessor(cm: TableModel) {
     tableInfo
   }
 
+  /**
+   * Method to check whether the encoder should be taken from the parent table
+   * @param field column field
+   * @return true if the encoder should be fetched from the parent table
+   */
+  private def getEncoderFromParent(field: Field) : Boolean = {
+     cm.parentTable.isDefined &&
+        cm.dataMapRelation.get.get(field).isDefined &&
+        cm.dataMapRelation.get.get(field).get.columnTableRelationList.size==1
+  }
+
   //  For checking if the specified col group columns are specified in fields list.
   protected def checkColGroupsValidity(colGrps: Seq[String],
       allCols: Seq[ColumnSchema],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
index c1f9e8a..d473bc4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
@@ -47,6 +47,19 @@ object CarbonExpressions {
   }
 
   /**
+   * unapply method of Cast class with expression.
+   */
+  object MatchCastExpression {
+    def unapply(expr: Expression): Option[(Expression, DataType)] = {
+      expr match {
+        case a: Cast if a.child.isInstanceOf[Expression] =>
+          Some((a.child.asInstanceOf[Expression], a.dataType))
+        case _ => None
+      }
+    }
+  }
+
+  /**
    * unapply method of Describe Table format.
    */
   object CarbonDescribeTable {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 1f5bd41..217436d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -19,20 +19,19 @@ package org.apache.spark.sql.execution.command.preaaggregate
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, DataFrame, SparkSession}
-import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession}
+import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias, MatchCast => Cast, MatchCastExpression}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, NamedExpression, ScalaUDF}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
-import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.types.DataType
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -116,29 +115,45 @@ object PreAggregateUtil {
       throw new MalformedCarbonCommandException(
         "Pre Aggregation is not supported on Pre-Aggregated Table")
     }
+    var counter = 0
     aggExp.map {
-      case Alias(attr: AggregateExpression, _) =>
+      case Alias(attr: AggregateExpression, name) =>
         if (attr.isDistinct) {
           throw new MalformedCarbonCommandException(
             "Distinct is not supported On Pre Aggregation")
         }
-        fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(carbonTable,
+        fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(
+          carbonTable,
           attr.aggregateFunction,
           parentTableName,
           parentDatabaseName,
-          parentTableId)
+          parentTableId,
+          "column_" + counter)
+        counter = counter + 1
       case attr: AttributeReference =>
-        fieldToDataMapFieldMap += getField(attr.name,
+        val columnRelation = getColumnRelation(
+          attr.name,
+          parentTableId,
+          parentTableName,
+          parentDatabaseName,
+          carbonTable)
+        fieldToDataMapFieldMap += createField(
+          attr.name,
           attr.dataType,
-          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
           parentTableName = parentTableName,
-          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+          columnTableRelationList = Seq(columnRelation))
       case Alias(attr: AttributeReference, _) =>
-        fieldToDataMapFieldMap += getField(attr.name,
+        val columnRelation = getColumnRelation(
+          attr.name,
+          parentTableId,
+          parentTableName,
+          parentDatabaseName,
+          carbonTable)
+        fieldToDataMapFieldMap += createField(
+          attr.name,
           attr.dataType,
-          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
           parentTableName = parentTableName,
-          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+          columnTableRelationList = Seq(columnRelation))
       case _@Alias(s: ScalaUDF, name) if name.equals("preAgg") =>
       case _ =>
         throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${
@@ -148,6 +163,34 @@ object PreAggregateUtil {
   }
 
   /**
+   * Below method will be used to get the column relation
+   * with the parent column which will be used during query and data loading
+   * @param parentColumnName
+   * parent column name
+   * @param parentTableId
+   * parent table id
+   * @param parentTableName
+   * parent table name
+   * @param parentDatabaseName
+   * parent database name
+   * @param carbonTable
+   * carbon table
+   * @return column relation object
+   */
+  def getColumnRelation(parentColumnName: String,
+      parentTableId: String,
+      parentTableName: String,
+      parentDatabaseName: String,
+      carbonTable: CarbonTable) : ColumnTableRelation = {
+    val parentColumnId = carbonTable.getColumnByName(parentTableName, parentColumnName).getColumnId
+    val columnTableRelation = ColumnTableRelation(parentColumnName = parentColumnName,
+      parentColumnId = parentColumnId,
+      parentTableName = parentTableName,
+      parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+    columnTableRelation
+  }
+
+  /**
    * Below method will be used to validate about the aggregate function
    * which is applied on select query.
    * Currently sum, max, min, count, avg is supported
@@ -155,102 +198,105 @@ object PreAggregateUtil {
    * In case of avg it will return two fields one for count
    * and other of sum of that column to support rollup
    *
-   * @param carbonTable
-   * @param aggFunctions
-   * @param parentTableName
-   * @param parentDatabaseName
-   * @param parentTableId
+   * @param carbonTable parent carbon table
+   * @param aggFunctions aggregation function
+   * @param parentTableName parent table name
+   * @param parentDatabaseName parent database name
+   * @param parentTableId parent table id
+   * @param newColumnName
+   * In case of any expression this will be used as the column name for the pre-aggregate table
    * @return list of fields
    */
   def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable,
       aggFunctions: AggregateFunction,
       parentTableName: String,
       parentDatabaseName: String,
-      parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
+      parentTableId: String,
+      newColumnName: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
     val list = scala.collection.mutable.ListBuffer.empty[(Field, DataMapField)]
     aggFunctions match {
-      case sum@Sum(attr: AttributeReference) =>
-        list += getField(attr.name,
-          attr.dataType,
-          sum.prettyName,
-          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-          parentTableName,
-          parentDatabaseName, parentTableId = parentTableId)
-      case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
-        list += getField(attr.name,
+      case sum@Sum(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
+        list += createFieldForAggregateExpression(
+          exp,
           changeDataType,
-          sum.prettyName,
-          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-          parentTableName,
-          parentDatabaseName, parentTableId = parentTableId)
-      case count@Count(Seq(attr: AttributeReference)) =>
-        list += getField(attr.name,
-          attr.dataType,
-          count.prettyName,
-          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-          parentTableName,
-          parentDatabaseName, parentTableId = parentTableId)
-      case count@Count(Seq(Cast(attr: AttributeReference, _))) =>
-        list += getField(attr.name,
-          attr.dataType,
-          count.prettyName,
-          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-          parentTableName,
-          parentDatabaseName, parentTableId = parentTableId)
-      case min@Min(attr: AttributeReference) =>
-        list += getField(attr.name,
-          attr.dataType,
-          min.prettyName,
-          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-          parentTableName,
-          parentDatabaseName, parentTableId = parentTableId)
-      case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
-        list += getField(attr.name,
+          carbonTable,
+          newColumnName,
+          sum.prettyName)
+      case sum@Sum(exp: Expression) =>
+        list += createFieldForAggregateExpression(
+          exp,
+          sum.dataType,
+          carbonTable,
+          newColumnName,
+          sum.prettyName)
+      case count@Count(Seq(MatchCastExpression(exp: Expression, changeDataType: DataType))) =>
+        list += createFieldForAggregateExpression(
+          exp,
           changeDataType,
-          min.prettyName,
-          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-          parentTableName,
-          parentDatabaseName, parentTableId = parentTableId)
-      case max@Max(attr: AttributeReference) =>
-        list += getField(attr.name,
-          attr.dataType,
-          max.prettyName,
-          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-          parentTableName,
-          parentDatabaseName, parentTableId = parentTableId)
-      case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
-        list += getField(attr.name,
+          carbonTable,
+          newColumnName,
+          count.prettyName)
+      case count@Count(Seq(exp: Expression)) =>
+        list += createFieldForAggregateExpression(
+          exp,
+          count.dataType,
+          carbonTable,
+          newColumnName,
+          count.prettyName)
+      case min@Min(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
+        list += createFieldForAggregateExpression(
+          exp,
           changeDataType,
-          max.prettyName,
-          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-          parentTableName,
-          parentDatabaseName, parentTableId = parentTableId)
-      case Average(attr: AttributeReference) =>
-        list += getField(attr.name,
-          attr.dataType,
-          "sum",
-          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-          parentTableName,
-          parentDatabaseName, parentTableId = parentTableId)
-        list += getField(attr.name,
-          attr.dataType,
-          "count",
-          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-          parentTableName,
-          parentDatabaseName, parentTableId = parentTableId)
-      case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
-        list += getField(attr.name,
+          carbonTable,
+          newColumnName,
+          min.prettyName)
+      case min@Min(exp: Expression) =>
+        list += createFieldForAggregateExpression(
+          exp,
+          min.dataType,
+          carbonTable,
+          newColumnName,
+          min.prettyName)
+      case max@Max(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
+        list += createFieldForAggregateExpression(
+          exp,
           changeDataType,
-          "sum",
-          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-          parentTableName,
-          parentDatabaseName, parentTableId = parentTableId)
-        list += getField(attr.name,
+          carbonTable,
+          newColumnName,
+          max.prettyName)
+      case max@Max(exp: Expression) =>
+        list += createFieldForAggregateExpression(
+          exp,
+          max.dataType,
+          carbonTable,
+          newColumnName,
+          max.prettyName)
+      case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
+        list += createFieldForAggregateExpression(
+          exp,
           changeDataType,
-          "count",
-          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-          parentTableName,
-          parentDatabaseName, parentTableId = parentTableId)
+          carbonTable,
+          newColumnName,
+          "sum")
+        list += createFieldForAggregateExpression(
+          exp,
+          changeDataType,
+          carbonTable,
+          newColumnName,
+          "count")
+      case avg@Average(exp: Expression) =>
+        list += createFieldForAggregateExpression(
+          exp,
+          avg.dataType,
+          carbonTable,
+          newColumnName,
+          "sum")
+        list += createFieldForAggregateExpression(
+          exp,
+          avg.dataType,
+          carbonTable,
+          newColumnName,
+          "count")
       case others@_ =>
         throw new MalformedCarbonCommandException(s"Un-Supported Aggregation Type: ${
           others.prettyName}")
@@ -258,35 +304,79 @@ object PreAggregateUtil {
   }
 
   /**
+   * Below method will be used to get the field and its data map field object
+   * for aggregate expression
+   * @param expression
+   *                   expression in aggregate function
+   * @param dataType
+   *                 data type
+   * @param carbonTable
+   *                    parent carbon table
+   * @param newColumnName
+   *                      column name of aggregate table
+   * @param aggregationName
+   *                        aggregate function name
+   * @return field and its metadata tuple
+   */
+  def createFieldForAggregateExpression(
+      expression: Expression,
+      dataType: DataType,
+      carbonTable: CarbonTable,
+      newColumnName: String,
+      aggregationName: String): (Field, DataMapField) = {
+    val parentColumnsName = new ArrayBuffer[String]()
+    expression.transform {
+      case attr: AttributeReference =>
+        parentColumnsName += attr.name
+        attr
+    }
+    val arrayBuffer = parentColumnsName.map { name =>
+       getColumnRelation(name,
+        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+        carbonTable)
+    }
+    // if the parent column relation list has more than one entry, the aggregate table
+    // column is derived from multiple columns of the main table;
+    // or, if the expression is not an instance of attribute reference,
+    // then use the column name which is passed in
+    val columnName =
+    if (parentColumnsName.size > 1 && !expression.isInstanceOf[AttributeReference]) {
+      newColumnName
+    } else {
+      expression.asInstanceOf[AttributeReference].name
+    }
+    createField(columnName,
+      dataType,
+      aggregationName,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+      arrayBuffer)
+  }
+
+  /**
    * Below method will be used to get the fields object for pre aggregate table
    *
    * @param columnName
    * @param dataType
    * @param aggregateType
-   * @param parentColumnId
    * @param parentTableName
-   * @param parentDatabaseName
-   * @param parentTableId
+   * @param columnTableRelationList
+   *                                List of column relation with parent
    * @return fields object
    */
-  def getField(columnName: String,
+  def createField(columnName: String,
       dataType: DataType,
       aggregateType: String = "",
-      parentColumnId: String,
       parentTableName: String,
-      parentDatabaseName: String,
-      parentTableId: String): (Field, DataMapField) = {
+      columnTableRelationList: Seq[ColumnTableRelation]): (Field, DataMapField) = {
     val actualColumnName = if (aggregateType.equals("")) {
       parentTableName + '_' + columnName
     } else {
       parentTableName + '_' + columnName + '_' + aggregateType
     }
     val rawSchema = '`' + actualColumnName + '`' + ' ' + dataType.typeName
-    val columnTableRelation = ColumnTableRelation(parentColumnName = columnName,
-      parentColumnId = parentColumnId,
-      parentTableName = parentTableName,
-      parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
-    val dataMapField = DataMapField(aggregateType, Some(columnTableRelation))
+    val dataMapField = DataMapField(aggregateType, Some(columnTableRelationList))
     if (dataType.typeName.startsWith("decimal")) {
       val (precision, scale) = CommonUtil.getScaleAndPrecision(dataType.catalogString)
       (Field(column = actualColumnName,
@@ -508,7 +598,10 @@ object PreAggregateUtil {
     val headers = dataMapSchemas.find(_.getChildSchema.getTableName.equalsIgnoreCase(
       dataMapIdentifier.table)) match {
       case Some(dataMapSchema) =>
-        dataMapSchema.getChildSchema.getListOfColumns.asScala.sortBy(_.getSchemaOrdinal).map(
+        val columns = dataMapSchema.getChildSchema.getListOfColumns.asScala
+          .filter{column =>
+            !column.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)}
+        columns.sortBy(_.getSchemaOrdinal).map(
           _.getColumnName).mkString(",")
       case None =>
         throw new RuntimeException(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
index 6a4ef56..d4358b6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
@@ -111,8 +111,8 @@ object TimeSeriesUtil {
   .LinkedHashMap[Field, DataMapField],
       timeSeriesColumn: String) : Any = {
     val isTimeSeriesColumnExits = fieldMapping
-      .exists(obj => obj._2.columnTableRelation.isDefined &&
-                     obj._2.columnTableRelation.get.parentColumnName
+      .exists(obj => obj._2.columnTableRelationList.isDefined &&
+                     obj._2.columnTableRelationList.get(0).parentColumnName
                        .equalsIgnoreCase(timeSeriesColumn) &&
                      obj._2.aggregateFunction.isEmpty)
     if(!isTimeSeriesColumnExits) {
@@ -134,8 +134,8 @@ object TimeSeriesUtil {
       timeSeriesColumn: String,
       timeSeriesFunction: String) : Any = {
     val isTimeSeriesColumnExits = fieldMapping
-      .find(obj => obj._2.columnTableRelation.isDefined &&
-                     obj._2.columnTableRelation.get.parentColumnName
+      .find(obj => obj._2.columnTableRelationList.isDefined &&
+                     obj._2.columnTableRelationList.get(0).parentColumnName
                        .equalsIgnoreCase(timeSeriesColumn) &&
                      obj._2.aggregateFunction.isEmpty)
     isTimeSeriesColumnExits.get._2.aggregateFunction = timeSeriesFunction

http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 76c39a4..79561c6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql._
-import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast}
+import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast, MatchCastExpression}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
@@ -1290,11 +1290,11 @@ object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
           attrExpression.aggregateFunction match {
             case Sum(attr: AttributeReference) =>
               (attr.name + "_sum", alias) :: Nil
-            case Sum(MatchCast(attr: AttributeReference, _)) =>
+            case Sum(MatchCastExpression(attr: AttributeReference, _)) =>
               (attr.name + "_sum", alias) :: Nil
             case Count(Seq(attr: AttributeReference)) =>
               (attr.name + "_count", alias) :: Nil
-            case Count(Seq(MatchCast(attr: AttributeReference, _))) =>
+            case Count(Seq(MatchCastExpression(attr: AttributeReference, _))) =>
               (attr.name + "_count", alias) :: Nil
             case Average(attr: AttributeReference) =>
               Seq((attr.name + "_sum", Alias(attrExpression.
@@ -1303,7 +1303,7 @@ object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
                 (attr.name, Alias(attrExpression.
                   copy(aggregateFunction = Count(attr),
                     resultId = NamedExpression.newExprId), attr.name + "_count")()))
-            case Average(cast@MatchCast(attr: AttributeReference, _)) =>
+            case Average(cast@MatchCastExpression(attr: AttributeReference, _)) =>
               Seq((attr.name + "_sum", Alias(attrExpression.
                 copy(aggregateFunction = Sum(cast),
                   resultId = NamedExpression.newExprId),