You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/25 12:54:40 UTC

[06/10] carbondata git commit: [CARBONDATA-2189] Add DataMapProvider developer interface

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
deleted file mode 100644
index b2ab977..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.io.{File, FilenameFilter}
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.common.exceptions.MetadataProcessException
-import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
-
-class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
-
-  val testData = s"$resourcesPath/sample.csv"
-
-  override def beforeAll {
-    sql("drop table if exists datamaptest")
-    sql("drop table if exists datamapshowtest")
-    sql("drop table if exists uniqdata")
-    sql("create table datamaptest (a string, b string, c string) stored by 'carbondata'")
-  }
-
-  val newClass = "org.apache.spark.sql.CarbonSource"
-
-  test("test datamap create: don't support using non-exist class") {
-    intercept[MetadataProcessException] {
-      sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
-    }
-  }
-
-  test("test datamap create with dmproperties: don't support using non-exist class") {
-    intercept[MetadataProcessException] {
-      sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
-    }
-  }
-
-  test("test datamap create with existing name: don't support using non-exist class") {
-    intercept[MetadataProcessException] {
-      sql(
-        s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
-    }
-  }
-
-  test("test datamap create with preagg") {
-    sql("drop datamap if exists datamap3 on table datamaptest")
-    sql(
-      "create datamap datamap3 on table datamaptest using 'preaggregate' dmproperties('key'='value') as select count(a) from datamaptest")
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
-    assert(table != null)
-    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-    assert(dataMapSchemaList.size() == 1)
-    assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3"))
-    assert(dataMapSchemaList.get(0).getProperties.get("key").equals("value"))
-    assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3"))
-  }
-
-  test("check hivemetastore after drop datamap") {
-    try {
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-          "true")
-      sql("drop table if exists hiveMetaStoreTable")
-      sql("create table hiveMetaStoreTable (a string, b string, c string) stored by 'carbondata'")
-
-      sql(
-        "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable using 'preaggregate' dmproperties('key'='value') as select count(a) from hiveMetaStoreTable")
-      checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, "datamap_hiveMetaStoreTable")
-
-      sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
-      checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
-
-    } finally {
-      sql("drop table hiveMetaStoreTable")
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
-    }
-  }
-
-  test("drop the table having pre-aggregate") {
-    try {
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-          "true")
-      sql("drop table if exists hiveMetaStoreTable_1")
-      sql("create table hiveMetaStoreTable_1 (a string, b string, c string) stored by 'carbondata'")
-
-      sql(
-        "create datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1 using 'preaggregate' dmproperties('key'='value') as select count(a) from hiveMetaStoreTable_1")
-
-      checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
-        true,
-        "datamap_hiveMetaStoreTable_1")
-
-      sql("drop table hiveMetaStoreTable_1")
-
-      checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
-    } finally {
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
-    }
-  }
-
-  test("test datamap create with preagg with duplicate name") {
-    sql(
-      s"""
-         | CREATE DATAMAP datamap10 ON TABLE datamaptest
-         | USING 'preaggregate'
-         | DMPROPERTIES('key'='value')
-         | AS SELECT COUNT(a) FROM datamaptest
-         """.stripMargin)
-    intercept[MalformedDataMapCommandException] {
-      sql(
-        s"""
-           | CREATE DATAMAP datamap10 ON TABLE datamaptest
-           | USING 'preaggregate'
-           | DMPROPERTIES('key'='value')
-           | AS SELECT COUNT(a) FROM datamaptest
-         """.stripMargin)
-    }
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
-    assert(table != null)
-    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-    assert(dataMapSchemaList.size() == 2)
-  }
-
-  test("test drop non-exist datamap") {
-    intercept[NoSuchDataMapException] {
-      sql("drop datamap nonexist on table datamaptest")
-    }
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
-    assert(table != null)
-    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-    assert(dataMapSchemaList.size() == 2)
-  }
-
-  test("test show datamap without preaggregate: don't support using non-exist class") {
-    intercept[MetadataProcessException] {
-      sql("drop table if exists datamapshowtest")
-      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
-      sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')")
-      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')")
-      checkExistence(sql("SHOW DATAMAP ON TABLE datamapshowtest"), true, "datamap1", "datamap2", "(NA)", newClass)
-    }
-  }
-
-  test("test show datamap with preaggregate: don't support using non-exist class") {
-    intercept[MetadataProcessException] {
-      sql("drop table if exists datamapshowtest")
-      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
-      sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
-      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')")
-      val frame = sql("show datamap on table datamapshowtest")
-      assert(frame.collect().length == 2)
-      checkExistence(frame, true, "datamap1", "datamap2", "(NA)", newClass, "default.datamapshowtest_datamap1")
-    }
-  }
-
-  test("test show datamap with no datamap") {
-    sql("drop table if exists datamapshowtest")
-    sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
-    assert(sql("show datamap on table datamapshowtest").collect().length == 0)
-  }
-
-  test("test show datamap after dropping datamap: don't support using non-exist class") {
-    intercept[MetadataProcessException] {
-      sql("drop table if exists datamapshowtest")
-      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
-      sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
-      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')")
-      sql("drop datamap datamap1 on table datamapshowtest")
-      val frame = sql("show datamap on table datamapshowtest")
-      assert(frame.collect().length == 1)
-      checkExistence(frame, true, "datamap2", "(NA)", newClass)
-    }
-  }
-
-  test("test if preaggregate load is successfull for hivemetastore") {
-    try {
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
-      sql("DROP TABLE IF EXISTS maintable")
-      sql(
-        """
-          | CREATE TABLE maintable(id int, name string, city string, age int)
-          | STORED BY 'org.apache.carbondata.format'
-        """.stripMargin)
-      sql(
-        s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
-
-          .stripMargin)
-      sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
-      checkAnswer(sql(s"select * from maintable_preagg_sum"),
-        Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
-    } finally {
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
-    }
-  }
-
-  test("test preaggregate load for decimal column for hivemetastore") {
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
-    sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format'")
-    sql("insert into uniqdata select 9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 01:00:03','1970-01-01 02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1")
-    sql("create datamap uniqdata_agg on table uniqdata using 'preaggregate' as select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1")
-    checkAnswer(sql("select * from uniqdata_uniqdata_agg"), Seq(Row(12345678901.1234000000, 12345678901.1234000000)))
-    sql("drop datamap if exists uniqdata_agg on table uniqdata")
-  }
-
-  test("create pre-agg table with path") {
-    sql("drop table if exists main_preagg")
-    sql("drop table if exists main ")
-    val warehouse = s"$metastoredb/warehouse"
-    val path = warehouse + "/" + System.nanoTime + "_preAggTestPath"
-    sql(
-      s"""
-         | create table main(
-         |     year int,
-         |     month int,
-         |     name string,
-         |     salary int)
-         | stored by 'carbondata'
-         | tblproperties('sort_columns'='month,year,name')
-      """.stripMargin)
-    sql("insert into main select 10,11,'amy',12")
-    sql("insert into main select 10,11,'amy',14")
-    sql(
-      s"""
-         | create datamap preagg
-         | on table main
-         | using 'preaggregate'
-         | dmproperties ('path'='$path')
-         | as select name,avg(salary)
-         |    from main
-         |    group by name
-       """.stripMargin)
-    assertResult(true)(new File(path).exists())
-    assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}")
-      .list(new FilenameFilter {
-        override def accept(dir: File, name: String): Boolean = {
-          name.contains(CarbonCommonConstants.FACT_FILE_EXT)
-        }
-      }).length > 0)
-    checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0))
-    checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
-    sql("drop datamap preagg on table main")
-    assertResult(false)(new File(path).exists())
-    sql("drop table main")
-  }
-
-  override def afterAll {
-    sql("DROP TABLE IF EXISTS maintable")
-    sql("drop table if exists uniqdata")
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-      CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
-    sql("drop table if exists datamaptest")
-    sql("drop table if exists datamapshowtest")
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala
new file mode 100644
index 0000000..a6ffa9a
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{File, FilenameFilter}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.MetadataProcessException
+import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestIndexDataMapCommand extends QueryTest with BeforeAndAfterAll {
+
+  val testData = s"$resourcesPath/sample.csv"
+
+  override def beforeAll {
+    sql("drop table if exists datamaptest")
+    sql("drop table if exists datamapshowtest")
+    sql("drop table if exists uniqdata")
+    sql("create table datamaptest (a string, b string, c string) stored by 'carbondata'")
+  }
+
+  val newClass = "org.apache.spark.sql.CarbonSource"
+
+  test("test datamap create: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
+    }
+  }
+
+  test("test datamap create with dmproperties: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
+    }
+  }
+
+  test("test datamap create with existing name: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql(
+        s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
+    }
+  }
+
+  test("test datamap create with preagg") {
+    sql("drop datamap if exists datamap3 on table datamaptest")
+    sql(
+      "create datamap datamap3 on table datamaptest using 'preaggregate' as select count(a) from datamaptest")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 1)
+    assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3"))
+    assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3"))
+  }
+
+  test("check hivemetastore after drop datamap") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          "true")
+      sql("drop table if exists hiveMetaStoreTable")
+      sql("create table hiveMetaStoreTable (a string, b string, c string) stored by 'carbondata'")
+
+      sql(
+        "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable using 'preaggregate' as select count(a) from hiveMetaStoreTable")
+      checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, "datamap_hiveMetaStoreTable")
+
+      sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
+      checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
+
+    } finally {
+      sql("drop table hiveMetaStoreTable")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    }
+  }
+
+  test("drop the table having pre-aggregate") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          "true")
+      sql("drop table if exists hiveMetaStoreTable_1")
+      sql("create table hiveMetaStoreTable_1 (a string, b string, c string) stored by 'carbondata'")
+
+      sql(
+        "create datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1 using 'preaggregate' as select count(a) from hiveMetaStoreTable_1")
+
+      checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
+        true,
+        "datamap_hiveMetaStoreTable_1")
+
+      sql("drop table hiveMetaStoreTable_1")
+
+      checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
+    } finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    }
+  }
+
+  test("test datamap create with preagg with duplicate name") {
+    sql(
+      s"""
+         | CREATE DATAMAP datamap10 ON TABLE datamaptest
+         | USING 'preaggregate'
+         | AS SELECT COUNT(a) FROM datamaptest
+         """.stripMargin)
+    intercept[MalformedDataMapCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP datamap10 ON TABLE datamaptest
+           | USING 'preaggregate'
+           | AS SELECT COUNT(a) FROM datamaptest
+         """.stripMargin)
+    }
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 2)
+  }
+
+  test("test drop non-exist datamap") {
+    intercept[NoSuchDataMapException] {
+      sql("drop datamap nonexist on table datamaptest")
+    }
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 2)
+  }
+
+  test("test show datamap without preaggregate: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+      sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' ")
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
+      checkExistence(sql("SHOW DATAMAP ON TABLE datamapshowtest"), true, "datamap1", "datamap2", "(NA)", newClass)
+    }
+  }
+
+  test("test show datamap with preaggregate: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+      sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
+      val frame = sql("show datamap on table datamapshowtest")
+      assert(frame.collect().length == 2)
+      checkExistence(frame, true, "datamap1", "datamap2", "(NA)", newClass, "default.datamapshowtest_datamap1")
+    }
+  }
+
+  test("test show datamap with no datamap") {
+    sql("drop table if exists datamapshowtest")
+    sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+    assert(sql("show datamap on table datamapshowtest").collect().length == 0)
+  }
+
+  test("test show datamap after dropping datamap: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+      sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
+      sql("drop datamap datamap1 on table datamapshowtest")
+      val frame = sql("show datamap on table datamapshowtest")
+      assert(frame.collect().length == 1)
+      checkExistence(frame, true, "datamap2", "(NA)", newClass)
+    }
+  }
+
+  test("test if preaggregate load is successfull for hivemetastore") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
+      sql("DROP TABLE IF EXISTS maintable")
+      sql(
+        """
+          | CREATE TABLE maintable(id int, name string, city string, age int)
+          | STORED BY 'org.apache.carbondata.format'
+        """.stripMargin)
+      sql(
+        s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
+
+          .stripMargin)
+      sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+      checkAnswer(sql(s"select * from maintable_preagg_sum"),
+        Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    } finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    }
+  }
+
+  test("test preaggregate load for decimal column for hivemetastore") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
+    sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format'")
+    sql("insert into uniqdata select 9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 01:00:03','1970-01-01 02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1")
+    sql("create datamap uniqdata_agg on table uniqdata using 'preaggregate' as select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1")
+    checkAnswer(sql("select * from uniqdata_uniqdata_agg"), Seq(Row(12345678901.1234000000, 12345678901.1234000000)))
+    sql("drop datamap if exists uniqdata_agg on table uniqdata")
+  }
+
+  test("create pre-agg table with path") {
+    sql("drop table if exists main_preagg")
+    sql("drop table if exists main ")
+    val warehouse = s"$metastoredb/warehouse"
+    val path = warehouse + "/" + System.nanoTime + "_preAggTestPath"
+    sql(
+      s"""
+         | create table main(
+         |     year int,
+         |     month int,
+         |     name string,
+         |     salary int)
+         | stored by 'carbondata'
+         | tblproperties('sort_columns'='month,year,name')
+      """.stripMargin)
+    sql("insert into main select 10,11,'amy',12")
+    sql("insert into main select 10,11,'amy',14")
+    sql(
+      s"""
+         | create datamap preagg
+         | on table main
+         | using 'preaggregate'
+         | dmproperties ('path'='$path')
+         | as select name,avg(salary)
+         |    from main
+         |    group by name
+       """.stripMargin)
+    assertResult(true)(new File(path).exists())
+    assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}")
+      .list(new FilenameFilter {
+        override def accept(dir: File, name: String): Boolean = {
+          name.contains(CarbonCommonConstants.FACT_FILE_EXT)
+        }
+      }).length > 0)
+    checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0))
+    checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
+    sql("drop datamap preagg on table main")
+    assertResult(false)(new File(path).exists())
+    sql("drop table main")
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS maintable")
+    sql("drop table if exists uniqdata")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+      CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    sql("drop table if exists datamaptest")
+    sql("drop table if exists datamapshowtest")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
index 84b59e6..248441f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
-import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMapFactory, AbstractCoarseGrainIndexDataMap}
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -47,7 +47,12 @@ class InsertOverwriteConcurrentTest extends QueryTest with BeforeAndAfterAll wit
     buildTestData()
 
     // register hook to the table to sleep, thus the other command will be executed
-    sql(s"create datamap test on table orders using '${classOf[WaitingDataMap].getName}' as select count(a) from hiveMetaStoreTable_1")
+    sql(
+      s"""
+         | create datamap test on table orders
+         | using '${classOf[WaitingIndexDataMap].getName}'
+         | as select count(a) from hiveMetaStoreTable_1")
+       """.stripMargin)
   }
 
   private def buildTestData(): Unit = {
@@ -101,7 +106,7 @@ class InsertOverwriteConcurrentTest extends QueryTest with BeforeAndAfterAll wit
     )
     while (!Global.overwriteRunning && count < 1000) {
       Thread.sleep(10)
-      // to avoid dead loop in case WaitingDataMap is not invoked
+      // to avoid dead loop in case WaitingIndexDataMap is not invoked
       count += 1
     }
     future
@@ -162,7 +167,7 @@ object Global {
   var overwriteRunning = false
 }
 
-class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
+class WaitingIndexDataMap() extends AbstractCoarseGrainIndexDataMapFactory {
 
   override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { }
 
@@ -172,9 +177,9 @@ class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainIndexDataMap] = ???
 
-  override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainDataMap] = ???
+  override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainIndexDataMap] = ???
 
   override def createWriter(segmentId: String, writerPath: String): AbstractDataMapWriter = {
     new AbstractDataMapWriter(null, segmentId, writerPath) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
new file mode 100644
index 0000000..2b3a306
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+
+import static org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.PREAGGREGATE;
+import static org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES;
+
+public class DataMapManager {
+
+  private static DataMapManager INSTANCE;
+
+  private DataMapManager() { }
+
+  public static synchronized DataMapManager get() {
+    if (INSTANCE == null) {
+      INSTANCE = new DataMapManager();
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Return a DataMapProvider instance for specified dataMapSchema.
+   */
+  public DataMapProvider getDataMapProvider(DataMapSchema dataMapSchema) {
+    DataMapProvider provider;
+    if (dataMapSchema.getClassName().equalsIgnoreCase(PREAGGREGATE.toString())) {
+      provider = new PreAggregateDataMapProvider();
+    } else if (dataMapSchema.getClassName().equalsIgnoreCase(TIMESERIES.toString())) {
+      provider = new TimeseriesDataMapProvider();
+    } else {
+      provider = new IndexDataMapProvider();
+    }
+    return provider;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
new file mode 100644
index 0000000..0cf0d04
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * Property that can be specified when creating DataMap
+ */
+@InterfaceAudience.Internal
+public class DataMapProperty {
+
+  /**
+   * Used to specify the store location of the datamap
+   */
+  public static final String PATH = "path";
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
new file mode 100644
index 0000000..a71e0d8
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.processing.exception.DataLoadingException;
+
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * DataMap is a accelerator for certain type of query. Developer can add new DataMap
+ * implementation to improve query performance.
+ *
+ * Currently two types of DataMap are supported
+ * <ol>
+ *   <li> MVDataMap: materialized view type of DataMap to accelerate olap style query,
+ * like SPJG query (select, predicate, join, groupby) </li>
+ *   <li> IndexDataMap: index type of DataMap to accelerate filter query </li>
+ * </ol>
+ *
+ * <p>
+ * In following command <br>
+ * {@code CREATE DATAMAP dm ON TABLE main USING 'provider'}, <br>
+ * the <b>provider</b> string can be a short name or class name of the DataMap implementation.
+ *
+ * <br>Currently CarbonData supports following provider:
+ * <ol>
+ *   <li> preaggregate: one type of MVDataMap that do pre-aggregate of single table </li>
+ *   <li> timeseries: one type of MVDataMap that do pre-aggregate based on time dimension
+ *     of the table </li>
+ *   <li> class name of {@link org.apache.carbondata.core.datamap.dev.IndexDataMapFactory}
+ * implementation: Developer can implement new type of IndexDataMap by extending
+ * {@link org.apache.carbondata.core.datamap.dev.IndexDataMapFactory} </li>
+ * </ol>
+ *
+ * @since 1.4.0
+ */
+@InterfaceAudience.Developer("DataMap")
+@InterfaceStability.Unstable
+public interface DataMapProvider {
+
+  /**
+   * Initialize a datamap's metadata.
+   * This is called when user creates datamap, for example "CREATE DATAMAP dm ON TABLE mainTable"
+   * Implementation should initialize metadata for datamap, like creating table
+   */
+  void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
+      SparkSession sparkSession) throws MalformedDataMapCommandException;
+
+  /**
+   * Initialize a datamap's data.
+   * This is called when user creates datamap, for example "CREATE DATAMAP dm ON TABLE mainTable"
+   * Implementation should initialize data for datamap, like creating data folders
+   */
+  void initData(CarbonTable mainTable, SparkSession sparkSession);
+
+  /**
+   * Opposite operation of {@link #initMeta(CarbonTable, DataMapSchema, String, SparkSession)}.
+   * This is called when user drops datamap, for example "DROP DATAMAP dm ON TABLE mainTable"
+   * Implementation should clean all meta for the datamap
+   */
+  void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, SparkSession sparkSession);
+
+  /**
+   * Opposite operation of {@link #initData(CarbonTable, SparkSession)}.
+   * This is called when user drops datamap, for example "DROP DATAMAP dm ON TABLE mainTable"
+   * Implementation should clean all data for the datamap
+   */
+  void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema, SparkSession sparkSession);
+
+  /**
+   * Rebuild the datamap by loading all existing data from mainTable
+   * This is called when refreshing the datamap when
+   * 1. after datamap creation and if `autoRefreshDataMap` is set to true
+   * 2. user manually trigger refresh datamap command
+   */
+  void rebuild(CarbonTable mainTable, SparkSession sparkSession) throws DataLoadingException;
+
+  /**
+   * Build the datamap incrementally by loading specified segment data
+   * This is called when user manually trigger refresh datamap
+   */
+  void incrementalBuild(CarbonTable mainTable, String[] segmentIds, SparkSession sparkSession)
+    throws DataLoadingException;
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
new file mode 100644
index 0000000..e11e522
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.MetadataProcessException;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.core.datamap.DataMapRegistry;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.dev.IndexDataMapFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.format.TableInfo;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil;
+
+@InterfaceAudience.Internal
+public class IndexDataMapProvider implements DataMapProvider {
+
+  private TableInfo originalTableInfo;
+
+  @Override
+  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
+      SparkSession sparkSession) throws MalformedDataMapCommandException {
+    IndexDataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema);
+    DataMapStoreManager.getInstance().registerDataMap(
+        mainTable.getAbsoluteTableIdentifier(), dataMapSchema, dataMapFactory);
+    originalTableInfo = PreAggregateUtil.updateMainTable(mainTable, dataMapSchema, sparkSession);
+  }
+
+  @Override
+  public void initData(CarbonTable mainTable, SparkSession sparkSession) {
+    // Nothing is needed to do by default
+  }
+
+  @Override
+  public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema,
+      SparkSession sparkSession) {
+    PreAggregateUtil.updateSchemaInfo(mainTable, originalTableInfo, sparkSession);
+  }
+
+  @Override
+  public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema,
+      SparkSession sparkSession) {
+    DataMapStoreManager.getInstance().clearDataMap(
+        mainTable.getAbsoluteTableIdentifier(), dataMapSchema.getDataMapName());
+  }
+
+  @Override
+  public void rebuild(CarbonTable mainTable, SparkSession sparkSession) {
+    // Nothing is needed to do by default
+  }
+
+  @Override
+  public void incrementalBuild(CarbonTable mainTable, String[] segmentIds,
+      SparkSession sparkSession) {
+    throw new UnsupportedOperationException();
+  }
+
+  private IndexDataMapFactory createIndexDataMapFactory(DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
+    IndexDataMapFactory dataMapFactory;
+    try {
+      // try to create DataMapProvider instance by taking providerName as class name
+      Class<? extends IndexDataMapFactory> providerClass =
+          (Class<? extends IndexDataMapFactory>) Class.forName(dataMapSchema.getClassName());
+      dataMapFactory = providerClass.newInstance();
+    } catch (ClassNotFoundException e) {
+      // try to create DataMapProvider instance by taking providerName as short name
+      dataMapFactory = getDataMapFactoryByShortName(dataMapSchema.getClassName());
+    } catch (Throwable e) {
+      throw new MetadataProcessException(
+          "failed to create DataMapProvider '" + dataMapSchema.getClassName() + "'", e);
+    }
+    return dataMapFactory;
+  }
+
+  private IndexDataMapFactory getDataMapFactoryByShortName(String providerName)
+      throws MalformedDataMapCommandException {
+    IndexDataMapFactory dataMapFactory;
+    String className = DataMapRegistry.getDataMapClassName(providerName);
+    if (className != null) {
+      try {
+        Class<? extends IndexDataMapFactory> datamapClass =
+            (Class<? extends IndexDataMapFactory>) Class.forName(providerName);
+        dataMapFactory = datamapClass.newInstance();
+      } catch (ClassNotFoundException ex) {
+        throw new MalformedDataMapCommandException(
+            "DataMap '" + providerName + "' not found", ex);
+      } catch (Throwable ex) {
+        throw new MetadataProcessException(
+            "failed to create DataMap '" + providerName + "'", ex);
+      }
+    } else {
+      throw new MalformedDataMapCommandException(
+          "DataMap '" + providerName + "' not found");
+    }
+    return dataMapFactory;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
new file mode 100644
index 0000000..ac38347
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateTableHelper;
+import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand;
+import scala.Some;
+
+@InterfaceAudience.Internal
+public class PreAggregateDataMapProvider implements DataMapProvider {
+  protected PreAggregateTableHelper helper;
+  protected CarbonDropTableCommand dropTableCommand;
+
+  @Override
+  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
+      SparkSession sparkSession) throws MalformedDataMapCommandException {
+    validateDmProperty(dataMapSchema);
+    helper = new PreAggregateTableHelper(
+        mainTable, dataMapSchema.getDataMapName(), dataMapSchema.getClassName(),
+        dataMapSchema.getProperties(), ctasSqlStatement, null);
+    helper.initMeta(sparkSession);
+  }
+
+  private void validateDmProperty(DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
+    if (!dataMapSchema.getProperties().isEmpty()) {
+      if (dataMapSchema.getProperties().size() > 1 ||
+          !dataMapSchema.getProperties().containsKey(DataMapProperty.PATH)) {
+        throw new MalformedDataMapCommandException(
+            "Only 'path' dmproperty is allowed for this datamap");
+      }
+    }
+  }
+
+  @Override
+  public void initData(CarbonTable mainTable, SparkSession sparkSession) {
+    // Nothing is needed to do by default
+  }
+
+  @Override
+  public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema,
+      SparkSession sparkSession) {
+    dropTableCommand = new CarbonDropTableCommand(
+        true,
+        new Some<>(dataMapSchema.getRelationIdentifier().getDatabaseName()),
+        dataMapSchema.getRelationIdentifier().getTableName(),
+        true);
+    dropTableCommand.processMetadata(sparkSession);
+  }
+
+  @Override
+  public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema,
+      SparkSession sparkSession) {
+    if (dropTableCommand != null) {
+      dropTableCommand.processData(sparkSession);
+    }
+  }
+
+  @Override
+  public void rebuild(CarbonTable mainTable, SparkSession sparkSession) {
+    if (helper != null) {
+      helper.initData(sparkSession);
+    }
+  }
+
+  @Override
+  public void incrementalBuild(CarbonTable mainTable, String[] segmentIds,
+      SparkSession sparkSession) {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
new file mode 100644
index 0000000..510839d
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateTableHelper;
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil;
+import scala.Tuple2;
+
+@InterfaceAudience.Internal
+public class TimeseriesDataMapProvider extends PreAggregateDataMapProvider {
+
+  @Override
+  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
+      SparkSession sparkSession) {
+    Map<String, String> dmProperties = dataMapSchema.getProperties();
+    String dmProviderName = dataMapSchema.getClassName();
+    TimeSeriesUtil.validateTimeSeriesGranularity(dmProperties, dmProviderName);
+    Tuple2<String, String> details =
+        TimeSeriesUtil.getTimeSeriesGranularityDetails(dmProperties, dmProviderName);
+    dmProperties.remove(details._1());
+    helper = new PreAggregateTableHelper(
+        mainTable, dataMapSchema.getDataMapName(), dataMapSchema.getClassName(),
+        dmProperties, ctasSqlStatement, details._1());
+    helper.initMeta(sparkSession);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 870b1f3..5fb3d56 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -93,7 +93,7 @@ class CarbonEnv {
             properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
           }
           LOGGER.info(s"carbon env initial: $storePath")
-          // trigger event for CarbonEnv init
+          // trigger event for CarbonEnv create
           val operationContext = new OperationContext
           val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
             CarbonEnvInitPreEvent(sparkSession, storePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 3d3f83b..3f22955 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -21,16 +21,11 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
-import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
-import org.apache.spark.sql.hive.CarbonRelation
 
-import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.datamap.{DataMapManager, DataMapProvider}
 
 /**
  * Below command class will be used to create datamap on table
@@ -44,64 +39,30 @@ case class CarbonCreateDataMapCommand(
     queryString: Option[String])
   extends AtomicRunnableCommand {
 
-  var createPreAggregateTableCommands: CreatePreAggregateTableCommand = _
+  private var dataMapProvider: DataMapProvider = _
+  private var mainTable: CarbonTable = _
+  private var dataMapSchema: DataMapSchema = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     // since streaming segment does not support building index and pre-aggregate yet,
     // so streaming table does not support create datamap
-    val carbonTable =
-    CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession)
-    if (carbonTable.isStreamingTable) {
+    mainTable =
+      CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession)
+    if (mainTable.isStreamingTable) {
       throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
     }
-    validateDataMapName(carbonTable)
+    validateDataMapName(mainTable)
+    dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
+    dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava))
+    dataMapProvider = DataMapManager.get().getDataMapProvider(dataMapSchema)
+    dataMapProvider.initMeta(mainTable, dataMapSchema, queryString.orNull, sparkSession)
 
-    if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
-      dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
-      TimeSeriesUtil.validateTimeSeriesGranularity(dmproperties, dmClassName)
-      createPreAggregateTableCommands = if (dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
-        val details = TimeSeriesUtil
-          .getTimeSeriesGranularityDetails(dmproperties, dmClassName)
-        val updatedDmProperties = dmproperties - details._1
-        CreatePreAggregateTableCommand(dataMapName,
-          tableIdentifier,
-          dmClassName,
-          updatedDmProperties,
-          queryString.get,
-          Some(details._1))
-      } else {
-        CreatePreAggregateTableCommand(
-          dataMapName,
-          tableIdentifier,
-          dmClassName,
-          dmproperties,
-          queryString.get
-        )
-      }
-      try {
-        createPreAggregateTableCommands.processMetadata(sparkSession)
-      } catch {
-        case e: Throwable => throw new MetadataProcessException(s"Failed to create datamap " +
-                                                                s"'$dataMapName'", e)
-      }
-    } else {
-      val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
-      dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava))
-      val dbName = CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession)
-      val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
-        Some(dbName),
-        tableIdentifier.table)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
-      DataMapStoreManager.getInstance().createAndRegisterDataMap(
-        carbonTable.getAbsoluteTableIdentifier, dataMapSchema)
-      // Save DataMapSchema in the  schema file of main table
-      PreAggregateUtil.updateMainTable(carbonTable, dataMapSchema, sparkSession)
-    }
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${tableIdentifier.table}")
     Seq.empty
   }
 
-  private def validateDataMapName(carbonTable: CarbonTable) = {
+  private def validateDataMapName(carbonTable: CarbonTable): Unit = {
     val existingDataMaps = carbonTable.getTableInfo.getDataMapSchemaList
     existingDataMaps.asScala.foreach { dataMapSchema =>
       if (dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName)) {
@@ -111,18 +72,19 @@ case class CarbonCreateDataMapCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
-      dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
-      createPreAggregateTableCommands.processData(sparkSession)
-    } else {
-      Seq.empty
+    if (dataMapProvider != null) {
+      dataMapProvider.initData(mainTable, sparkSession)
+      if (mainTable.isAutoRefreshDataMap) {
+        dataMapProvider.rebuild(mainTable, sparkSession)
+      }
     }
+    Seq.empty
   }
 
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
-    if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
-      dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
-      createPreAggregateTableCommands.undoMetadata(sparkSession, exception)
+    if (dataMapProvider != null) {
+      dataMapProvider.freeMeta(mainTable, dataMapSchema, sparkSession)
+      Seq.empty
     } else {
       throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index f410f52..4cfc6b4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -25,16 +25,15 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
-import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
 import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException}
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.datamap.{DataMapManager, DataMapProvider}
 import org.apache.carbondata.events._
 
 /**
@@ -51,7 +50,9 @@ case class CarbonDropDataMapCommand(
     tableName: String)
   extends AtomicRunnableCommand {
 
-  var commandToRun: CarbonDropTableCommand = _
+  private var dataMapProvider: DataMapProvider = _
+  private var mainTable: CarbonTable = _
+  private var dataMapSchema: DataMapSchema = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -76,44 +77,42 @@ case class CarbonDropDataMapCommand(
           throw new MetadataProcessException(s"Dropping datamap $dataMapName failed", ex)
       }
       if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
-        val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
+        mainTable = carbonTable.get
+        val dataMapSchemaOp = mainTable.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
           find(_._1.getDataMapName.equalsIgnoreCase(dataMapName))
-        if (dataMapSchema.isDefined) {
+        if (dataMapSchemaOp.isDefined) {
+          dataMapSchema = dataMapSchemaOp.get._1
           val operationContext = new OperationContext
           val dropDataMapPreEvent =
             DropDataMapPreEvent(
-              Some(dataMapSchema.get._1),
+              Some(dataMapSchema),
               ifExistsSet,
               sparkSession)
           OperationListenerBus.getInstance.fireEvent(dropDataMapPreEvent, operationContext)
 
-          carbonTable.get.getTableInfo.getDataMapSchemaList.remove(dataMapSchema.get._2)
+          mainTable.getTableInfo.getDataMapSchemaList.remove(dataMapSchemaOp.get._2)
           val schemaConverter = new ThriftWrapperSchemaConverterImpl
           PreAggregateUtil.updateSchemaInfo(
-            carbonTable.get,
+            mainTable,
             schemaConverter.fromWrapperToExternalTableInfo(
-              carbonTable.get.getTableInfo,
+              mainTable.getTableInfo,
               dbName,
               tableName))(sparkSession)
-          commandToRun = CarbonDropTableCommand(
-            ifExistsSet = true,
-            Some(dataMapSchema.get._1.getRelationIdentifier.getDatabaseName),
-            dataMapSchema.get._1.getRelationIdentifier.getTableName,
-            dropChildTable = true
-          )
-          commandToRun.processMetadata(sparkSession)
+          dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema)
+          dataMapProvider.freeMeta(mainTable, dataMapSchema, sparkSession)
+
           // fires the event after dropping datamap from main table schema
           val dropDataMapPostEvent =
             DropDataMapPostEvent(
-              Some(dataMapSchema.get._1),
+              Some(dataMapSchema),
               ifExistsSet,
               sparkSession)
           OperationListenerBus.getInstance.fireEvent(dropDataMapPostEvent, operationContext)
         } else if (!ifExistsSet) {
           throw new NoSuchDataMapException(dataMapName, tableName)
         }
-      } else if ((carbonTable.isDefined &&
-        carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0)) {
+      } else if (carbonTable.isDefined &&
+                 carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0) {
         if (!ifExistsSet) {
           throw new NoSuchDataMapException(dataMapName, tableName)
         }
@@ -140,10 +139,8 @@ case class CarbonDropDataMapCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
-    if (commandToRun != null) {
-      DataMapStoreManager.getInstance().clearDataMap(
-        commandToRun.carbonTable.getAbsoluteTableIdentifier, dataMapName)
-      commandToRun.processData(sparkSession)
+    if (dataMapProvider != null) {
+      dataMapProvider.freeData(mainTable, dataMapSchema, sparkSession)
     }
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index b53c609..ecf6f99 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -97,7 +97,7 @@ case class CarbonAlterTableDropPartitionCommand(
     partitionInfo.dropPartition(partitionIndex)
 
     // read TableInfo
-    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
       dbName, tableName, tablePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 84779cc..732178c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -89,7 +89,7 @@ case class CarbonAlterTableSplitPartitionCommand(
     updatePartitionInfo(partitionInfo, partitionIds)
 
     // read TableInfo
-    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
       dbName, tableName, tablePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
deleted file mode 100644
index 6d4822b..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.preaaggregate
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
-import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
-import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
-import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
-import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-
-/**
- * Below command class will be used to create pre-aggregate table
- * and updating the parent table about the child table information
- * It will be either success or nothing happen in case of failure:
- * 1. failed to create pre aggregate table.
- * 2. failed to update main table
- *
- */
-case class CreatePreAggregateTableCommand(
-    dataMapName: String,
-    parentTableIdentifier: TableIdentifier,
-    dmClassName: String,
-    dmProperties: Map[String, String],
-    queryString: String,
-    timeSeriesFunction: Option[String] = None)
-  extends AtomicRunnableCommand {
-
-  var parentTable: CarbonTable = _
-  var loadCommand: CarbonLoadDataCommand = _
-
-  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
-    val df = sparkSession.sql(updatedQuery)
-    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
-      df.logicalPlan, queryString)
-    val fields = fieldRelationMap.keySet.toSeq
-    val tableProperties = mutable.Map[String, String]()
-    dmProperties.foreach(t => tableProperties.put(t._1, t._2))
-
-    parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
-    if (!parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table)) {
-      throw new MalformedDataMapCommandException(
-        "Parent table name is different in select and create")
-    }
-    var neworder = Seq[String]()
-    val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
-    parentOrder.foreach(parentcol =>
-      fields.filter(col => (fieldRelationMap.get(col).get.aggregateFunction.isEmpty) &&
-                           (parentcol.equals(fieldRelationMap.get(col).get.
-                             columnTableRelationList.get(0).parentColumnName)))
-        .map(cols => neworder :+= cols.column)
-    )
-    tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(","))
-    tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable.
-      getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants
-      .LOAD_SORT_SCOPE_DEFAULT))
-    tableProperties
-      .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString)
-    val tableIdentifier =
-      TableIdentifier(parentTableIdentifier.table + "_" + dataMapName,
-        parentTableIdentifier.database)
-    // prepare table model of the collected tokens
-    val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
-      ifNotExistPresent = false,
-      new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
-      tableIdentifier.table.toLowerCase,
-      fields,
-      Seq(),
-      tableProperties,
-      None,
-      isAlterFlow = false,
-      None)
-
-
-    // updating the relation identifier, this will be stored in child table
-    // which can be used during dropping of pre-aggreate table as parent table will
-    // also get updated
-    if(timeSeriesFunction.isDefined) {
-      TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties, parentTable)
-      TimeSeriesUtil.validateEventTimeColumnExitsInSelect(
-        fieldRelationMap,
-        dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get)
-      TimeSeriesUtil.updateTimeColumnSelect(fieldRelationMap,
-        dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get,
-      timeSeriesFunction.get)
-    }
-    tableModel.parentTable = Some(parentTable)
-    tableModel.dataMapRelation = Some(fieldRelationMap)
-    val tablePath = if (dmProperties.contains("path")) {
-      dmProperties("path")
-    } else {
-      CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
-    }
-    CarbonCreateTableCommand(TableNewProcessor(tableModel),
-      tableModel.ifNotExistsSet, Some(tablePath)).run(sparkSession)
-
-    val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
-    val tableInfo = table.getTableInfo
-    // child schema object which will be updated on parent table about the
-    val childSchema = tableInfo.getFactTable.buildChildSchema(
-      dataMapName,
-      CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
-      tableInfo.getDatabaseName,
-      queryString,
-      "AGGREGATION")
-    dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
-
-    // updating the parent table about child table
-    PreAggregateUtil.updateMainTable(
-      parentTable,
-      childSchema,
-      sparkSession)
-    // After updating the parent carbon table with data map entry extract the latest table object
-    // to be used in further create process.
-    parentTable = CarbonEnv.getCarbonTable(parentTableIdentifier.database,
-      parentTableIdentifier.table)(sparkSession)
-    val updatedLoadQuery = if (timeSeriesFunction.isDefined) {
-      val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala
-        .filter(p => p.getDataMapName
-          .equalsIgnoreCase(dataMapName)).head
-        .asInstanceOf[AggregationDataMapSchema]
-      PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
-        parentTable.getTableName,
-        parentTable.getDatabaseName)
-    } else {
-      queryString
-    }
-    val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
-      updatedLoadQuery)).drop("preAggLoad")
-    val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala
-      .filter(dataMap => dataMap.getDataMapName.equalsIgnoreCase(dataMapName)).head
-      .asInstanceOf[AggregationDataMapSchema]
-    loadCommand = PreAggregateUtil.createLoadCommandForChild(
-      dataMap.getChildSchema.getListOfColumns,
-      tableIdentifier,
-      dataFrame,
-      false,
-      sparkSession = sparkSession)
-    loadCommand.processMetadata(sparkSession)
-    Seq.empty
-  }
-
-  override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
-    // drop child table and undo the change in table info of main table
-    CarbonDropDataMapCommand(
-      dataMapName,
-      ifExistsSet = true,
-      parentTableIdentifier.database,
-      parentTableIdentifier.table).run(sparkSession)
-    Seq.empty
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // load child table if parent table has existing segments
-    // This will be used to check if the parent table has any segments or not. If not then no
-    // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
-    // table.
-    SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
-    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
-    if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
-      load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
-      throw new UnsupportedOperationException(
-        "Cannot create pre-aggregate table when insert is in progress on main table")
-    } else if (loadAvailable.nonEmpty) {
-      val updatedQuery = if (timeSeriesFunction.isDefined) {
-        val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala
-          .filter(p => p.getDataMapName
-            .equalsIgnoreCase(dataMapName)).head
-          .asInstanceOf[AggregationDataMapSchema]
-        PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
-          parentTable.getTableName,
-          parentTable.getDatabaseName)
-      } else {
-        queryString
-      }
-      // Passing segmentToLoad as * because we want to load all the segments into the
-      // pre-aggregate table even if the user has set some segments on the parent table.
-      loadCommand.dataFrame = Some(PreAggregateUtil
-        .getDataFrame(sparkSession, loadCommand.logicalPlan.get))
-      PreAggregateUtil.startDataLoadForDataMap(
-        parentTable,
-        segmentToLoad = "*",
-        validateSegments = true,
-        sparkSession,
-        loadCommand)
-    }
-    Seq.empty
-  }
-}
-
-