You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 19:16:42 UTC

[2/2] carbondata git commit: [CARBONDATA-1523]Pre Aggregate table selection and Query Plan changes

[CARBONDATA-1523]Pre Aggregate table selection and Query Plan changes

This closes #1464


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/dda2573a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/dda2573a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/dda2573a

Branch: refs/heads/pre-aggregate
Commit: dda2573a1160ba0d97047693c4b094bc6da06d95
Parents: c1eefee
Author: kumarvishal <ku...@gmail.com>
Authored: Mon Oct 30 12:44:32 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Nov 14 00:46:24 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   2 +
 .../ThriftWrapperSchemaConverterImpl.java       |   7 +-
 .../schema/table/AggregationDataMapSchema.java  | 212 +++++
 .../core/metadata/schema/table/CarbonTable.java |  16 +-
 .../metadata/schema/table/DataMapSchema.java    |  19 +-
 .../schema/table/DataMapSchemaFactory.java      |  38 +
 .../core/metadata/schema/table/TableInfo.java   |   7 +-
 .../core/metadata/schema/table/TableSchema.java |   5 +-
 .../core/preagg/AggregateTableSelector.java     | 135 +++
 .../carbondata/core/preagg/QueryColumn.java     |  70 ++
 .../carbondata/core/preagg/QueryPlan.java       |  59 ++
 .../TestPreAggregateTableSelection.scala        | 175 ++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   7 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   3 +
 .../CreatePreAggregateTableCommand.scala        |  12 +-
 .../preaaggregate/PreAggregateListeners.scala   |  24 +-
 .../preaaggregate/PreAggregateUtil.scala        | 184 ++--
 .../spark/sql/hive/CarbonAnalysisRules.scala    | 101 +--
 .../sql/hive/CarbonPreAggregateRules.scala      | 829 +++++++++++++++++++
 .../spark/sql/hive/CarbonSessionState.scala     |   6 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |   8 +
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   3 +-
 23 files changed, 1709 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index cb0b5c3..f17a9e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1411,6 +1411,8 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true";
 
+  public static final String AGGREGATIONDATAMAPSCHEMA = "AggregateDataMapHandler";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index b914e06..fef2e0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaFactory;
 import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
@@ -628,10 +629,10 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
 
   @Override public DataMapSchema fromExternalToWrapperDataMapSchema(
       org.apache.carbondata.format.DataMapSchema thriftDataMapSchema) {
-    DataMapSchema childSchema =
-        new DataMapSchema(thriftDataMapSchema.getDataMapName(), thriftDataMapSchema.getClassName());
+    DataMapSchema childSchema = DataMapSchemaFactory.INSTANCE
+        .getDataMapSchema(thriftDataMapSchema.getDataMapName(), thriftDataMapSchema.getClassName());
     childSchema.setProperties(thriftDataMapSchema.getProperties());
-    if (thriftDataMapSchema.getRelationIdentifire() != null) {
+    if (null != thriftDataMapSchema.getRelationIdentifire()) {
       RelationIdentifier relationIdentifier =
           new RelationIdentifier(thriftDataMapSchema.getRelationIdentifire().getDatabaseName(),
               thriftDataMapSchema.getRelationIdentifire().getTableName(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
new file mode 100644
index 0000000..87c07f4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+
+/**
+ * data map schema class for pre aggregation
+ */
+public class AggregationDataMapSchema extends DataMapSchema {
+
+  /**
+   * map of parent column name to set of child column column without
+   * aggregation function
+   */
+  private Map<String, Set<ColumnSchema>> parentToNonAggChildMapping;
+
+  /**
+   * map of parent column name to set of child columns column with
+   * aggregation function
+   */
+  private Map<String, Set<ColumnSchema>> parentToAggChildMapping;
+
+  /**
+   * map of parent column name to set of aggregation function applied in
+   * in parent column
+   */
+  private Map<String, Set<String>> parentColumnToAggregationsMapping;
+
+  public AggregationDataMapSchema(String dataMapName, String className) {
+    super(dataMapName, className);
+  }
+
+  public void setChildSchema(TableSchema childSchema) {
+    super.setChildSchema(childSchema);
+    List<ColumnSchema> listOfColumns = getChildSchema().getListOfColumns();
+    fillNonAggFunctionColumns(listOfColumns);
+    fillAggFunctionColumns(listOfColumns);
+    fillParentNameToAggregationMapping(listOfColumns);
+  }
+
+  /**
+   * Below method will be used to get the columns on which aggregate function is not applied
+   * @param columnName
+   *                parent column name
+   * @return child column schema
+   */
+  public ColumnSchema getNonAggChildColBasedByParent(String columnName) {
+    Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName);
+    if (null != columnSchemas) {
+      Iterator<ColumnSchema> iterator = columnSchemas.iterator();
+      while (iterator.hasNext()) {
+        ColumnSchema next = iterator.next();
+        if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) {
+          return next;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Below method will be used to get the column schema based on parent column name
+   * @param columName
+   *                parent column name
+   * @return child column schema
+   */
+  public ColumnSchema getChildColByParentColName(String columName) {
+    List<ColumnSchema> listOfColumns = childSchema.getListOfColumns();
+    for (ColumnSchema columnSchema : listOfColumns) {
+      List<ParentColumnTableRelation> parentColumnTableRelations =
+          columnSchema.getParentColumnTableRelations();
+      if (parentColumnTableRelations.get(0).getColumnName().equals(columName)) {
+        return columnSchema;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Below method will be used to get the child column schema based on parent name and aggregate
+   * function applied on column
+   * @param columnName
+   *                  parent column name
+   * @param aggFunction
+   *                  aggregate function applied
+   * @return child column schema
+   */
+  public ColumnSchema getAggChildColByParent(String columnName,
+      String aggFunction) {
+    Set<ColumnSchema> columnSchemas = parentToAggChildMapping.get(columnName);
+    if (null != columnSchemas) {
+      Iterator<ColumnSchema> iterator = columnSchemas.iterator();
+      while (iterator.hasNext()) {
+        ColumnSchema next = iterator.next();
+        if (null != next.getAggFunction() && next.getAggFunction().equalsIgnoreCase(aggFunction)) {
+          return next;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Below method is to check if parent column with matching aggregate function
+   * @param parentColumnName
+   *                    parent column name
+   * @param aggFunction
+   *                    aggregate function
+   * @return is matching
+   */
+  public boolean isColumnWithAggFunctionExists(String parentColumnName, String aggFunction) {
+    Set<String> aggFunctions = parentColumnToAggregationsMapping.get(parentColumnName);
+    if (null != aggFunctions && aggFunctions.contains(aggFunction)) {
+      return true;
+    }
+    return false;
+  }
+
+
+  /**
+   * Method to prepare mapping of parent to list of aggregation function applied on that column
+   * @param listOfColumns
+   *        child column schema list
+   */
+  private void fillParentNameToAggregationMapping(List<ColumnSchema> listOfColumns) {
+    parentColumnToAggregationsMapping = new HashMap<>();
+    for (ColumnSchema column : listOfColumns) {
+      if (null != column.getAggFunction() && !column.getAggFunction().isEmpty()) {
+        List<ParentColumnTableRelation> parentColumnTableRelations =
+            column.getParentColumnTableRelations();
+        if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1) {
+          String columnName = column.getParentColumnTableRelations().get(0).getColumnName();
+          Set<String> aggFunctions = parentColumnToAggregationsMapping.get(columnName);
+          if (null == aggFunctions) {
+            aggFunctions = new HashSet<>();
+            parentColumnToAggregationsMapping.put(columnName, aggFunctions);
+          }
+          aggFunctions.add(column.getAggFunction());
+        }
+      }
+    }
+  }
+
+  /**
+   * Below method will be used prepare mapping between parent column to non aggregation function
+   * columns
+   * @param listOfColumns
+   *                    list of child columns
+   */
+  private void fillNonAggFunctionColumns(List<ColumnSchema> listOfColumns) {
+    parentToNonAggChildMapping = new HashMap<>();
+    for (ColumnSchema column : listOfColumns) {
+      if (null == column.getAggFunction() || column.getAggFunction().isEmpty()) {
+        fillMappingDetails(column, parentToNonAggChildMapping);
+      }
+    }
+  }
+
+  private void fillMappingDetails(ColumnSchema column,
+      Map<String, Set<ColumnSchema>> map) {
+    List<ParentColumnTableRelation> parentColumnTableRelations =
+        column.getParentColumnTableRelations();
+    if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1) {
+      String columnName = column.getParentColumnTableRelations().get(0).getColumnName();
+      Set<ColumnSchema> columnSchemas = map.get(columnName);
+      if (null == columnSchemas) {
+        columnSchemas = new HashSet<>();
+        map.put(columnName, columnSchemas);
+      }
+      columnSchemas.add(column);
+    }
+  }
+
+  /**
+   * Below method will be used to fill parent to list of aggregation column mapping
+   * @param listOfColumns
+   *        list of child columns
+   */
+  private void fillAggFunctionColumns(List<ColumnSchema> listOfColumns) {
+    parentToAggChildMapping = new HashMap<>();
+    for (ColumnSchema column : listOfColumns) {
+      if (null != column.getAggFunction() && !column.getAggFunction().isEmpty()) {
+        fillMappingDetails(column, parentToAggChildMapping);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index ca0952d..0fd9fbf 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -126,6 +126,8 @@ public class CarbonTable implements Serializable {
 
   private int dimensionOrdinalMax;
 
+  private boolean hasDataMapSchema;
+
   private CarbonTable() {
     this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
     this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>();
@@ -158,6 +160,8 @@ public class CarbonTable implements Serializable {
       table.tablePartitionMap.put(tableInfo.getFactTable().getTableName(),
           tableInfo.getFactTable().getPartitionInfo());
     }
+    table.hasDataMapSchema =
+        null != tableInfo.getDataMapSchemaList() && tableInfo.getDataMapSchemaList().size() > 0;
     return table;
   }
 
@@ -702,13 +706,13 @@ public class CarbonTable implements Serializable {
     this.dimensionOrdinalMax = dimensionOrdinalMax;
   }
 
-  public boolean isPreAggregateTable() {
-    return tableInfo.getParentRelationIdentifiers() != null && !tableInfo
-        .getParentRelationIdentifiers().isEmpty();
+
+  public boolean hasDataMapSchema() {
+    return hasDataMapSchema;
   }
 
-  public boolean hasPreAggregateTables() {
-    return tableInfo.getDataMapSchemaList() != null && !tableInfo
-        .getDataMapSchemaList().isEmpty();
+  public boolean isChildDataMap() {
+    return null != tableInfo.getParentRelationIdentifiers()
+        && !tableInfo.getParentRelationIdentifiers().isEmpty();
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index e0632d9..5a9017b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -30,20 +30,20 @@ public class DataMapSchema implements Serializable, Writable {
 
   private static final long serialVersionUID = 6577149126264181553L;
 
-  private String dataMapName;
+  protected String dataMapName;
 
   private String className;
 
-  private RelationIdentifier relationIdentifier;
+  protected RelationIdentifier relationIdentifier;
   /**
    * child table schema
    */
-  private TableSchema childSchema;
+  protected TableSchema childSchema;
 
   /**
    * relation properties
    */
-  private Map<String, String> properties;
+  protected Map<String, String> properties;
 
   public DataMapSchema() {
   }
@@ -69,6 +69,10 @@ public class DataMapSchema implements Serializable, Writable {
     return properties;
   }
 
+  public String getDataMapName() {
+    return dataMapName;
+  }
+
   public void setRelationIdentifier(RelationIdentifier relationIdentifier) {
     this.relationIdentifier = relationIdentifier;
   }
@@ -81,10 +85,6 @@ public class DataMapSchema implements Serializable, Writable {
     this.properties = properties;
   }
 
-  public String getDataMapName() {
-    return dataMapName;
-  }
-
   @Override public void write(DataOutput out) throws IOException {
     out.writeUTF(dataMapName);
     out.writeUTF(className);
@@ -114,7 +114,7 @@ public class DataMapSchema implements Serializable, Writable {
     this.className = in.readUTF();
     boolean isRelationIdnentifierExists = in.readBoolean();
     if (isRelationIdnentifierExists) {
-      this.relationIdentifier = new RelationIdentifier();
+      this.relationIdentifier = new RelationIdentifier(null, null, null);
       this.relationIdentifier.readFields(in);
     }
     boolean isChildSchemaExists = in.readBoolean();
@@ -130,6 +130,5 @@ public class DataMapSchema implements Serializable, Writable {
       String value = in.readUTF();
       this.properties.put(key, value);
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
new file mode 100644
index 0000000..5729959
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata.schema.table;
+
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA;
+
+public class DataMapSchemaFactory {
+  public static final DataMapSchemaFactory INSTANCE = new DataMapSchemaFactory();
+
+  /**
+   * Below class will be used to get data map schema object
+   * based on class name
+   * @param className
+   * @return data map schema
+   */
+  public DataMapSchema getDataMapSchema(String dataMapName, String className) {
+    switch (className) {
+      case AGGREGATIONDATAMAPSCHEMA:
+        return new AggregationDataMapSchema(dataMapName, className);
+      default:
+        return new DataMapSchema(dataMapName, className);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 1d9e2ec..65878bc 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -295,7 +295,12 @@ public class TableInfo implements Serializable, Writable {
       for (int i = 0; i < numberOfChildTable; i++) {
         DataMapSchema childSchema = new DataMapSchema();
         childSchema.readFields(in);
-        dataMapSchemaList.add(childSchema);
+        DataMapSchema dataMapSchema = DataMapSchemaFactory.INSTANCE
+            .getDataMapSchema(childSchema.getDataMapName(), childSchema.getClassName());
+        dataMapSchema.setChildSchema(childSchema.getChildSchema());
+        dataMapSchema.setRelationIdentifier(childSchema.getRelationIdentifier());
+        dataMapSchema.setProperties(childSchema.getProperties());
+        dataMapSchemaList.add(dataMapSchema);
       }
     }
     boolean isParentTableRelationIndentifierExists = in.readBoolean();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
index 714e0d8..03848d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
@@ -263,9 +263,10 @@ public class TableSchema implements Serializable, Writable {
     Map<String, String> properties = new HashMap<>();
     properties.put("CHILD_SELECT QUERY", queryString);
     properties.put("QUERYTYPE", queryType);
-    DataMapSchema dataMapSchema = new DataMapSchema(dataMapName, className);
-    dataMapSchema.setChildSchema(this);
+    DataMapSchema dataMapSchema =
+        new DataMapSchema(dataMapName, className);
     dataMapSchema.setProperties(properties);
+    dataMapSchema.setChildSchema(this);
     dataMapSchema.setRelationIdentifier(relationIdentifier);
     return dataMapSchema;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
new file mode 100644
index 0000000..8b87a1a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.preagg;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+
+/**
+ * Below class will be used to select the aggregate table based
+ * query plan. Rules for selecting the aggregate table is below:
+ * 1. Select all aggregate table based on projection
+ * 2. select aggregate table based on filter exp,
+ * 2. select if aggregate tables based on aggregate columns
+ */
+public class AggregateTableSelector {
+
+  /**
+   * current query plan
+   */
+  private QueryPlan queryPlan;
+
+  /**
+   * parent table
+   */
+  private CarbonTable parentTable;
+
+  public AggregateTableSelector(QueryPlan queryPlan, CarbonTable parentTable) {
+    this.queryPlan = queryPlan;
+    this.parentTable = parentTable;
+  }
+
+  /**
+   * Below method will be used to select pre aggregate tables based on query plan
+   * Rules for selecting the aggregate table is below:
+   * 1. Select all aggregate table based on projection
+   * 2. select aggregate table based on filter exp,
+   * 2. select if aggregate tables based on aggregate columns
+   *
+   * @return selected pre aggregate table schema
+   */
+  public List<DataMapSchema> selectPreAggDataMapSchema() {
+    List<QueryColumn> projectionColumn = queryPlan.getProjectionColumn();
+    List<QueryColumn> aggColumns = queryPlan.getAggregationColumns();
+    List<QueryColumn> filterColumns = queryPlan.getFilterColumns();
+    List<DataMapSchema> dataMapSchemaList = parentTable.getTableInfo().getDataMapSchemaList();
+    List<DataMapSchema> selectedDataMapSchema = new ArrayList<>();
+    boolean isMatch;
+    // match projection columns
+    if (null != projectionColumn && !projectionColumn.isEmpty()) {
+      for (DataMapSchema dmSchema : dataMapSchemaList) {
+        AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
+        isMatch = true;
+        for (QueryColumn queryColumn : projectionColumn) {
+          ColumnSchema columnSchemaByParentName = aggregationDataMapSchema
+              .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName());
+          if (null == columnSchemaByParentName) {
+            isMatch = false;
+          }
+        }
+        if (isMatch) {
+          selectedDataMapSchema.add(dmSchema);
+        }
+      }
+      // if projection column is present but selected table list size is zero then
+      if (selectedDataMapSchema.size() == 0) {
+        return selectedDataMapSchema;
+      }
+    }
+
+    // match filter columns
+    if (null != filterColumns && !filterColumns.isEmpty()) {
+      List<DataMapSchema> dmSchemaToIterate =
+          selectedDataMapSchema.isEmpty() ? dataMapSchemaList : selectedDataMapSchema;
+      selectedDataMapSchema = new ArrayList<>();
+      for (DataMapSchema dmSchema : dmSchemaToIterate) {
+        isMatch = true;
+        for (QueryColumn queryColumn : filterColumns) {
+          AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
+          ColumnSchema columnSchemaByParentName = aggregationDataMapSchema
+              .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName());
+          if (null == columnSchemaByParentName) {
+            isMatch = false;
+          }
+        }
+        if (isMatch) {
+          selectedDataMapSchema.add(dmSchema);
+        }
+      }
+      // if filter column is present and selection size is zero then return
+      if (selectedDataMapSchema.size() == 0) {
+        return selectedDataMapSchema;
+      }
+    }
+    // match aggregation columns
+    if (null != aggColumns && !aggColumns.isEmpty()) {
+      List<DataMapSchema> dmSchemaToIterate =
+          selectedDataMapSchema.isEmpty() ? dataMapSchemaList : selectedDataMapSchema;
+      selectedDataMapSchema = new ArrayList<>();
+      for (DataMapSchema dmSchema : dmSchemaToIterate) {
+        isMatch = true;
+        for (QueryColumn queryColumn : aggColumns) {
+          AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
+          if (!aggregationDataMapSchema
+              .isColumnWithAggFunctionExists(queryColumn.getColumnSchema().getColumnName(),
+                  queryColumn.getAggFunction())) {
+            isMatch = false;
+          }
+        }
+        if (isMatch) {
+          selectedDataMapSchema.add(dmSchema);
+        }
+      }
+    }
+    return selectedDataMapSchema;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
new file mode 100644
index 0000000..a62d556
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.preagg;
+
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+
+/**
+ * column present in query
+ */
+public class QueryColumn {
+
+  /**
+   * parent column schema
+   */
+  private ColumnSchema columnSchema;
+
+  /**
+   * to store the change data type in case of cast
+   */
+  private String changedDataType;
+
+  /**
+   * aggregation function applied
+   */
+  private String aggFunction;
+
+  /**
+   * is filter column
+   */
+  private boolean isFilterColumn;
+
+  public QueryColumn(ColumnSchema columnSchema, String changedDataType, String aggFunction,
+      boolean isFilterColumn) {
+    this.columnSchema = columnSchema;
+    this.changedDataType = changedDataType;
+    this.aggFunction = aggFunction;
+    this.isFilterColumn = isFilterColumn;
+  }
+
+  public ColumnSchema getColumnSchema() {
+    return columnSchema;
+  }
+
+  public String getChangedDataType() {
+    return changedDataType;
+  }
+
+  public String getAggFunction() {
+    return aggFunction;
+  }
+
+  public boolean isFilterColumn() {
+    return isFilterColumn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
new file mode 100644
index 0000000..21a34fa
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.preagg;
+
+import java.util.List;
+
+/**
+ * class to maintain the query plan to select the data map tables
+ */
+public class QueryPlan {
+
+  /**
+   * List of projection columns
+   */
+  private List<QueryColumn> projectionColumn;
+
+  /**
+   * list of aggregation columns
+   */
+  private List<QueryColumn> aggregationColumns;
+
+  /**
+   * list of filter columns
+   */
+  private List<QueryColumn> filterColumns;
+
+  public QueryPlan(List<QueryColumn> projectionColumn, List<QueryColumn> aggregationColumns,
+      List<QueryColumn> filterColumns) {
+    this.projectionColumn = projectionColumn;
+    this.aggregationColumns = aggregationColumns;
+    this.filterColumns = filterColumns;
+  }
+
+  public List<QueryColumn> getProjectionColumn() {
+    return projectionColumn;
+  }
+
+  public List<QueryColumn> getAggregationColumns() {
+    return aggregationColumns;
+  }
+
+  public List<QueryColumn> getFilterColumns() {
+    return filterColumns;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
new file mode 100644
index 0000000..6b435c6
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.preaTable1regate
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, DataFrame}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("drop table if exists agg0")
+    sql("drop table if exists agg1")
+    sql("drop table if exists agg2")
+    sql("drop table if exists agg3")
+    sql("drop table if exists agg4")
+    sql("drop table if exists agg5")
+    sql("drop table if exists agg6")
+    sql("drop table if exists agg7")
+    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name from mainTable group by name")
+    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(age) from mainTable group by name")
+    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(id) from mainTable group by name")
+    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,count(id) from mainTable group by name")
+    sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(age),count(id) from mainTable group by name")
+    sql("create datamap agg5 on table mainTable using 'preaggregate' as select name,avg(age) from mainTable group by name")
+    sql("create datamap agg6 on table mainTable using 'preaggregate' as select name,min(age) from mainTable group by name")
+    sql("create datamap agg7 on table mainTable using 'preaggregate' as select name,max(age) from mainTable group by name")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+  }
+
+
+  test("test PreAggregate table selection 1") {
+    val df = sql("select name from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+  }
+
+  test("test PreAggregate table selection 2") {
+    val df = sql("select name from mainTable where name in (select name from mainTable) group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "mainTable")
+  }
+
+  test("test PreAggregate table selection 3") {
+    val df = sql("select name from mainTable where name in (select name from mainTable group by name) group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "mainTable")
+  }
+
+  test("test PreAggregate table selection 4") {
+    val df = sql("select name from mainTable where name in('vishal') group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+  }
+
+  test("test PreAggregate table selection 5") {
+    val df = sql("select name, sum(age) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+  }
+
+  test("test PreAggregate table selection 6") {
+    val df = sql("select sum(age) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+  }
+
+  test("test PreAggregate table selection 7") {
+    val df = sql("select sum(id) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg2")
+  }
+
+  test("test PreAggregate table selection 8") {
+    val df = sql("select count(id) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
+  }
+
+  test("test PreAggregate table selection 9") {
+    val df = sql("select sum(age), count(id) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
+  }
+
+  test("test PreAggregate table selection 10") {
+    val df = sql("select avg(age) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg5")
+  }
+
+  test("test PreAggregate table selection 11") {
+    val df = sql("select max(age) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg7")
+  }
+
+  test("test PreAggregate table selection 12") {
+    val df = sql("select min(age) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg6")
+  }
+
+  test("test PreAggregate table selection 13") {
+    val df = sql("select name, sum(age) from mainTable where city = 'Bangalore' group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "mainTable")
+  }
+
+  test("test PreAggregate table selection 14") {
+    val df = sql("select sum(age) from mainTable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+  }
+
+  test("test PreAggregate table selection 15") {
+    val df = sql("select avg(age) from mainTable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg5")
+  }
+
+  test("test PreAggregate table selection 16") {
+    val df = sql("select max(age) from mainTable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg7")
+  }
+
+  test("test PreAggregate table selection 17") {
+    val df = sql("select min(age) from mainTable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg6")
+  }
+
+  test("test PreAggregate table selection 18") {
+    val df = sql("select count(id) from mainTable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
+  }
+
+  def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={
+    var isValidPlan = false
+    plan.transform {
+      // first check if any preaTable1 scala function is applied it is present is in plan
+      // then call is from create preaTable1regate table class so no need to transform the query plan
+      case logicalRelation:LogicalRelation =>
+        if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+          val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+          if(relation.carbonTable.getFactTableName.equalsIgnoreCase(actualTableName)) {
+            isValidPlan = true
+          }
+        }
+        logicalRelation
+    }
+    if(!isValidPlan) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+  }
+
+  override def afterAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("drop table if exists agg0")
+    sql("drop table if exists agg1")
+    sql("drop table if exists agg2")
+    sql("drop table if exists agg3")
+    sql("drop table if exists agg4")
+    sql("drop table if exists agg5")
+    sql("drop table if exists agg6")
+    sql("drop table if exists agg7")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 42447da..e83d96a 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -173,6 +173,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val DATAMAP = carbonKeyWord("DATAMAP")
   protected val ON = carbonKeyWord("ON")
   protected val DMPROPERTIES = carbonKeyWord("DMPROPERTIES")
+  protected val SELECT = carbonKeyWord("SELECT")
 
   protected val doubleQuotedString = "\"([^\"]+)\"".r
   protected val singleQuotedString = "'([^']+)'".r
@@ -989,6 +990,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         Field(e1, e2.dataType, Some(e1), e2.children, null, e3)
     }
 
+  lazy val addPreAgg: Parser[String] =
+    SELECT ~> restInput <~ opt(";") ^^ {
+      case query =>
+        "select preAGG() as preAgg, " + query
+    }
+
   protected lazy val primitiveFieldType: Parser[Field] =
     primitiveTypes ^^ {
       case e1 =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 9899be1..28dcbf2 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -463,7 +463,7 @@ object CarbonDataRDDFactory {
         throw new Exception(status(0)._2._2.errorMsg)
       }
       // if segment is empty then fail the data load
-      if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isPreAggregateTable &&
+      if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
           !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
         // update the load entry in table status file for changing the status to failure
         CommonUtil.updateTableStatusForFailure(carbonLoadModel)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index a37b55b..b69ef2f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -52,6 +52,9 @@ class CarbonEnv {
 
   def init(sparkSession: SparkSession): Unit = {
     sparkSession.udf.register("getTupleId", () => "")
+    // added for handling preaggregate table creation. when user will fire create ddl for
+    // create table we are adding a udf so no need to apply PreAggregate rules.
+    sparkSession.udf.register("preAgg", () => "")
     if (!initialized) {
       // update carbon session parameters , preserve thread parameters
       val currentThreadSesssionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index ebf6273..3a78968 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
 /**
  * Below command class will be used to create pre-aggregate table
  * and updating the parent table about the child table information
@@ -47,9 +49,10 @@ case class CreatePreAggregateTableCommand(
   }
 
   override def processSchema(sparkSession: SparkSession): Seq[Row] = {
-    val df = sparkSession.sql(queryString)
-    val fieldRelationMap = PreAggregateUtil
-      .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, queryString)
+    val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
+    val df = sparkSession.sql(updatedQuery)
+    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
+      df.logicalPlan, queryString)
     val fields = fieldRelationMap.keySet.toSeq
     val tableProperties = mutable.Map[String, String]()
     dmproperties.foreach(t => tableProperties.put(t._1, t._2))
@@ -87,7 +90,8 @@ case class CreatePreAggregateTableCommand(
       val tableInfo = relation.tableMeta.carbonTable.getTableInfo
       // child schema object which will be updated on parent table about the
       val childSchema = tableInfo.getFactTable.buildChildSchema(
-        dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION")
+        dataMapName, CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
+        tableInfo.getDatabaseName, queryString, "AGGREGATION")
       dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
       // updating the parent table about child table
       PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 8271e57..7a66e88 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -105,7 +105,7 @@ object PreAggregateDataTypeChangePreListener extends OperationEventListener {
           }
       }
 
-      if (carbonTable.isPreAggregateTable) {
+      if (carbonTable.isChildDataMap) {
         throw new UnsupportedOperationException(s"Cannot change data type for columns in " +
                                                 s"pre-aggreagate table ${
                                                   carbonTable.getDatabaseName
@@ -126,12 +126,12 @@ object PreAggregateDeleteSegmentByDatePreListener extends OperationEventListener
     val deleteSegmentByDatePreEvent = event.asInstanceOf[DeleteSegmentByDatePreEvent]
     val carbonTable = deleteSegmentByDatePreEvent.carbonTable
     if (carbonTable != null) {
-      if (carbonTable.hasPreAggregateTables) {
+      if (carbonTable.hasDataMapSchema) {
         throw new UnsupportedOperationException(
           "Delete segment operation is not supported on tables which have a pre-aggregate table. " +
           "Drop pre-aggregation table to continue")
       }
-      if (carbonTable.isPreAggregateTable) {
+      if (carbonTable.isChildDataMap) {
         throw new UnsupportedOperationException(
           "Delete segment operation is not supported on pre-aggregate table")
       }
@@ -150,11 +150,11 @@ object PreAggregateDeleteSegmentByIdPreListener extends OperationEventListener {
     val tableEvent = event.asInstanceOf[DeleteSegmentByIdPreEvent]
     val carbonTable = tableEvent.carbonTable
     if (carbonTable != null) {
-      if (carbonTable.hasPreAggregateTables) {
+      if (carbonTable.hasDataMapSchema) {
         throw new UnsupportedOperationException(
           "Delete segment operation is not supported on tables which have a pre-aggregate table")
       }
-      if (carbonTable.isPreAggregateTable) {
+      if (carbonTable.isChildDataMap) {
         throw new UnsupportedOperationException(
           "Delete segment operation is not supported on pre-aggregate table")
       }
@@ -190,7 +190,7 @@ object PreAggregateDropColumnPreListener extends OperationEventListener {
               s"pre-aggregate table ${ dataMapSchema.getRelationIdentifier.toString}")
           }
       }
-      if (carbonTable.isPreAggregateTable) {
+      if (carbonTable.isChildDataMap) {
         throw new UnsupportedOperationException(s"Cannot drop columns in pre-aggreagate table ${
           carbonTable.getDatabaseName}.${ carbonTable.getFactTableName }")
       }
@@ -209,11 +209,11 @@ object PreAggregateRenameTablePreListener extends OperationEventListener {
       operationContext: OperationContext): Unit = {
     val renameTablePostListener = event.asInstanceOf[AlterTableRenamePreEvent]
     val carbonTable = renameTablePostListener.carbonTable
-    if (carbonTable.isPreAggregateTable) {
+    if (carbonTable.isChildDataMap) {
       throw new UnsupportedOperationException(
         "Rename operation for pre-aggregate table is not supported.")
     }
-    if (carbonTable.hasPreAggregateTables) {
+    if (carbonTable.hasDataMapSchema) {
       throw new UnsupportedOperationException(
         "Rename operation is not supported for table with pre-aggregate tables")
     }
@@ -231,12 +231,12 @@ object UpdatePreAggregatePreListener extends OperationEventListener {
     val tableEvent = event.asInstanceOf[UpdateTablePreEvent]
     val carbonTable = tableEvent.carbonTable
     if (carbonTable != null) {
-      if (carbonTable.hasPreAggregateTables) {
+      if (carbonTable.hasDataMapSchema) {
         throw new UnsupportedOperationException(
           "Update operation is not supported for tables which have a pre-aggregate table. Drop " +
           "pre-aggregate tables to continue.")
       }
-      if (carbonTable.isPreAggregateTable) {
+      if (carbonTable.isChildDataMap) {
         throw new UnsupportedOperationException(
           "Update operation is not supported for pre-aggregate table")
       }
@@ -255,12 +255,12 @@ object DeletePreAggregatePreListener extends OperationEventListener {
     val tableEvent = event.asInstanceOf[DeleteFromTablePreEvent]
     val carbonTable = tableEvent.carbonTable
     if (carbonTable != null) {
-      if (carbonTable.hasPreAggregateTables) {
+      if (carbonTable.hasDataMapSchema) {
         throw new UnsupportedOperationException(
           "Delete operation is not supported for tables which have a pre-aggregate table. Drop " +
           "pre-aggregate tables to continue.")
       }
-      if (carbonTable.isPreAggregateTable) {
+      if (carbonTable.isChildDataMap) {
         throw new UnsupportedOperationException(
           "Delete operation is not supported for pre-aggregate table")
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index b926705..62e7623 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -16,13 +16,14 @@
  */
 package org.apache.spark.sql.execution.command.preaaggregate
 
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Expression, NamedExpression, ScalaUDF}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
@@ -51,9 +52,14 @@ object PreAggregateUtil {
 
   def getParentCarbonTable(plan: LogicalPlan): CarbonTable = {
     plan match {
-      case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
-        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable
+      case Aggregate(_, _, SubqueryAlias(_, logicalRelation: LogicalRelation, _))
+        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
+          carbonRelation.metaData.carbonTable
+      case Aggregate(_, _, logicalRelation: LogicalRelation)
+        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
+          carbonRelation.metaData.carbonTable
       case _ => throw new MalformedCarbonCommandException("table does not exist")
     }
   }
@@ -67,54 +73,86 @@ object PreAggregateUtil {
    * @param selectStmt
    * @return list of fields
    */
-  def validateActualSelectPlanAndGetAttrubites(plan: LogicalPlan,
+  def validateActualSelectPlanAndGetAttributes(plan: LogicalPlan,
       selectStmt: String): scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
-    val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
     plan match {
-      case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
-        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        val carbonTable = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
-          .metaData.carbonTable
-        val parentTableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
-          .getTableName
-        val parentDatabaseName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
-          .getDatabaseName
-        val parentTableId = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
-          .getTableId
-        if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) {
+      case Aggregate(groupByExp, aggExp, SubqueryAlias(_, logicalRelation: LogicalRelation, _)) =>
+        getFieldsFromPlan(groupByExp, aggExp, logicalRelation, selectStmt)
+      case Aggregate(groupByExp, aggExp, logicalRelation: LogicalRelation) =>
+        getFieldsFromPlan(groupByExp, aggExp, logicalRelation, selectStmt)
+    }
+  }
+
+  /**
+   * Below method will be used to get the fields from expressions
+   * @param groupByExp
+   *                  grouping expression
+   * @param aggExp
+   *               aggregate expression
+   * @param logicalRelation
+   *                        logical relation
+   * @param selectStmt
+   *                   select statement
+   * @return fields from expressions
+   */
+  def getFieldsFromPlan(groupByExp: Seq[Expression],
+      aggExp: Seq[NamedExpression], logicalRelation: LogicalRelation, selectStmt: String):
+  scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
+    val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+    if (!logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+      throw new MalformedCarbonCommandException("Un-supported table")
+    }
+    val carbonTable = logicalRelation.relation.
+      asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
+      .metaData.carbonTable
+    val parentTableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+      .getTableName
+    val parentDatabaseName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+      .getDatabaseName
+    val parentTableId = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+      .getTableId
+    if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) {
+      throw new MalformedCarbonCommandException(
+        "Pre Aggregation is not supported on Pre-Aggregated Table")
+    }
+    groupByExp.map {
+      case attr: AttributeReference =>
+        fieldToDataMapFieldMap += getField(attr.name,
+          attr.dataType,
+          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName = parentTableName,
+          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+      case _ =>
+        throw new MalformedCarbonCommandException(s"Unsupported Function in select Statement:${
+          selectStmt } ")
+    }
+    aggExp.map {
+      case Alias(attr: AggregateExpression, _) =>
+        if (attr.isDistinct) {
           throw new MalformedCarbonCommandException(
-            "Pre Aggregation is not supported on Pre-Aggregated Table")
-        }
-        aExp.map {
-          case Alias(attr: AggregateExpression, _) =>
-            if (attr.isDistinct) {
-              throw new MalformedCarbonCommandException(
-                "Distinct is not supported On Pre Aggregation")
-            }
-            fieldToDataMapFieldMap ++= (validateAggregateFunctionAndGetFields(carbonTable,
-              attr.aggregateFunction,
-              parentTableName,
-              parentDatabaseName,
-              parentTableId))
-          case attr: AttributeReference =>
-            fieldToDataMapFieldMap += getField(attr.name,
-              attr.dataType,
-              parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-              parentTableName = parentTableName,
-              parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
-          case Alias(attr: AttributeReference, _) =>
-            fieldToDataMapFieldMap += getField(attr.name,
-              attr.dataType,
-              parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-              parentTableName = parentTableName,
-              parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
-          case _ =>
-            throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${
-              selectStmt } ")
+            "Distinct is not supported On Pre Aggregation")
         }
-        Some(carbonTable)
+        fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(carbonTable,
+          attr.aggregateFunction,
+          parentTableName,
+          parentDatabaseName,
+          parentTableId)
+      case attr: AttributeReference =>
+        fieldToDataMapFieldMap += getField(attr.name,
+          attr.dataType,
+          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName = parentTableName,
+          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+      case Alias(attr: AttributeReference, _) =>
+        fieldToDataMapFieldMap += getField(attr.name,
+          attr.dataType,
+          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName = parentTableName,
+          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+      case _@Alias(s: ScalaUDF, name) if name.equals("preAgg") =>
       case _ =>
-        throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${ selectStmt } ")
+        throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${
+          selectStmt } ")
     }
     fieldToDataMapFieldMap
   }
@@ -347,30 +385,6 @@ object PreAggregateUtil {
   }
 
   /**
-   * This method will split schema string into multiple parts of configured size and
-   * registers the parts as keys in tableProperties which will be read by spark to prepare
-   * Carbon Table fields
-   *
-   * @param sparkConf
-   * @param schemaJsonString
-   * @return
-   */
-  private def prepareSchemaJson(sparkConf: SparkConf,
-      schemaJsonString: String): String = {
-    val threshold = sparkConf
-      .getInt(CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD,
-        CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT)
-    // Split the JSON string.
-    val parts = schemaJsonString.grouped(threshold).toSeq
-    var schemaParts: Seq[String] = Seq.empty
-    schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_NUMPARTS'='${ parts.size }'"
-    parts.zipWithIndex.foreach { case (part, index) =>
-      schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_PART_PREFIX$index'='$part'"
-    }
-    schemaParts.mkString(",")
-  }
-
-  /**
    * Validates that the table exists and acquires meta lock on it.
    *
    * @param dbName
@@ -453,4 +467,30 @@ object PreAggregateUtil {
   def checkMainTableLoad(carbonTable: CarbonTable): Boolean = {
     SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).nonEmpty
   }
+
+  /**
+   * Below method will be used to update logical plan
+   * this is required for creating pre aggregate tables,
+   * so @CarbonPreAggregateRules will not be applied during creation
+   * @param logicalPlan
+   *                    actual logical plan
+   * @return updated plan
+   */
+  def updatePreAggQueyPlan(logicalPlan: LogicalPlan): LogicalPlan = {
+    val updatedPlan = logicalPlan.transform {
+      case _@Project(projectList, child) =>
+        val buffer = new ArrayBuffer[NamedExpression]()
+        buffer ++= projectList
+        buffer += UnresolvedAlias(Alias(UnresolvedFunction("preAgg",
+          Seq.empty, isDistinct = false), "preAgg")())
+        Project(buffer, child)
+      case Aggregate(groupByExp, aggExp, l: UnresolvedRelation) =>
+        val buffer = new ArrayBuffer[NamedExpression]()
+        buffer ++= aggExp
+        buffer += UnresolvedAlias(Alias(UnresolvedFunction("preAgg",
+          Seq.empty, isDistinct = false), "preAgg")())
+        Aggregate(groupByExp, buffer, l)
+    }
+    updatedPlan
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index ba7e1eb..7bd0fad 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -20,111 +20,12 @@ package org.apache.spark.sql.hive
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, ExprId, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{DeclarativeAggregate, _}
+import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.execution.command.mutation.ProjectForDeleteCommand
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-
-
-/**
- * Insert into carbon table from other source
- */
-object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transform {
-      // Wait until children are resolved.
-      case p: LogicalPlan if !p.childrenResolved => p
-
-      case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
-        if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation], child)
-    }
-  }
-
-  def castChildOutput(p: InsertIntoTable,
-      relation: CarbonDatasourceHadoopRelation,
-      child: LogicalPlan)
-  : LogicalPlan = {
-    if (relation.carbonRelation.output.size > CarbonCommonConstants
-      .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
-      sys
-        .error("Maximum supported column by carbon is:" + CarbonCommonConstants
-          .DEFAULT_MAX_NUMBER_OF_COLUMNS
-        )
-    }
-    val isAggregateTable = !relation.carbonRelation.tableMeta.carbonTable.getTableInfo
-      .getParentRelationIdentifiers.isEmpty
-    // transform logical plan if the load is for aggregate table.
-    val childPlan = if (isAggregateTable) {
-      transformAggregatePlan(child)
-    } else {
-      child
-    }
-    if (childPlan.output.size >= relation.carbonRelation.output.size) {
-      val newChildOutput = childPlan.output.zipWithIndex.map { columnWithIndex =>
-        columnWithIndex._1 match {
-          case attr: Alias =>
-            Alias(attr.child, s"col${ columnWithIndex._2 }")(attr.exprId)
-          case attr: Attribute =>
-            Alias(attr, s"col${ columnWithIndex._2 }")(NamedExpression.newExprId)
-          case attr => attr
-        }
-      }
-      val newChild: LogicalPlan = if (newChildOutput == childPlan.output) {
-        p.child
-      } else {
-        Project(newChildOutput, childPlan)
-      }
-      InsertIntoCarbonTable(relation, p.partition, newChild, p.overwrite, p.ifNotExists)
-    } else {
-      sys.error("Cannot insert into target table because column number are different")
-    }
-  }
-
-  /**
-   * Transform the logical plan with average(col1) aggregation type to sum(col1) and count(col1).
-   *
-   * @param logicalPlan
-   * @return
-   */
-  private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
-    logicalPlan transform {
-      case aggregate@Aggregate(_, aExp, _) =>
-        val newExpressions = aExp flatMap {
-          case alias@Alias(attrExpression: AggregateExpression, _) =>
-            attrExpression.aggregateFunction flatMap {
-              case Average(attr: AttributeReference) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(attr.withName(attr.name)),
-                    resultId = NamedExpression.newExprId),
-                  attr.name)(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(attr.withName(attr.name)),
-                      resultId = NamedExpression.newExprId), attr.name)())
-              case Average(Cast(attr: AttributeReference, _)) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(attr.withName(attr.name)),
-                    resultId = NamedExpression.newExprId),
-                  attr.name)(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(attr.withName(attr.name)),
-                      resultId = NamedExpression.newExprId), attr.name)())
-              case _: DeclarativeAggregate => Seq(alias)
-              case _ => Nil
-            }
-          case namedExpr: NamedExpression => Seq(namedExpr)
-        }
-        aggregate.copy(aggregateExpressions = newExpressions)
-      case plan: LogicalPlan => plan
-    }
-  }
-}
 
 case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {