Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/10/27 16:35:29 UTC

carbondata git commit: [CARBONDATA-1517]- Pre Aggregate Create Table Support

Repository: carbondata
Updated Branches:
  refs/heads/pre-aggregate 334aa1ccd -> 33d11997a


[CARBONDATA-1517]- Pre Aggregate Create Table Support

Support CTAS in Carbon, support creating aggregation tables using CTAS, and update the aggregation table information in the main table schema.
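
For example (verbatim from the tests added in this commit), an aggregation table is created from a main table via CTAS, with the parent declared in the table properties:

    create table preagg1 stored BY 'carbondata' tblproperties('parent'='PreAggMain')
    as select a,sum(b) from PreAggMain group by a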

This closes #1433


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/33d11997
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/33d11997
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/33d11997

Branch: refs/heads/pre-aggregate
Commit: 33d11997abe1250b55c7e23e632ea44d31636956
Parents: 334aa1c
Author: kumarvishal <ku...@gmail.com>
Authored: Sun Oct 15 18:05:55 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Oct 27 22:04:54 2017 +0530

----------------------------------------------------------------------
 .../table/column/ParentColumnTableRelation.java |  71 +++
 .../ThriftWrapperSchemaConverterImplTest.java   |  28 +-
 .../preaggregate/TestPreAggCreateCommand.scala  | 148 +++++++
 .../carbondata/spark/util/CommonUtil.scala      |   9 +
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  12 +-
 .../command/carbonTableSchemaCommon.scala       | 122 ++++--
 .../command/management/LoadTableCommand.scala   |   2 +-
 .../CreatePreAggregateTableCommand.scala        | 136 ++++++
 .../preaaggregate/PreAggregateUtil.scala        | 431 +++++++++++++++++++
 .../schema/AlterTableRenameTableCommand.scala   |   2 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |  41 +-
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |  72 +++-
 .../apache/spark/sql/hive/CarbonMetaStore.scala |  21 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |   2 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  37 +-
 .../org/apache/spark/util/AlterTableUtil.scala  |  10 +-
 16 files changed, 1066 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
new file mode 100644
index 0000000..425d0f2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata.schema.table.column;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+/**
+ * To maintain the relation of child column to parent table column
+ */
+public class ParentColumnTableRelation implements Serializable, Writable {
+
+  private RelationIdentifier relationIdentifier;
+  /**
+   * parent column id
+   */
+  private String columnId;
+
+  private String columnName;
+
+  public ParentColumnTableRelation(RelationIdentifier relationIdentifier, String columnId,
+      String columnName) {
+    this.relationIdentifier = relationIdentifier;
+    this.columnId = columnId;
+    this.columnName = columnName;
+  }
+
+  public RelationIdentifier getRelationIdentifier() {
+    return relationIdentifier;
+  }
+
+  public String getColumnId() {
+    return columnId;
+  }
+
+  public String getColumnName() {
+    return columnName;
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    relationIdentifier.write(out);
+    out.writeUTF(columnId);
+    out.writeUTF(columnName);
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    this.relationIdentifier = new RelationIdentifier(null, null, null);
+    relationIdentifier.readFields(in);
+    this.columnId = in.readUTF();
+    this.columnName = in.readUTF();
+  }
+}

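Since the class implements Writable, it round-trips through plain Java data streams. A minimal sketch in Scala, assuming only what this patch shows (the three-argument RelationIdentifier constructor and the write/readFields pair); the concrete values are hypothetical:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier
    import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation

    val buffer = new ByteArrayOutputStream()
    val relation = new ParentColumnTableRelation(
      new RelationIdentifier("default", "preaggmain", "tableId1"), "colId1", "b")
    relation.write(new DataOutputStream(buffer))
    // readFields overwrites every field, so a placeholder instance suffices
    val copy = new ParentColumnTableRelation(new RelationIdentifier(null, null, null), "", "")
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))
    assert(copy.getColumnId == "colId1" && copy.getColumnName == "b")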
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
index 4a3ef32..f05e4ac 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.format.DataMapSchema;
 
 import mockit.Mock;
 import mockit.MockUp;
@@ -82,6 +83,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.STRING,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
     thriftColumnSchemas = new ArrayList<org.apache.carbondata.format.ColumnSchema>();
     thriftColumnSchemas.add(thriftColumnSchema);
     thriftSchemaEvolutionEntries = new ArrayList<>();
@@ -419,6 +421,7 @@ public class ThriftWrapperSchemaConverterImplTest {
             new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.BOOLEAN,
                     "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
     ColumnSchema wrapperColumnSchema = new ColumnSchema();
 
     new MockUp<ColumnSchema>() {
@@ -481,6 +484,8 @@ public class ThriftWrapperSchemaConverterImplTest {
       @Mock public String getColumnReferenceId() {
         return "1";
       }
+
+      @Mock public String getAggFunction() { return ""; }
     };
 
     org.apache.carbondata.format.ColumnSchema actualResult =
@@ -494,7 +499,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.STRING,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
-
+    thriftColumnSchema.setAggregate_function("");
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
         return encodings;
@@ -569,7 +574,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.INT,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
-
+    thriftColumnSchema.setAggregate_function("");
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
         return encodings;
@@ -643,6 +648,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.SHORT,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
         return encodings;
@@ -716,7 +722,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.LONG,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
-
+    thriftColumnSchema.setAggregate_function("");
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
         return encodings;
@@ -790,6 +796,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.DOUBLE,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
 
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
@@ -864,6 +871,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.DECIMAL,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
         return encodings;
@@ -924,6 +932,10 @@ public class ThriftWrapperSchemaConverterImplTest {
       @Mock public String getColumnReferenceId() {
         return "1";
       }
+
+      @Mock public String getAggFunction() {
+        return "";
+      }
     };
 
     ColumnSchema wrapperColumnSchema = new ColumnSchema();
@@ -938,6 +950,7 @@ public class ThriftWrapperSchemaConverterImplTest {
             org.apache.carbondata.format.DataType.TIMESTAMP, "columnName", "1", true, encoders,
             true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
 
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
@@ -1012,7 +1025,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.ARRAY,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
-
+    thriftColumnSchema.setAggregate_function("");
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
         return encodings;
@@ -1086,6 +1099,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.STRUCT,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
 
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
@@ -1161,6 +1175,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new org.apache.carbondata.format.ColumnSchema(null, "columnName", "1", true, encoders,
             true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
 
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
@@ -1311,7 +1326,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.BOOLEAN,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
-
+    thriftColumnSchema.setAggregate_function("");
     ColumnSchema wrapperColumnSchema = new ColumnSchema();
     org.apache.carbondata.format.ColumnSchema actualResult =
         thriftWrapperSchemaConverter.fromWrapperToExternalColumnSchema(wrapperColumnSchema);
@@ -1499,6 +1514,8 @@ public class ThriftWrapperSchemaConverterImplTest {
       @Mock public String getColumnReferenceId() {
         return "1";
       }
+
+      @Mock public String getAggFunction() { return ""; }
     };
 
     new MockUp<TableInfo>() {
@@ -1535,6 +1552,7 @@ public class ThriftWrapperSchemaConverterImplTest {
     org.apache.carbondata.format.TableInfo expectedResult =
         new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<org.apache
             .carbondata.format.TableSchema>());
+    expectedResult.setDataMapSchemas(new ArrayList<DataMapSchema>());
     assertEquals(expectedResult, actualResult);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
new file mode 100644
index 0000000..6120e88
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -0,0 +1,148 @@
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists PreAggMain")
+    sql("drop table if exists PreAggMain1")
+    sql("drop table if exists PreAggMain2")
+    sql("create table preaggMain (a string, b string, c string) stored by 'carbondata'")
+    sql("create table preaggMain1 (a string, b string, c string) stored by 'carbondata' tblProperties('DICTIONARY_INCLUDE' = 'a')")
+    sql("create table preaggMain2 (a string, b string, c string) stored by 'carbondata'")
+  }
+
+
+  test("test pre agg create table One") {
+    sql("create table preagg1 stored BY 'carbondata' tblproperties('parent'='PreAggMain') as select a,sum(b) from PreAggMain group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg1"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg1"), true, "preaggmain_b_sum")
+    sql("drop table preagg1")
+  }
+
+  test("test pre agg create table Two") {
+    sql("create table preagg2 stored BY 'carbondata' tblproperties('parent'='PreAggMain') as select a as a1,sum(b) from PreAggMain group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg2"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg2"), true, "preaggmain_b_sum")
+    sql("drop table preagg2")
+  }
+
+  test("test pre agg create table Three") {
+    sql("create table preagg3 stored BY 'carbondata' tblproperties('parent'='PreAggMain') as select a,sum(b) as sum from PreAggMain group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg3"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg3"), true, "preaggmain_b_sum")
+    sql("drop table preagg3")
+  }
+
+  test("test pre agg create table four") {
+    sql("create table preagg4 stored BY 'carbondata' tblproperties('parent'='PreAggMain') as select a as a1,sum(b) as sum from PreAggMain group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg4"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg4"), true, "preaggmain_b_sum")
+    sql("drop table preagg4")
+  }
+
+
+  test("test pre agg create table five") {
+    sql("create table preagg11 stored BY 'carbondata' tblproperties('parent'='PreAggMain1') as select a,sum(b) from PreAggMain1 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, "preaggmain1_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, "preaggmain1_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, "DICTIONARY")
+    sql("drop table preagg11")
+  }
+
+  test("test pre agg create table six") {
+    sql("create table preagg12 stored BY 'carbondata' tblproperties('parent'='PreAggMain1') as select a as a1,sum(b) from PreAggMain1 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, "preaggmain1_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, "preaggmain1_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, "DICTIONARY")
+    sql("drop table preagg12")
+  }
+
+  test("test pre agg create table seven") {
+    sql("create table preagg13 stored BY 'carbondata' tblproperties('parent'='PreAggMain1') as select a,sum(b) as sum from PreAggMain1 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, "preaggmain1_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, "preaggmain1_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, "DICTIONARY")
+    sql("drop table preagg13")
+  }
+
+  test("test pre agg create table eight") {
+    sql("create table preagg14 stored BY 'carbondata' tblproperties('parent'='PreAggMain1') as select a as a1,sum(b) as sum from PreAggMain1 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, "preaggmain1_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, "preaggmain1_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, "DICTIONARY")
+    sql("drop table preagg14")
+  }
+
+
+  test("test pre agg create table nine") {
+    sql("create table preagg15 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a,avg(b) from PreAggMain2 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, "preaggmain2_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, "preaggmain2_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, "preaggmain2_b_count")
+    sql("drop table preagg15")
+  }
+
+  test("test pre agg create table ten") {
+    sql("create table preagg16 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,max(b) from PreAggMain2 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg16"), true, "preaggmain2_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg16"), true, "preaggmain2_b_max")
+    sql("drop table preagg16")
+  }
+
+  test("test pre agg create table eleven") {
+    sql("create table preagg17 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a,min(b) from PreAggMain2 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg17"), true, "preaggmain2_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg17"), true, "preaggmain2_b_min")
+    sql("drop table preagg17")
+  }
+
+  test("test pre agg create table twelve") {
+    sql("create table preagg18 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,count(b) from PreAggMain2 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg18"), true, "preaggmain2_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg18"), true, "preaggmain2_b_count")
+    sql("drop table preagg18")
+  }
+
+  test("test pre agg create table thirteen") {
+    try {
+      sql(
+        "create table preagg19 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,count(distinct b) from PreAggMain2 group by a")
+      assert(false)
+    } catch {
+      case _: Exception =>
+        assert(true)
+    }
+  }
+
+  test("test pre agg create table fourteen") {
+    try {
+      sql(
+        "create table preagg20 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,sum(distinct b) from PreAggMain2 group by a")
+      assert(false)
+    } catch {
+      case _: Exception =>
+        assert(true)
+    }
+  }
+
+  test("test pre agg create table fifteen") {
+    try {
+      sql(
+        "create table preagg21 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,sum(b) from PreAggMain2 where a='vishal' group by a")
+      assert(false)
+    } catch {
+      case _: Exception =>
+        assert(true)
+    }
+  }
+
+
+  override def afterAll {
+    sql("drop table if exists PreAggMain")
+    sql("drop table if exists PreAggMain1")
+    sql("drop table if exists PreAggMain2")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index bc24c12..3d06a05 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.util
 
 import java.text.SimpleDateFormat
 import java.util
+import java.util.regex.{Matcher, Pattern}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Map
@@ -831,4 +832,12 @@ object CommonUtil {
         LOGGER.error(s)
     }
   }
+
+  def getScaleAndPrecision(dataType: String): (Int, Int) = {
+    val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
+    m.find()
+    val matchedString: String = m.group(1)
+    val scaleAndPrecision = matchedString.split(",")
+    (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim))
+  }
 }

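The relocated helper parses precision and scale out of a decimal type string; note that, despite the scaleAndPrecision variable name, the tuple is returned as (precision, scale), which is how the call site in CarbonDDLSqlParser below destructures it:

    // returns (10, 2), i.e. precision first, then scale
    val (precision, scale) = CommonUtil.getScaleAndPrecision("decimal(10,2)")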
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 4649082..5d389e8 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -222,16 +222,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     dimensions ++ complexDimensions
   }
 
-
-
-  def getScaleAndPrecision(dataType: String): (Int, Int) = {
-    val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
-    m.find()
-    val matchedString: String = m.group(1)
-    val scaleAndPrecision = matchedString.split(",")
-    (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim))
-  }
-
   /**
    * This will prepare the Model from the Tree details.
    *
@@ -1069,7 +1059,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       // if it is present then extracting the precision and scale. resetting the data type
       // with Decimal.
       case _ if dataType.startsWith("decimal") =>
-        val (precision, scale) = getScaleAndPrecision(dataType)
+        val (precision, scale) = CommonUtil.getScaleAndPrecision(dataType)
         Field(field.column,
           Some("Decimal"),
           field.name,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index e5cfc84..c985017 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -33,8 +33,8 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema._
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
 import org.apache.carbondata.core.service.CarbonCommonFactory
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.DataTypeUtil
@@ -60,13 +60,30 @@ case class TableModel(
     colProps: Option[util.Map[String,
     util.List[ColumnProperty]]] = None,
     bucketFields: Option[BucketFields],
-    partitionInfo: Option[PartitionInfo])
+    partitionInfo: Option[PartitionInfo],
+    var parentTable: Option[CarbonTable] = None,
+    var dataMapRelation: Option[scala.collection.mutable.LinkedHashMap[Field, DataMapField]] = None)
 
 case class Field(column: String, var dataType: Option[String], name: Option[String],
     children: Option[List[Field]], parent: String = null,
     storeType: Option[String] = Some("columnar"),
     var schemaOrdinal: Int = -1,
-    var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "")
+    var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "") {
+  override def equals(o: Any): Boolean = o match {
+    case that: Field =>
+      that.column.equalsIgnoreCase(this.column)
+    case _ => false
+  }
+  override def hashCode: Int = column.toLowerCase.hashCode
+}
+
+case class DataMapField(aggregateFunction: String = "",
+    columnTableRelation: Option[ColumnTableRelation] = None)
+
+case class ColumnTableRelation(parentColumnName: String, parentColumnId: String,
+    parentTableName: String, parentDatabaseName: String, parentTableId: String)
 
 case class ColumnProperty(key: String, value: String)
 
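Because of the case-insensitive equals (and the matching lowercased hashCode) above, Field works as a lookup key in the LinkedHashMap[Field, DataMapField] carrying the column-to-parent mapping, regardless of identifier casing. A minimal sketch of the behaviour relied upon:

    val f1 = Field(column = "A", dataType = Some("string"), name = Some("A"), children = None)
    val f2 = Field(column = "a", dataType = Some("string"), name = Some("a"), children = None)
    assert(f1 == f2 && f1.hashCode == f2.hashCode)
    val mapping = scala.collection.mutable.LinkedHashMap(f1 -> DataMapField("sum", None))
    assert(mapping.contains(f2)) // lookup succeeds despite the different case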
@@ -351,11 +368,10 @@ class TableNewProcessor(cm: TableModel) {
       fields.foreach(field => {
         val encoders = new java.util.ArrayList[Encoding]()
         encoders.add(Encoding.DICTIONARY)
-        val columnSchema: ColumnSchema = getColumnSchema(
+        val columnSchema = getColumnSchema(
           DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
           field.name.getOrElse(field.column), index,
-          isCol = true, encoders, isDimensionCol = true, rowGroup, field.precision, field.scale,
-          field.schemaOrdinal)
+          isCol = true, encoders, isDimensionCol = true, rowGroup, field, cm.dataMapRelation)
         allColumns ++= Seq(columnSchema)
         index = index + 1
         rowGroup = rowGroup + 1
@@ -370,19 +386,24 @@ class TableNewProcessor(cm: TableModel) {
 
   def getColumnSchema(dataType: DataType, colName: String, index: Integer, isCol: Boolean,
       encoders: java.util.List[Encoding], isDimensionCol: Boolean,
-      colGroup: Integer, precision: Integer, scale: Integer, schemaOrdinal: Int): ColumnSchema = {
+      colGroup: Integer, field: Field,
+      map: Option[scala.collection.mutable.LinkedHashMap[Field, DataMapField]]
+      ) : ColumnSchema = {
     val columnSchema = new ColumnSchema()
     columnSchema.setDataType(dataType)
     columnSchema.setColumnName(colName)
-    val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
-    if (highCardinalityDims.contains(colName)) {
-      encoders.remove(Encoding.DICTIONARY)
-    }
+    val isParentColumnRelation = map.isDefined && map.get.get(field).isDefined
+    if (!isParentColumnRelation) {
+      val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
+      if (highCardinalityDims.contains(colName)) {
+        encoders.remove(Encoding.DICTIONARY)
+      }
     if (dataType == DataTypes.DATE) {
-      encoders.add(Encoding.DIRECT_DICTIONARY)
-    }
+        encoders.add(Encoding.DIRECT_DICTIONARY)
+      }
     if (dataType == DataTypes.TIMESTAMP && !highCardinalityDims.contains(colName)) {
-      encoders.add(Encoding.DIRECT_DICTIONARY)
+        encoders.add(Encoding.DIRECT_DICTIONARY)
+      }
     }
     columnSchema.setEncodingList(encoders)
     val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
@@ -393,10 +414,22 @@ class TableNewProcessor(cm: TableModel) {
     columnSchema.setColumnar(isCol)
     columnSchema.setDimensionColumn(isDimensionCol)
     columnSchema.setColumnGroup(colGroup)
-    columnSchema.setPrecision(precision)
-    columnSchema.setScale(scale)
-    columnSchema.setSchemaOrdinal(schemaOrdinal)
+    columnSchema.setPrecision(field.precision)
+    columnSchema.setScale(field.scale)
+    columnSchema.setSchemaOrdinal(field.schemaOrdinal)
     columnSchema.setSortColumn(false)
+    if (isParentColumnRelation) {
+      val dataMapField = map.get.get(field).get
+      columnSchema.setAggFunction(dataMapField.aggregateFunction)
+      val relation = dataMapField.columnTableRelation.get
+      val parentColumnTableRelationList = new util.ArrayList[ParentColumnTableRelation]
+      val relationIdentifier = new RelationIdentifier(
+        relation.parentDatabaseName, relation.parentTableName, relation.parentTableId)
+      val parentColumnTableRelation = new ParentColumnTableRelation(
+        relationIdentifier, relation.parentColumnId, relation.parentColumnName)
+      parentColumnTableRelationList.add(parentColumnTableRelation)
+      columnSchema.setParentColumnTableRelations(parentColumnTableRelationList)
+    }
     // TODO: Need to fill RowGroupID, converted type
     // & Number of Children after DDL finalization
     columnSchema
@@ -412,9 +445,16 @@ class TableNewProcessor(cm: TableModel) {
     // Sort columns should be at the begin of all columns
     cm.sortKeyDims.get.foreach { keyDim =>
       val field = cm.dimCols.find(keyDim equals _.column).get
-      val encoders = new java.util.ArrayList[Encoding]()
-      encoders.add(Encoding.DICTIONARY)
-      val columnSchema: ColumnSchema = getColumnSchema(
+      val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) {
+        cm.parentTable.get.getColumnByName(
+          cm.parentTable.get.getFactTableName,
+          cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName).getEncoder
+      } else {
+        val encoders = new java.util.ArrayList[Encoding]()
+        encoders.add(Encoding.DICTIONARY)
+        encoders
+      }
+      val columnSchema = getColumnSchema(
         DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
         field.name.getOrElse(field.column),
         index,
@@ -422,9 +462,8 @@ class TableNewProcessor(cm: TableModel) {
         encoders,
         isDimensionCol = true,
         -1,
-        field.precision,
-        field.scale,
-        field.schemaOrdinal)
+        field,
+        cm.dataMapRelation)
       columnSchema.setSortColumn(true)
       allColumns :+= columnSchema
       index = index + 1
@@ -433,9 +472,18 @@ class TableNewProcessor(cm: TableModel) {
     cm.dimCols.foreach(field => {
       val sortField = cm.sortKeyDims.get.find(field.column equals _)
       if (sortField.isEmpty) {
-        val encoders = new java.util.ArrayList[Encoding]()
-        encoders.add(Encoding.DICTIONARY)
-        val columnSchema: ColumnSchema = getColumnSchema(
+        val encoders = if (cm.parentTable.isDefined &&
+                           cm.dataMapRelation.get.get(field).isDefined) {
+          cm.parentTable.get.getColumnByName(
+            cm.parentTable.get.getFactTableName,
+            cm.dataMapRelation.get.get(field).get.
+              columnTableRelation.get.parentColumnName).getEncoder
+        } else {
+          val encoders = new java.util.ArrayList[Encoding]()
+          encoders.add(Encoding.DICTIONARY)
+          encoders
+        }
+        val columnSchema = getColumnSchema(
           DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
           field.name.getOrElse(field.column),
           index,
@@ -443,9 +491,8 @@ class TableNewProcessor(cm: TableModel) {
           encoders,
           isDimensionCol = true,
           -1,
-          field.precision,
-          field.scale,
-          field.schemaOrdinal)
+          field,
+          cm.dataMapRelation)
         allColumns :+= columnSchema
         index = index + 1
         if (field.children.isDefined && field.children.get != null) {
@@ -457,7 +504,7 @@ class TableNewProcessor(cm: TableModel) {
 
     cm.msrCols.foreach(field => {
       val encoders = new java.util.ArrayList[Encoding]()
-      val columnSchema: ColumnSchema = getColumnSchema(
+      val columnSchema = getColumnSchema(
         DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
         field.name.getOrElse(field.column),
         index,
@@ -465,9 +512,8 @@ class TableNewProcessor(cm: TableModel) {
         encoders,
         isDimensionCol = false,
         -1,
-        field.precision,
-        field.scale,
-        field.schemaOrdinal)
+        field,
+        cm.dataMapRelation)
       allColumns :+= columnSchema
       index = index + 1
       measureCount += 1
@@ -508,13 +554,16 @@ class TableNewProcessor(cm: TableModel) {
     // Adding dummy measure if no measure is provided
     if (measureCount == 0) {
       val encoders = new java.util.ArrayList[Encoding]()
-      val columnSchema: ColumnSchema = getColumnSchema(DataTypes.DOUBLE,
+      val field = Field(column = CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
+        dataType = None, name = None, children = None)
+      val columnSchema: ColumnSchema =
+        getColumnSchema(DataTypes.DOUBLE,
         CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
         index,
         true,
         encoders,
         false,
-        -1, 0, 0, schemaOrdinal = -1)
+        -1, field, cm.dataMapRelation)
       columnSchema.setInvisible(true)
       allColumns :+= columnSchema
     }
@@ -523,6 +572,7 @@ class TableNewProcessor(cm: TableModel) {
 
     val tableInfo = new TableInfo()
     val tableSchema = new TableSchema()
+
     val schemaEvol = new SchemaEvolution()
     schemaEvol.setSchemaEvolutionEntryList(new util.ArrayList[SchemaEvolutionEntry]())
     tableSchema.setTableId(UUID.randomUUID().toString)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 9018f7b..35e747b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -504,7 +504,7 @@ case class LoadTableCommand(
     entry.setTime_stamp(System.currentTimeMillis())
 
     // write TableInfo
-    metastore.updateTableSchema(carbonTablePath.getCarbonTableIdentifier,
+    metastore.updateTableSchemaForAlter(carbonTablePath.getCarbonTableIdentifier,
       carbonTablePath.getCarbonTableIdentifier,
       tableInfo, entry, carbonTablePath.getPath)(sparkSession)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
new file mode 100644
index 0000000..ca384f9
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.preaaggregate
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.exception.InvalidConfigurationException
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.{RelationIdentifier, TableInfo}
+import org.apache.carbondata.core.util.CarbonUtil
+
+/**
+ * Command class used to create a pre-aggregate table
+ * and update the parent table with the child table information.
+ * Failure cases:
+ * 1. failed to create the pre-aggregate table
+ * 2. failed to update the main table
+ *
+ * @param cm
+ * @param dataFrame
+ * @param createDSTable
+ * @param queryString
+ */
+case class CreatePreAggregateTableCommand(
+    cm: TableModel,
+    dataFrame: DataFrame,
+    createDSTable: Boolean = true,
+    queryString: String,
+    fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, DataMapField])
+  extends RunnableCommand with SchemaProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    val storePath = CarbonEnv.getInstance(sparkSession).storePath
+    CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(storePath)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
+    val tbName = cm.tableName
+    val dbName = cm.databaseName
+    LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
+    // getting the parent table
+    val parentTable = PreAggregateUtil.getParentCarbonTable(dataFrame.logicalPlan)
+    // getting the table name
+    val parentTableName = parentTable.getFactTableName
+    // getting the db name of parent table
+    val parentDbName = parentTable.getDatabaseName
+    // updating the relation identifier; this will be stored in the child table
+    // which can be used while dropping the pre-aggregate table, as the parent table will
+    // also get updated
+    cm.parentTable = Some(parentTable)
+    cm.dataMapRelation = Some(fieldRelationMap)
+    val tableInfo: TableInfo = TableNewProcessor(cm)
+    // Add validation for sort scope when create table
+    val sortScope = tableInfo.getFactTable.getTableProperties
+      .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+    if (!CarbonUtil.isValidSortOption(sortScope)) {
+      throw new InvalidConfigurationException(
+        s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT'," +
+        s" 'LOCAL_SORT' and 'GLOBAL_SORT' ")
+    }
+
+    if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
+      sys.error("No Dimensions found. Table should have at least one dimesnion !")
+    }
+
+    if (sparkSession.sessionState.catalog.listTables(dbName)
+      .exists(_.table.equalsIgnoreCase(tbName))) {
+      if (!cm.ifNotExistsSet) {
+        LOGGER.audit(
+          s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
+          s"Table [$tbName] already exists under database [$dbName]")
+        sys.error(s"Table [$tbName] already exists under database [$dbName]")
+      }
+    } else {
+      val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
+      // Add Database to catalog and persist
+      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      val tablePath = tableIdentifier.getTablePath
+      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
+      if (createDSTable) {
+        try {
+          val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
+          cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
+          cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
+          sparkSession.sql(
+            s"""CREATE TABLE $dbName.$tbName
+               |(${ fields.map(f => f.rawSchema).mkString(",") })
+               |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+            s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
+            s""""$tablePath"$carbonSchemaString) """)
+          // child schema object that will be registered with the parent table
+          val childSchema = tableInfo.getFactTable
+            .buildChildSchema("", tableInfo.getDatabaseName, queryString, "AGGREGATION")
+          // updating the parent table with the child table info
+          PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
+        } catch {
+          case e: Exception =>
+            val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
+            // call the drop table to delete the created table.
+            CarbonEnv.getInstance(sparkSession).carbonMetastore
+              .dropTable(tablePath, identifier)(sparkSession)
+            LOGGER.audit(s"Table creation with Database name [$dbName] " +
+                         s"and Table name [$tbName] failed")
+            throw e
+        }
+      }
+
+      LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
+    }
+    Seq.empty
+  }
+}
+
+

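For the preagg1 example from the tests, the DDL assembled above from each field's rawSchema would look roughly as follows (a sketch: the tablePath and the appended carbonSchemaString are elided, and sum(b) is assumed to resolve to double):

    CREATE TABLE default.preagg1
    (`preaggmain_a` string,`preaggmain_b_sum` double)
    USING org.apache.spark.sql.CarbonSource
    OPTIONS (tableName "preagg1", dbName "default", tablePath "...")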
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
new file mode 100644
index 0000000..c4b6783
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.preaaggregate
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Utility class for keeping all the utility method for pre-aggregate
+ */
+object PreAggregateUtil {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def getParentCarbonTable(plan: LogicalPlan): CarbonTable = {
+    plan match {
+      case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable
+      case _ => throw new MalformedCarbonCommandException("table does not exist")
+    }
+  }
+
+  /**
+   * Below method will be used to validate the select plan
+   * and get the required fields from select plan
+   * Currently only aggregate queries are supported; any other type of query will
+   * fail.
+   * @param plan
+   * @param selectStmt
+   * @return list of fields
+   */
+  def validateActualSelectPlanAndGetAttrubites(plan: LogicalPlan,
+      selectStmt: String): scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
+    val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+    plan match {
+      case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        val carbonTable = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
+          .metaData.carbonTable
+        val parentTableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+          .getTableName
+        val parentDatabaseName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+          .getDatabaseName
+        val parentTableId = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+          .getTableId
+        if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) {
+          throw new MalformedCarbonCommandException(
+            "Pre Aggregation is not supported on Pre-Aggregated Table")
+        }
+        aExp.map {
+          case Alias(attr: AggregateExpression, _) =>
+            if (attr.isDistinct) {
+              throw new MalformedCarbonCommandException(
+                "Distinct is not supported On Pre Aggregation")
+            }
+            fieldToDataMapFieldMap ++= ((validateAggregateFunctionAndGetFields(carbonTable,
+              attr.aggregateFunction,
+              parentTableName,
+              parentDatabaseName,
+              parentTableId)))
+          case attr: AttributeReference =>
+            fieldToDataMapFieldMap += getField(attr.name,
+              attr.dataType,
+              parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+              parentTableName = parentTableName,
+              parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+          case Alias(attr: AttributeReference, _) =>
+            fieldToDataMapFieldMap += getField(attr.name,
+              attr.dataType,
+              parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+              parentTableName = parentTableName,
+              parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+          case _ =>
+            throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${
+              selectStmt } ")
+        }
+        Some(carbonTable)
+      case _ =>
+        throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${ selectStmt } ")
+    }
+    fieldToDataMapFieldMap
+  }
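In terms of the SQL shapes this accepts: the plan must be a plain Aggregate over the parent table, so plain columns and (aliased) aggregate expressions pass, while distinct aggregates, filter conditions and anything else are rejected, matching the negative tests added by this commit. For illustration:

    // accepted: plain aggregate over the parent table
    sql("create table preagg1 stored BY 'carbondata' tblproperties('parent'='PreAggMain') " +
        "as select a, sum(b) from PreAggMain group by a")
    // rejected: distinct aggregate (MalformedCarbonCommandException)
    sql("create table preagg19 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') " +
        "as select a as a1, count(distinct b) from PreAggMain2 group by a")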
+
+  /**
+   * Below method will be used to validate the aggregate function
+   * applied in the select query.
+   * Currently sum, max, min, count and avg are supported;
+   * any other aggregate function will throw an error.
+   * In case of avg it will return two fields, one for the count
+   * and one for the sum of that column, to support rollup.
+   * @param carbonTable
+   * @param aggFunctions
+   * @param parentTableName
+   * @param parentDatabaseName
+   * @param parentTableId
+   * @return list of fields
+   */
+  def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable,
+      aggFunctions: AggregateFunction,
+      parentTableName: String,
+      parentDatabaseName: String,
+      parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
+    val list = scala.collection.mutable.ListBuffer.empty[(Field, DataMapField)]
+    aggFunctions match {
+      case sum@Sum(attr: AttributeReference) =>
+        list += getField(attr.name,
+          attr.dataType,
+          sum.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        list += getField(attr.name,
+          changeDataType,
+          sum.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case count@Count(Seq(attr: AttributeReference)) =>
+        list += getField(attr.name,
+          attr.dataType,
+          count.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case min@Min(attr: AttributeReference) =>
+        list += getField(attr.name,
+          attr.dataType,
+          min.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        list += getField(attr.name,
+          changeDataType,
+          min.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case max@Max(attr: AttributeReference) =>
+        list += getField(attr.name,
+          attr.dataType,
+          max.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        list += getField(attr.name,
+          changeDataType,
+          max.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case Average(attr: AttributeReference) =>
+        list += getField(attr.name,
+          attr.dataType,
+          "sum",
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+        list += getField(attr.name,
+          attr.dataType,
+          "count",
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        list += getField(attr.name,
+          changeDataType,
+          "sum",
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+        list += getField(attr.name,
+          changeDataType,
+          "count",
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case _ =>
+        throw new MalformedCarbonCommandException("Un-Supported Aggregation Type")
+    }
+  }
+
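For avg the method deliberately returns two entries so the aggregate can later be rolled up from sum and count. A hedged sketch, where carbonTable, bAttr (an AttributeReference for column b) and the id strings are hypothetical stand-ins:

    val fields = PreAggregateUtil.validateAggregateFunctionAndGetFields(
      carbonTable, Average(bAttr), "preaggmain2", "default", "tableId1")
    // backs the preaggmain2_b_sum and preaggmain2_b_count columns the tests check
    assert(fields.map(_._2.aggregateFunction) == Seq("sum", "count"))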
+  /**
+   * Below method will be used to get the fields object for pre aggregate table
+   * @param columnName
+   * @param dataType
+   * @param aggregateType
+   * @param parentColumnId
+   * @param parentTableName
+   * @param parentDatabaseName
+   * @param parentTableId
+   * @return fields object
+   */
+  def getField(columnName: String,
+      dataType: DataType,
+      aggregateType: String = "",
+      parentColumnId: String,
+      parentTableName: String,
+      parentDatabaseName: String,
+      parentTableId: String): (Field, DataMapField) = {
+    val actualColumnName = if (aggregateType.equals("")) {
+      parentTableName + '_' + columnName
+    } else {
+      parentTableName + '_' + columnName + '_' + aggregateType
+    }
+    val rawSchema = '`' + actualColumnName + '`' + ' ' + dataType.typeName
+    val columnTableRelation = ColumnTableRelation(parentColumnName = columnName,
+      parentColumnId = parentColumnId,
+      parentTableName = parentTableName,
+      parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+    val dataMapField = DataMapField(aggregateType, Some(columnTableRelation))
+    if (dataType.typeName.startsWith("decimal")) {
+      val (precision, scale) = CommonUtil.getScaleAndPrecision(dataType.catalogString)
+      (Field(column = actualColumnName,
+        dataType = Some(dataType.typeName),
+        name = Some(actualColumnName),
+        children = None,
+        precision = precision,
+        scale = scale,
+        rawSchema = rawSchema), dataMapField)
+    }
+    else {
+      (Field(column = actualColumnName,
+        dataType = Some(dataType.typeName),
+        name = Some(actualColumnName),
+        children = None,
+        rawSchema = rawSchema), dataMapField)
+    }
+  }
+
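The naming convention built here is exactly what the tests assert on: parent table name, column name and aggregate type joined by underscores; for decimal columns the precision and scale are recovered from the catalog string via CommonUtil.getScaleAndPrecision. A sketch with hypothetical argument values:

    import org.apache.spark.sql.types.DecimalType

    // yields column "preaggmain_price_sum" with precision 10, scale 2
    val (field, dataMapField) = PreAggregateUtil.getField(
      columnName = "price",
      dataType = DecimalType(10, 2),
      aggregateType = "sum",
      parentColumnId = "colId1",
      parentTableName = "preaggmain",
      parentDatabaseName = "default",
      parentTableId = "tableId1")
    assert(field.precision == 10 && field.scale == 2)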
+  /**
+   * Below method will be used to update the main table with the pre-aggregate table information;
+   * in case of any exception it will throw an error, so pre-aggregate table creation will fail.
+   * @param dbName
+   * @param tableName
+   * @param childSchema
+   * @param sparkSession
+   */
+  def updateMainTable(dbName: String, tableName: String,
+      childSchema: DataMapSchema, sparkSession: SparkSession): Unit = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+      LockUsage.DROP_TABLE_LOCK)
+    var locks = List.empty[ICarbonLock]
+    var carbonTable: CarbonTable = null
+    var numberOfCurrentChild: Int = 0
+    try {
+      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      carbonTable = metastore
+        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .tableMeta.carbonTable
+      locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
+      // get the latest carbon table and check for column existence
+      // read the latest schema file
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+      val wrapperTableInfo = schemaConverter
+        .fromExternalToWrapperTableInfo(thriftTableInfo,
+          dbName,
+          tableName,
+          carbonTable.getStorePath)
+      numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size
+      wrapperTableInfo.getDataMapSchemaList.add(childSchema)
+      val thriftTable = schemaConverter
+        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+      updateSchemaInfo(carbonTable,
+        thriftTable)(sparkSession,
+        sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+      LOGGER.info(s"Pre Aggeragte Parent table updated is successful for table $dbName.$tableName")
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, "Pre aggregate parent table update failed, reverting changes")
+        revertMainTableChanges(dbName, tableName, numberOfCurrentChild)(sparkSession)
+        throw e
+    } finally {
+      // release lock after command execution completion
+      releaseLocks(locks)
+    }
+  }
+
+  /**
+   * Updates the main table schema in the metastore and refreshes the table
+   * in the Spark catalog.
+   * @param carbonTable main carbon table
+   * @param thriftTable updated thrift table info
+   * @param sparkSession
+   * @param sessionState
+   */
+  def updateSchemaInfo(carbonTable: CarbonTable,
+      thriftTable: TableInfo)(sparkSession: SparkSession,
+      sessionState: CarbonSessionState): Unit = {
+    val dbName = carbonTable.getDatabaseName
+    val tableName = carbonTable.getFactTableName
+    CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .updateTableSchemaForDataMap(carbonTable.getCarbonTableIdentifier,
+        carbonTable.getCarbonTableIdentifier,
+        thriftTable,
+        carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+    val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+    sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+  }
+
+  /**
+   * Splits the schema JSON string into parts of the configured size and
+   * registers the parts as keys in tableProperties, which Spark reads to
+   * prepare the Carbon table fields.
+   *
+   * @param sparkConf
+   * @param schemaJsonString
+   * @return comma separated string of schema part properties
+   */
+  private def prepareSchemaJson(sparkConf: SparkConf,
+      schemaJsonString: String): String = {
+    val threshold = sparkConf
+      .getInt(CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD,
+        CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT)
+    // Split the JSON string.
+    val parts = schemaJsonString.grouped(threshold).toSeq
+    var schemaParts: Seq[String] = Seq.empty
+    schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_NUMPARTS'='${ parts.size }'"
+    parts.zipWithIndex.foreach { case (part, index) =>
+      schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_PART_PREFIX$index'='$part'"
+    }
+    schemaParts.mkString(",")
+  }
+
+  /**
+   * Acquires the given locks on the table. If any acquisition fails, the locks
+   * already acquired are released and the exception is rethrown.
+   *
+   * @param dbName
+   * @param tableName
+   * @param locksToBeAcquired locks to be acquired, in order
+   * @param table carbon table on which the locks are taken
+   * @return list of acquired locks
+   */
+  def acquireLock(dbName: String,
+      tableName: String,
+      locksToBeAcquired: List[String],
+      table: CarbonTable): List[ICarbonLock] = {
+    // acquire the lock first
+    val acquiredLocks = ListBuffer[ICarbonLock]()
+    try {
+      locksToBeAcquired.foreach { lock =>
+        acquiredLocks += CarbonLockUtil.getLockObject(table.getCarbonTableIdentifier, lock)
+      }
+      acquiredLocks.toList
+    } catch {
+      case e: Exception =>
+        releaseLocks(acquiredLocks.toList)
+        throw e
+    }
+  }
+
+  /**
+   * This method will release the locks acquired for an operation
+   *
+   * @param locks
+   */
+  def releaseLocks(locks: List[ICarbonLock]): Unit = {
+    locks.foreach { carbonLock =>
+      if (carbonLock.unlock()) {
+        LOGGER.info("Pre agg table lock released successfully")
+      } else {
+        LOGGER.error("Unable to release lock during Pre agg table cretion")
+      }
+    }
+  }
+
+  /**
+   * Reverts the changes made to the main table schema if pre-aggregate table
+   * creation fails.
+   *
+   * @param dbName
+   * @param tableName
+   * @param numberOfChildSchema number of child schemas present before the update
+   * @param sparkSession
+   */
+  def revertMainTableChanges(dbName: String, tableName: String, numberOfChildSchema: Int)
+    (sparkSession: SparkSession): Unit = {
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTable = metastore
+      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+      .carbonTable
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+      carbonTable.getCarbonTableIdentifier)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
+      metastore
+        .revertTableSchemaForPreAggCreationFailure(carbonTable.getCarbonTableIdentifier,
+          thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+    }
+  }
+}
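
For illustration, the child-column naming scheme produced by getField above: an avg
aggregate on a parent column is persisted as a sum/count pair, and every child column
is named parentTable_column[_aggregateType]. A minimal standalone sketch of that
convention (plain Scala, no CarbonData dependencies; the `sales`/`price` names are
hypothetical):

    object AggColumnNaming {
      // Mirrors the actualColumnName logic in getField: group-by columns get
      // only the parent-table prefix, aggregate columns also get the agg type.
      def childColumnName(parentTable: String, column: String, aggType: String = ""): String =
        if (aggType.isEmpty) s"${parentTable}_$column"
        else s"${parentTable}_${column}_$aggType"

      def main(args: Array[String]): Unit = {
        // avg(price) on a parent table `sales` is stored as a sum/count pair,
        // so the average can later be recomputed as sum/count:
        println(childColumnName("sales", "price", "sum"))   // sales_price_sum
        println(childColumnName("sales", "price", "count")) // sales_price_count
        println(childColumnName("sales", "price"))          // sales_price
      }
    }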

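Similarly, a condensed, self-contained sketch of the splitting done by prepareSchemaJson
above. The property keys are assumed to match the DATASOURCE_SCHEMA_NUMPARTS /
DATASOURCE_SCHEMA_PART_PREFIX constants used by the class (Spark's usual
'spark.sql.sources.schema.*' keys); the threshold of 10 is chosen only to make the
example visible:

    object SchemaSplit {
      // Splits a schema JSON string into fixed-size parts and renders them as
      // table properties, one part per key, plus a numParts header.
      def prepare(schemaJson: String, threshold: Int): String = {
        val parts = schemaJson.grouped(threshold).toSeq
        val header = s"'spark.sql.sources.schema.numParts'='${parts.size}'"
        val body = parts.zipWithIndex.map { case (part, i) =>
          s"'spark.sql.sources.schema.part.$i'='$part'"
        }
        (header +: body).mkString(",")
      }

      def main(args: Array[String]): Unit =
        println(prepare("""{"type":"struct","fields":[]}""", 10))
    }
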
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
index af361d5..2be3c7c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
@@ -105,7 +105,7 @@ private[sql] case class AlterTableRenameTableCommand(
       }
       val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
         newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
-      val newTablePath = metastore.updateTableSchema(newTableIdentifier,
+      val newTablePath = metastore.updateTableSchemaForAlter(newTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
         tableInfo,
         schemaEvolutionEntry,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 16724fc..6c29f83 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
+import org.apache.carbondata.format
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -110,6 +111,22 @@ class CarbonFileMetastore extends CarbonMetaStore {
     }
   }
 
+  /**
+   * This method will overwrite the existing schema and update it with the given details
+   *
+   * @param newTableIdentifier
+   * @param thriftTableInfo
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)(sparkSession: SparkSession): String = {
+    updateTableSchemaForAlter(newTableIdentifier,
+      oldTableIdentifier, thriftTableInfo, null, carbonStorePath) (sparkSession)
+  }
+
   def lookupRelation(dbName: Option[String], tableName: String)
     (sparkSession: SparkSession): LogicalPlan = {
     lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
@@ -205,7 +222,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
    * @param tablePath
    * @param sparkSession
    */
-  def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+  def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
@@ -242,7 +259,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
    * @param tablePath
    * @param sparkSession
    */
-  def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+  def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       tablePath: String)(sparkSession: SparkSession): String = {
     val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
@@ -262,7 +279,27 @@ class CarbonFileMetastore extends CarbonMetaStore {
     path
   }
 
+  override def revertTableSchemaForPreAggCreationFailure(
+      carbonTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      tablePath: String)(sparkSession: SparkSession): String = {
+    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(thriftTableInfo,
+        carbonTableIdentifier.getDatabaseName,
+        carbonTableIdentifier.getTableName,
+        tableIdentifier.getStorePath)
+    val childSchemaList = wrapperTableInfo.getDataMapSchemaList
+    childSchemaList.remove(childSchemaList.size() - 1)
+    wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+    val path = createSchemaThriftFile(wrapperTableInfo,
+      thriftTableInfo,
+      tableIdentifier.getCarbonTableIdentifier)
+    addTableCache(wrapperTableInfo, tableIdentifier)
+    path
+  }
+
 
   /**
    *

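The revert path above undoes a failed creation by dropping the most recently appended
child schema. A standalone sketch of that append-then-revert discipline, including the
count guard used by updateMainTable (java.util.List semantics to match the wrapper
TableInfo; names are stand-ins):

    import java.util.{ArrayList => JArrayList}

    object RevertLastChild {
      def main(args: Array[String]): Unit = {
        val childSchemas = new JArrayList[String]()
        childSchemas.add("existing_agg")
        val numberOfCurrentChild = childSchemas.size() // snapshot before the update
        childSchemas.add("agg_sales_by_country")       // creation appends the new child
        // on failure, revert only if the new child was actually appended
        if (childSchemas.size() > numberOfCurrentChild) {
          childSchemas.remove(childSchemas.size() - 1) // drop the last-added entry
        }
        println(childSchemas)                          // [existing_agg]
      }
    }
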
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 76241a6..c64b7bb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -115,7 +115,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
    * @param schemaEvolutionEntry
    * @param sparkSession
    */
-  override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+  override def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
@@ -126,7 +126,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
-    updateHiveMetaStore(newTableIdentifier,
+    updateHiveMetaStoreForAlter(newTableIdentifier,
       oldTableIdentifier,
       thriftTableInfo,
       identifier.getStorePath,
@@ -134,7 +134,29 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       schemaConverter)
   }
 
-  private def updateHiveMetaStore(newTableIdentifier: CarbonTableIdentifier,
+  /**
+   * This method will overwrite the existing schema and update it with the given details
+   *
+   * @param newTableIdentifier
+   * @param thriftTableInfo
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  override def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)(sparkSession: SparkSession): String = {
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val identifier = AbsoluteTableIdentifier.fromTablePath(carbonStorePath)
+    updateHiveMetaStoreForDataMap(newTableIdentifier,
+      oldTableIdentifier,
+      thriftTableInfo,
+      identifier.getStorePath,
+      sparkSession,
+      schemaConverter)
+  }
+
+  private def updateHiveMetaStoreForAlter(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
       carbonStorePath: String,
@@ -161,6 +183,30 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath
   }
 
+  private def updateHiveMetaStoreForDataMap(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: format.TableInfo,
+      carbonStorePath: String,
+      sparkSession: SparkSession,
+      schemaConverter: ThriftWrapperSchemaConverterImpl) = {
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(thriftTableInfo,
+        newTableIdentifier.getDatabaseName,
+        newTableIdentifier.getTableName,
+        carbonStorePath)
+    wrapperTableInfo.setStorePath(carbonStorePath)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier)
+    val schemaMetadataPath =
+      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+    wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
+    val dbName = oldTableIdentifier.getDatabaseName
+    val tableName = oldTableIdentifier.getTableName
+    sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
+    removeTableFromMetadata(dbName, tableName)
+    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+    CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath
+  }
+
   /**
   * This method is used to remove the evolution entry in case of failure.
    *
@@ -168,7 +214,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
    * @param thriftTableInfo
    * @param sparkSession
    */
-  override def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+  override def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
       tablePath: String)
     (sparkSession: SparkSession): String = {
@@ -176,7 +222,23 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
-    updateHiveMetaStore(carbonTableIdentifier,
+    updateHiveMetaStoreForAlter(carbonTableIdentifier,
+      carbonTableIdentifier,
+      thriftTableInfo,
+      identifier.getStorePath,
+      sparkSession,
+      schemaConverter)
+  }
+
+  override def revertTableSchemaForPreAggCreationFailure(
+      carbonTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      tablePath: String)(sparkSession: SparkSession): String = {
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val childSchemas = thriftTableInfo.dataMapSchemas
+    childSchemas.remove(childSchemas.size() - 1)
+    updateHiveMetaStoreForAlter(carbonTableIdentifier,
       carbonTableIdentifier,
       thriftTableInfo,
       identifier.getStorePath,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index dcb43d1..24996ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -66,13 +66,26 @@ trait CarbonMetaStore {
    * @param carbonStorePath
    * @param sparkSession
    */
-  def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+  def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
       carbonStorePath: String)(sparkSession: SparkSession): String
 
   /**
+   * This method will overwrite the existing schema and update it with the given details
+   *
+   * @param newTableIdentifier
+   * @param thriftTableInfo
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)(sparkSession: SparkSession): String
+
+  /**
   * This method is used to remove the evolution entry in case of failure.
    *
    * @param carbonTableIdentifier
@@ -80,11 +93,15 @@ trait CarbonMetaStore {
    * @param tablePath
    * @param sparkSession
    */
-  def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+  def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       tablePath: String)
     (sparkSession: SparkSession): String
 
+
+  def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      tablePath: String)(sparkSession: SparkSession): String
   /**
    * Prepare Thrift Schema from wrapper TableInfo and write to disk
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 9f4a8ce..e015c4f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -440,7 +440,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
       // so checking the start of the string and taking the precision and scale.
       // resetting the data type with decimal
       if (f.dataType.getOrElse("").startsWith("decimal")) {
-        val (precision, scale) = getScaleAndPrecision(col.dataType.catalogString)
+        val (precision, scale) = CommonUtil.getScaleAndPrecision(col.dataType.catalogString)
         f.precision = precision
         f.scale = scale
         f.dataType = Some("decimal")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 52008f2..0ce2155 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -18,16 +18,17 @@ package org.apache.spark.sql.parser
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, CarbonCreateTableCommand, Field, PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 
-import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
@@ -38,7 +39,7 @@ import org.apache.carbondata.spark.util.CommonUtil
  */
 class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser {
 
-  val astBuilder = new CarbonSqlAstBuilder(conf)
+  val astBuilder = new CarbonSqlAstBuilder(conf, sparkSession)
 
   private val substitutor = new VariableSubstitution(conf)
 
@@ -69,7 +70,8 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab
   }
 }
 
-class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
+class CarbonSqlAstBuilder(conf: SQLConf, sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) {
 
   val parser = new CarbonSpark2SqlParser
 
@@ -119,7 +121,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
 
       val tableProperties = mutable.Map[String, String]()
       properties.foreach{property => tableProperties.put(property._1, property._2)}
-
+      val isAggTable = tableProperties.get("parent").isDefined
       // validate partition clause
       if (partitionerFields.nonEmpty) {
         if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
@@ -129,10 +131,20 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
         val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
         if (badPartCols.nonEmpty) {
           operationNotAllowed(s"Partition columns should not be specified in the schema: " +
-                              badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
+          badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
         }
       }
-      val fields = parser.getFields(cols ++ partitionByStructFields)
+      var fields = parser.getFields(cols ++ partitionByStructFields)
+      val dfAndFieldRelationTuple = if (isAggTable) {
+        val selectQuery = Option(ctx.query).map(plan).get
+        val df = Dataset.ofRows(sparkSession, selectQuery)
+        val fieldRelationMap = PreAggregateUtil
+          .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, source(ctx.query()))
+        fields = fieldRelationMap.keySet.toSeq
+        Some((df, fieldRelationMap))
+      } else {
+        None
+      }
       val options = new CarbonOption(properties)
       // validate tblProperties
       val bucketFields = parser.getBucketFields(tableProperties, fields, options)
@@ -146,7 +158,14 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
         tableProperties,
         bucketFields)
 
-      CarbonCreateTableCommand(tableModel)
+      if (!isAggTable) {
+        CarbonCreateTableCommand(tableModel)
+      } else {
+        CreatePreAggregateTableCommand(tableModel,
+          dfAndFieldRelationTuple.get._1,
+          queryString = source(ctx.query).toString,
+          fieldRelationMap = dfAndFieldRelationTuple.get._2)
+      }
     } else {
       super.visitCreateTable(ctx)
     }

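For reference, the kind of statement the pre-aggregate branch above routes to
CreatePreAggregateTableCommand: a CTAS whose TBLPROPERTIES carries a 'parent' entry.
A hedged sketch, assuming a Carbon-enabled session (e.g. via CarbonSession) and
hypothetical `sales`/`sales_agg` tables:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.CarbonSession._

    object PreAggCtasExample {
      def main(args: Array[String]): Unit = {
        // getOrCreateCarbonSession wires in CarbonSparkSqlParser, so the DDL
        // below reaches the visitCreateTable branch shown above.
        val spark = SparkSession.builder()
          .appName("preagg-ctas")
          .getOrCreateCarbonSession("/tmp/carbon.store")
        spark.sql(
          """CREATE TABLE sales_agg
            |STORED BY 'carbondata'
            |TBLPROPERTIES('parent'='sales')
            |AS SELECT country, sum(price) FROM sales GROUP BY country""".stripMargin)
      }
    }
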
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 74f4dd0..a82823d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -132,7 +132,7 @@ object AlterTableUtil {
     val dbName = carbonTable.getDatabaseName
     val tableName = carbonTable.getFactTableName
     CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .updateTableSchema(carbonTable.getCarbonTableIdentifier,
+      .updateTableSchemaForAlter(carbonTable.getCarbonTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
         thriftTable,
         schemaEvolutionEntry,
@@ -207,7 +207,7 @@ object AlterTableUtil {
           .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
                        oldTableIdentifier.table)
         val tableIdentifier = new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)
-        metastore.revertTableSchema(tableIdentifier,
+        metastore.revertTableSchemaInAlterFailure(tableIdentifier,
           tableInfo, carbonTablePath.getPath)(sparkSession)
         metastore.removeTableFromMetadata(database, newTableName)
       }
@@ -239,7 +239,7 @@ object AlterTableUtil {
       val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added
       thriftTable.fact_table.table_columns.removeAll(addedSchemas)
       metastore
-        .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+        .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
           thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     }
   }
@@ -274,7 +274,7 @@ object AlterTableUtil {
         }
       }
       metastore
-        .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+        .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
           thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     }
   }
@@ -312,7 +312,7 @@ object AlterTableUtil {
         }
       }
       metastore
-        .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+        .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
           thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     }
   }