Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/03/08 16:55:47 UTC

[46/54] [abbrv] carbondata git commit: [CARBONDATA-1543] Support DataMap chooser and expression to enable multiple datamaps in a single query

http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index ccb9b68..b1962c1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -19,15 +19,15 @@ package org.apache.carbondata.integration.spark.testsuite.preaggregate
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, Row}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
@@ -293,7 +293,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
   test("test pre agg create table 22: using invalid datamap provider") {
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
 
-    val e: Exception = intercept[MalformedDataMapCommandException] {
+    val e = intercept[MalformedDataMapCommandException] {
       sql(
         """
           | CREATE DATAMAP agg0 ON TABLE mainTable

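For reference, the type ascription removed above was redundant: ScalaTest's
intercept already returns the intercepted exception with its concrete type. A
minimal self-contained sketch of the pattern, independent of the Carbon test
classes:

    import org.scalatest.FunSuite

    class InterceptSketch extends FunSuite {
      test("intercept returns the typed exception") {
        // intercept[T] runs the block, fails the test if no T is thrown,
        // and returns the thrown T so its message can be asserted directly
        val e = intercept[IllegalArgumentException] {
          require(false, "boom")
        }
        assert(e.getMessage.contains("boom"))
      }
    }
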
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index 43316b3..ec76b37 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -201,7 +201,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
           | GROUP BY dataTime
         """.stripMargin)
     }
-    assert(e.getMessage.equals("Unknown datamap provider/class abc"))
+    assert(e.getMessage.equals("DataMap class 'abc' not found"))
   }
 
   test("test timeseries create table 12: USING and catch MalformedCarbonCommandException") {
@@ -216,7 +216,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
           | GROUP BY dataTime
         """.stripMargin)
     }
-    assert(e.getMessage.equals("Unknown datamap provider/class abc"))
+    assert(e.getMessage.equals("DataMap class 'abc' not found"))
   }
 
   test("test timeseries create table 13: Only one granularity level can be defined 1") {
@@ -237,6 +237,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
            | GROUP BY dataTime
        """.stripMargin)
     }
     assert(e.getMessage.equals("Only one granularity level can be defined"))
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
index c522c1e..e31896f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
@@ -113,7 +113,7 @@ object CompactionSupportGlobalSortBigFileTest {
     try {
       val write = new PrintWriter(fileName);
       for (i <- start until (start + line)) {
-        write.println(i + "," + "n" + i + "," + "c" + Random.nextInt(line) + "," + Random.nextInt(80))
+        write.println(i + "," + "n" + i + "," + "c" + (i % 10000) + "," + Random.nextInt(80))
       }
       write.close()
     } catch {

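Replacing Random.nextInt(line) with i % 10000 makes the generated city column
deterministic, so equality filters such as city='c2670' in the new
multi-datamap tests below match a known, reproducible set of rows. A
self-contained sketch of the generator after this change (file name and row
count are the caller's choice):

    import java.io.PrintWriter
    import scala.util.Random

    object BigFileSketch {
      def createFile(fileName: String, line: Int, start: Int): Unit = {
        val write = new PrintWriter(fileName)
        try {
          for (i <- start until (start + line)) {
            // columns: id, name, city, age; city now repeats every 10000 ids
            // instead of being random, so a given value like c2670 recurs
            write.println(i + ",n" + i + ",c" + (i % 10000) + "," + Random.nextInt(80))
          }
        } finally {
          write.close()
        }
      }
    }
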
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 693c145..a214444 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -92,18 +92,18 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     buildTestData
   }
 
-test("test the boolean data type"){
-  booldf.write
-    .format("carbondata")
-    .option("tableName", "carbon10")
-    .option("tempCSV", "true")
-    .option("compress", "true")
-    .mode(SaveMode.Overwrite)
-    .save()
-  checkAnswer(
-    sql("SELECT * FROM CARBON10"),
-    Seq(Row("anubhav", true), Row("prince", false)))
-}
+  test("test the boolean data type"){
+    booldf.write
+      .format("carbondata")
+      .option("tableName", "carbon0")
+      .option("tempCSV", "true")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(
+      sql("SELECT * FROM CARBON0"),
+      Seq(Row("anubhav", true), Row("prince", false)))
+  }
 
   test("test load dataframe with saving compressed csv files") {
     // save dataframe to carbon file

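Besides the re-indentation, the test now targets table carbon0 rather than
carbon10. A minimal sketch of the DataFrame save path it exercises, assuming a
SparkSession with the CarbonData datasource on the classpath (the column names
here are illustrative, not the test's):

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object BooleanWriteSketch {
      def run(spark: SparkSession): Unit = {
        import spark.implicits._
        val booldf = Seq(("anubhav", true), ("prince", false)).toDF("name", "flag")
        booldf.write
          .format("carbondata")
          .option("tableName", "carbon0")  // table created/managed by the datasource
          .option("tempCSV", "true")       // stage rows through a temporary CSV
          .option("compress", "true")
          .mode(SaveMode.Overwrite)
          .save()
      }
    }
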
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 1cbbcb4..041a63a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
@@ -48,22 +49,21 @@ import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlo
 
 class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
   var identifier: AbsoluteTableIdentifier = _
-  var dataMapName: String = _
+  var dataMapSchema: DataMapSchema = _
 
   /**
   * Initialization of Datamap factory with the identifier and datamap schema
    */
-  override def init(identifier: AbsoluteTableIdentifier,
-      dataMapName: String): Unit = {
+  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
     this.identifier = identifier
-    this.dataMapName = dataMapName
+    this.dataMapSchema = dataMapSchema
   }
 
   /**
   * Return a new writer for this datamap
    */
   override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
-    new CGDataMapWriter(identifier, segment, dataWritePath, dataMapName)
+    new CGDataMapWriter(identifier, segment, dataWritePath, dataMapSchema)
   }
 
   /**
@@ -140,7 +140,8 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
    * Return metadata of this datamap
    */
   override def getMeta: DataMapMeta = {
-    new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava)
+    new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+      List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
   }
 }
 
@@ -198,12 +199,16 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
   }
 
   private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
-    if (expression.getChildren != null) {
-      expression.getChildren.asScala.map { f =>
-        if (f.isInstanceOf[EqualToExpression]) {
-          buffer += f
+    if (expression.isInstanceOf[EqualToExpression]) {
+      buffer += expression
+    } else {
+      if (expression.getChildren != null) {
+        expression.getChildren.asScala.map { f =>
+          if (f.isInstanceOf[EqualToExpression]) {
+            buffer += f
+          }
+          getEqualToExpression(f, buffer)
         }
-        getEqualToExpression(f, buffer)
       }
     }
   }
@@ -221,12 +226,12 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
 class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
     segment: Segment,
     dataWritePath: String,
-    dataMapName: String)
+    dataMapSchema: DataMapSchema)
   extends AbstractDataMapWriter(identifier, segment, dataWritePath) {
 
   var currentBlockId: String = null
   val cgwritepath = dataWritePath + "/" +
-                    dataMapName + System.nanoTime() + ".datamap"
+                    dataMapSchema.getDataMapName + System.nanoTime() + ".datamap"
   lazy val stream: DataOutputStream = FileFactory
     .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
   val blockletList = new ArrayBuffer[Array[Byte]]()
@@ -345,14 +350,29 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
     val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
     // register datamap writer
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      table.getAbsoluteTableIdentifier,
-      classOf[CGDataMapFactory].getName, "cgdatamap")
+    sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
     checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
       sql("select * from normal_test where name='n502670'"))
   }
 
+  test("test cg datamap with 2 datamaps ") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
+    sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+      sql("select * from normal_test where name='n502670' and city='c2670'"))
+  }
+
   override protected def afterAll(): Unit = {
     CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
     sql("DROP TABLE IF EXISTS normal_test")

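Two changes run through this file: datamaps are now attached with CREATE
DATAMAP ... USING ... DMPROPERTIES instead of a direct DataMapStoreManager
call, and getEqualToExpression gains a missing base case. The old version only
inspected children, so a filter tree whose root was itself an
EqualToExpression was never collected. A self-contained sketch of the fixed
recursion (Expr, EqualTo and And are stand-ins, not the CarbonData classes):

    import scala.collection.mutable.ArrayBuffer

    object EqualToCollectorSketch {
      sealed trait Expr { def children: Seq[Expr] }
      case class EqualTo(column: String, value: String) extends Expr {
        val children: Seq[Expr] = Nil
      }
      case class And(left: Expr, right: Expr) extends Expr {
        val children: Seq[Expr] = Seq(left, right)
      }

      // check the node itself first (the previously missing case), then recurse
      def collect(e: Expr, buffer: ArrayBuffer[Expr]): Unit = e match {
        case eq: EqualTo => buffer += eq
        case other      => other.children.foreach(collect(_, buffer))
      }
    }
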
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 7e93959..00d13a9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainData
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -41,7 +42,7 @@ class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory {
   var identifier: AbsoluteTableIdentifier = _
 
   override def init(identifier: AbsoluteTableIdentifier,
-      dataMapName: String): Unit = {
+      dataMapSchema: DataMapSchema): Unit = {
     this.identifier = identifier
   }
 
@@ -89,12 +90,9 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test write datamap 2 pages") {
+    sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
     // register datamap writer
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      AbsoluteTableIdentifier.from(storeLocation + "/carbon1", "default", "carbon1"),
-      classOf[C2DataMapFactory].getName,
-      "test")
-
+    sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'")
     val df = buildTestData(33000)
 
     // save dataframe to carbon file
@@ -119,11 +117,8 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test write datamap 2 blocklet") {
-    // register datamap writer
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      AbsoluteTableIdentifier.from(storeLocation + "/carbon2", "default", "carbon2"),
-      classOf[C2DataMapFactory].getName,
-      "test")
+    sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+    sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2DataMapFactory].getName}'")
 
     CarbonProperties.getInstance()
       .addProperty("carbon.blockletgroup.size.in.mb", "1")

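The suite now registers the factory through SQL, which requires the target
table to exist first. A sketch of the new order of operations (inside a
QueryTest, as above):

    // 1. the table must exist before a datamap can be attached to it
    sql("CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) " +
        "STORED BY 'org.apache.carbondata.format'")
    // 2. CREATE DATAMAP names the factory implementation class in USING
    sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'")
    // 3. a subsequent load or DataFrame write is what drives the registered
    //    AbstractDataMapWriter callbacks for blocks, blocklets and pages
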
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 9c8cc15..90f0972 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.indexstore.{Blocklet, FineGrainBlocklet, PartitionSpec}
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
@@ -48,22 +49,21 @@ import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlo
 
 class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
   var identifier: AbsoluteTableIdentifier = _
-  var dataMapName: String = _
+  var dataMapSchema: DataMapSchema = _
 
   /**
   * Initialization of Datamap factory with the identifier and datamap schema
    */
-  override def init(identifier: AbsoluteTableIdentifier,
-      dataMapName: String): Unit = {
+  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
     this.identifier = identifier
-    this.dataMapName = dataMapName
+    this.dataMapSchema = dataMapSchema
   }
 
   /**
   * Return a new writer for this datamap
    */
   override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
-    new FGDataMapWriter(identifier, segment, dataWritePath, dataMapName)
+    new FGDataMapWriter(identifier, segment, dataWritePath, dataMapSchema)
   }
 
   /**
@@ -136,7 +136,8 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
    * Return metadata of this datamap
    */
   override def getMeta: DataMapMeta = {
-    new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava)
+    new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+      List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
   }
 }
 
@@ -225,12 +226,16 @@ class FGDataMap extends AbstractFineGrainDataMap {
   }
 
   def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
-    if (expression.getChildren != null) {
-      expression.getChildren.asScala.map { f =>
-        if (f.isInstanceOf[EqualToExpression]) {
-          buffer += f
+    if (expression.isInstanceOf[EqualToExpression]) {
+      buffer += expression
+    } else {
+      if (expression.getChildren != null) {
+        expression.getChildren.asScala.map { f =>
+          if (f.isInstanceOf[EqualToExpression]) {
+            buffer += f
+          }
+          getEqualToExpression(f, buffer)
         }
-        getEqualToExpression(f, buffer)
       }
     }
   }
@@ -246,11 +251,12 @@ class FGDataMap extends AbstractFineGrainDataMap {
 }
 
 class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
-    segment: Segment, dataWriterPath: String, dataMapName: String)
+    segment: Segment, dataWriterPath: String, dataMapSchema: DataMapSchema)
   extends AbstractDataMapWriter(identifier, segment, dataWriterPath) {
 
   var currentBlockId: String = null
-  val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap"
+  val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() +
+                    ".datamap"
   val stream: DataOutputStream = FileFactory
     .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
   val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
@@ -421,14 +427,44 @@ class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
     val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
     // register datamap writer
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      table.getAbsoluteTableIdentifier,
-      classOf[FGDataMapFactory].getName, "fgdatamap")
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap ON TABLE datamap_test
+         | USING '${classOf[FGDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='name')
+       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
     checkAnswer(sql("select * from datamap_test where name='n502670'"),
       sql("select * from normal_test where name='n502670'"))
   }
 
+  test("test fg datamap with 2 datamaps ") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test
+         | USING '${classOf[FGDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='name')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test
+         | USING '${classOf[FGDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='city')
+       """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+      sql("select * from normal_test where name='n502670' and city='c2670'"))
+  }
+
   override protected def afterAll(): Unit = {
     CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
     sql("DROP TABLE IF EXISTS normal_test")

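As in the coarse-grain suite, the factory is now initialized with the whole
DataMapSchema, so getMeta can report exactly the columns supplied in
DMPROPERTIES('indexcolumns'=...) instead of a hard-coded Seq("name"). A
self-contained sketch of that wiring (SchemaLike and MetaLike are stand-ins
for DataMapSchema and DataMapMeta):

    object MetaFromSchemaSketch {
      final case class SchemaLike(dataMapName: String, properties: Map[String, String])
      final case class MetaLike(indexedColumns: List[String], optimizedOps: List[String])

      def buildMeta(schema: SchemaLike): MetaLike = {
        // the datamap chooser later matches filter columns against this list
        val indexed = schema.properties("indexcolumns").split(",").map(_.trim).toList
        MetaLike(indexed, List("EQUALS", "IN")) // mirrors ExpressionType.EQUALS/IN
      }
    }
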
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 6ac9c7a..717af6f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -23,7 +23,8 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.common.exceptions.MetadataProcessException
+import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
@@ -42,39 +43,22 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
 
   val newClass = "org.apache.spark.sql.CarbonSource"
 
-  test("test datamap create: don't support using class, only support short name") {
-    intercept[MalformedDataMapCommandException] {
+  test("test datamap create: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
-      val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
-      assert(table != null)
-      val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-      assert(dataMapSchemaList.size() == 1)
-      assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap1"))
-      assert(dataMapSchemaList.get(0).getClassName.equals(newClass))
     }
   }
 
-  test("test datamap create with dmproperties: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test datamap create with dmproperties: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
-      val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
-      assert(table != null)
-      val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-      assert(dataMapSchemaList.size() == 2)
-      assert(dataMapSchemaList.get(1).getDataMapName.equals("datamap2"))
-      assert(dataMapSchemaList.get(1).getClassName.equals(newClass))
-      assert(dataMapSchemaList.get(1).getProperties.get("key").equals("value"))
     }
   }
 
-  test("test datamap create with existing name: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test datamap create with existing name: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql(
         s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
-      val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
-      assert(table != null)
-      val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-      assert(dataMapSchemaList.size() == 2)
     }
   }
 
@@ -106,8 +90,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
       sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
       checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
 
-    }
-    finally {
+    } finally {
       sql("drop table hiveMetaStoreTable")
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
@@ -136,6 +119,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
         "datamap_hiveMetaStoreTable_1")
       assert(sql("show datamap on table hiveMetaStoreTable_1").collect().length == 0)
       sql("drop table hiveMetaStoreTable_1")
+
+      checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
     }
     finally {
       CarbonProperties.getInstance()
@@ -145,17 +130,17 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test datamap create with preagg with duplicate name") {
-    intercept[Exception] {
-      sql(
-        s"""
-           | CREATE DATAMAP datamap2 ON TABLE datamaptest
-           | USING 'preaggregate'
-           | DMPROPERTIES('key'='value')
-           | AS SELECT COUNT(a) FROM datamaptest
+    sql(
+      s"""
+         | CREATE DATAMAP datamap10 ON TABLE datamaptest
+         | USING 'preaggregate'
+         | DMPROPERTIES('key'='value')
+         | AS SELECT COUNT(a) FROM datamaptest
          """.stripMargin)
+    intercept[MalformedDataMapCommandException] {
       sql(
         s"""
-           | CREATE DATAMAP datamap2 ON TABLE datamaptest
+           | CREATE DATAMAP datamap10 ON TABLE datamaptest
            | USING 'preaggregate'
            | DMPROPERTIES('key'='value')
            | AS SELECT COUNT(a) FROM datamaptest
@@ -167,10 +152,9 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     assert(dataMapSchemaList.size() == 2)
   }
 
-  test("test datamap drop with preagg") {
-    intercept[Exception] {
-      sql("drop table datamap3")
-
+  test("test drop non-exist datamap") {
+    intercept[NoSuchDataMapException] {
+      sql("drop datamap nonexist on table datamaptest")
     }
     val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
     assert(table != null)
@@ -178,8 +162,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     assert(dataMapSchemaList.size() == 2)
   }
 
-  test("test show datamap without preaggregate: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test show datamap without preaggregate: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql("drop table if exists datamapshowtest")
       sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
       sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')")
@@ -188,8 +172,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("test show datamap with preaggregate: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test show datamap with preaggregate: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql("drop table if exists datamapshowtest")
       sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
       sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
@@ -206,8 +190,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     assert(sql("show datamap on table datamapshowtest").collect().length == 0)
   }
 
-  test("test show datamap after dropping datamap: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test show datamap after dropping datamap: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql("drop table if exists datamapshowtest")
       sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
       sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")

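The renamed tests pin down the new exception taxonomy: a USING class that
cannot be loaded as a datamap factory surfaces as MetadataProcessException, a
duplicate datamap name as MalformedDataMapCommandException, and dropping a
missing datamap as NoSuchDataMapException. A sketch of the three cases (the
table name and the non-existent class are illustrative):

    intercept[MetadataProcessException] {
      // 'com.example.NoSuchFactory' stands in for any class that is not a
      // datamap factory on the classpath
      sql("CREATE DATAMAP dm1 ON TABLE t USING 'com.example.NoSuchFactory'")
    }
    intercept[MalformedDataMapCommandException] {
      sql("CREATE DATAMAP dup ON TABLE t USING 'preaggregate' AS SELECT COUNT(a) FROM t")
      sql("CREATE DATAMAP dup ON TABLE t USING 'preaggregate' AS SELECT COUNT(a) FROM t")
    }
    intercept[NoSuchDataMapException] {
      sql("DROP DATAMAP nonexist ON TABLE t")
    }
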
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 60052f0..8d2f9ee 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -37,8 +37,8 @@ import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
 class SparkDataMapJob extends DataMapJob {
 
   override def execute(dataMapFormat: DistributableDataMapFormat,
-      resolverIntf: FilterResolverIntf): util.List[ExtendedBlocklet] = {
-    new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, resolverIntf).collect().toList
+      filter: FilterResolverIntf): util.List[ExtendedBlocklet] = {
+    new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, filter).collect().toList
       .asJava
   }
 }
@@ -53,7 +53,6 @@ class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) exte
  * RDD to prune the datamaps across spark cluster
  * @param sc
  * @param dataMapFormat
- * @param resolverIntf
  */
 class DataMapPruneRDD(sc: SparkContext,
     dataMapFormat: DistributableDataMapFormat,
@@ -70,7 +69,6 @@ class DataMapPruneRDD(sc: SparkContext,
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
     val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit
-    DistributableDataMapFormat.setFilterExp(attemptContext.getConfiguration, resolverIntf)
     val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext)
     reader.initialize(inputSplit, attemptContext)
     val iter = new Iterator[ExtendedBlocklet] {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index adf5e04..aab2897 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -101,8 +101,12 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         CarbonException.analysisException(s"table path already exists.")
       case (SaveMode.Overwrite, true) =>
         val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
-        sqlContext.sparkSession
-          .sql(s"DROP TABLE IF EXISTS $dbName.${options.tableName}")
+        // In order to overwrite, delete all segments in the table
+        sqlContext.sparkSession.sql(
+          s"""
+             | DELETE FROM TABLE $dbName.${options.tableName}
+             | WHERE SEGMENT.STARTTIME BEFORE '2099-06-01 01:00:00'
+           """.stripMargin)
         (true, false)
       case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
         (true, false)

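Overwrite now empties the table through CarbonData's segment-management SQL
rather than dropping and recreating it, presumably so the table definition and
anything attached to it (such as datamaps) survive the overwrite. The
far-future timestamp simply selects every segment:

    // deletes all segments loaded before the given time, i.e. all of them
    sqlContext.sparkSession.sql(
      s"""
         | DELETE FROM TABLE $dbName.${options.tableName}
         | WHERE SEGMENT.STARTTIME BEFORE '2099-06-01 01:00:00'
       """.stripMargin)
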
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index d536746..1de66c1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -32,7 +32,9 @@ import org.apache.carbondata.core.scan.filter.intf.{ExpressionType, RowIntf}
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 
-class SparkUnknownExpression(var sparkExp: SparkExpression)
+class SparkUnknownExpression(
+    var sparkExp: SparkExpression,
+    expressionType: ExpressionType = ExpressionType.UNKNOWN)
   extends UnknownExpression with ConditionalExpression {
 
   private var evaluateExpression: (InternalRow) => Any = sparkExp.eval
@@ -64,7 +66,7 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
   }
 
   override def getFilterExpressionType: ExpressionType = {
-    ExpressionType.UNKNOWN
+    expressionType
   }
 
   override def getString: String = {

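Defaulting the new constructor parameter keeps every existing
new SparkUnknownExpression(expr) call site source-compatible while letting
CarbonFilters (further down) tag ENDSWITH and CONTAINSWITH filters explicitly,
which the datamap chooser can then match on. A minimal stand-alone sketch of
the pattern:

    object DefaultParamSketch {
      class Tagged(val payload: String, val tag: String = "UNKNOWN")

      val legacy   = new Tagged("a = 1")                    // old call sites: UNKNOWN
      val endsWith = new Tagged("a LIKE '%x'", "ENDSWITH")  // new, explicitly typed
    }
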
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 6508777..c6d86b3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -16,17 +16,23 @@
  */
 package org.apache.spark.sql.execution.command.datamap
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand
+import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.hive.CarbonRelation
 
+import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 
 /**
  * Below command class will be used to create datamap on table
@@ -52,61 +58,62 @@ case class CarbonCreateDataMapCommand(
     if (carbonTable.isStreamingTable) {
       throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
     }
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val tableName = tableIdentifier.table + "_" + dataMapName
-    val newDmProperties = if (dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).isDefined) {
-      dmProperties.updated(TimeSeriesUtil.TIMESERIES_EVENTTIME,
-        dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get.trim)
-    } else {
-      dmProperties
-    }
-    val dataMapProvider = {
-      try {
-        DataMapProvider.getDataMapProvider(dmClassName)
-      } catch {
-        case e: UnsupportedOperationException =>
-          throw new MalformedDataMapCommandException(e.getMessage)
-      }
-    }
-    if (sparkSession.sessionState.catalog.listTables(dbName)
-      .exists(_.table.equalsIgnoreCase(tableName))) {
-      LOGGER.audit(
-        s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " +
-          s"Table [$tableName] already exists under database [$dbName]")
-      tableIsExists = true
-      if (!ifNotExistsSet) {
-        throw new TableAlreadyExistsException(dbName, tableName)
-      }
-    } else {
-      TimeSeriesUtil.validateTimeSeriesGranularity(newDmProperties, dmClassName)
-      createPreAggregateTableCommands = if (dataMapProvider == TIMESERIES) {
-        val details = TimeSeriesUtil
-          .getTimeSeriesGranularityDetails(newDmProperties, dmClassName)
-        val updatedDmProperties = newDmProperties - details._1
+    validateDataMapName(carbonTable)
+
+    if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
+        dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
+      TimeSeriesUtil.validateTimeSeriesGranularity(dmProperties, dmClassName)
+      createPreAggregateTableCommands = if (dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
+        val details = TimeSeriesUtil.getTimeSeriesGranularityDetails(dmProperties, dmClassName)
+        val updatedDmProperties = dmProperties - details._1
         CreatePreAggregateTableCommand(
           dataMapName,
           tableIdentifier,
-          dataMapProvider,
+          DataMapProvider.TIMESERIES,
           updatedDmProperties,
           queryString.get,
-          Some(details._1),
-          ifNotExistsSet = ifNotExistsSet)
+          Some(details._1))
       } else {
         CreatePreAggregateTableCommand(
           dataMapName,
           tableIdentifier,
-          dataMapProvider,
-          newDmProperties,
-          queryString.get,
-          ifNotExistsSet = ifNotExistsSet)
+          DataMapProvider.PREAGGREGATE,
+          dmProperties,
+          queryString.get
+        )
+      }
+      try {
+        createPreAggregateTableCommands.processMetadata(sparkSession)
+      } catch {
+        case e: Throwable => throw new MetadataProcessException(s"Failed to create datamap " +
+                                                                s"'$dataMapName'", e)
       }
-      createPreAggregateTableCommands.processMetadata(sparkSession)
+    } else {
+      val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
+      dataMapSchema.setProperties(new java.util.HashMap[String, String](dmProperties.asJava))
+      val dbName = CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession)
+      val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
+        Some(dbName),
+        tableIdentifier.table)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
+      DataMapStoreManager.getInstance().createAndRegisterDataMap(
+        carbonTable.getAbsoluteTableIdentifier, dataMapSchema)
+      // Save DataMapSchema in the schema file of the main table
+      PreAggregateUtil.updateMainTable(carbonTable, dataMapSchema, sparkSession)
     }
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${tableIdentifier.table}")
     Seq.empty
   }
 
+  private def validateDataMapName(carbonTable: CarbonTable): Unit = {
+    val existingDataMaps = carbonTable.getTableInfo.getDataMapSchemaList
+    existingDataMaps.asScala.foreach { dataMapSchema =>
+      if (dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName)) {
+        throw new MalformedDataMapCommandException(s"DataMap name '$dataMapName' already exists")
+      }
+    }
+  }
+
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
       dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
@@ -116,7 +123,7 @@ case class CarbonCreateDataMapCommand(
         Seq.empty
       }
     } else {
-      throw new MalformedDataMapCommandException("Unknown datamap provider/class " + dmClassName)
+      Seq.empty
     }
   }
 

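The new validateDataMapName runs before any provider-specific handling, so a
duplicate name now fails fast for preaggregate, timeseries, and index datamaps
alike. A self-contained sketch of the check (SchemaLike stands in for
DataMapSchema, and IllegalArgumentException for
MalformedDataMapCommandException):

    object ValidateNameSketch {
      final case class SchemaLike(dataMapName: String)

      def validateDataMapName(existing: Seq[SchemaLike], dataMapName: String): Unit = {
        // case-insensitive, matching the equalsIgnoreCase comparison above
        if (existing.exists(_.dataMapName.equalsIgnoreCase(dataMapName))) {
          throw new IllegalArgumentException(
            s"DataMap name '$dataMapName' already exists")
        }
      }
    }
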
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index dcc71a2..e89d15a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
+import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException}
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -74,7 +75,7 @@ case class CarbonDropDataMapCommand(
         Some(CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession))
       } catch {
         case ex: NoSuchTableException =>
-          throw ex
+          throw new MetadataProcessException(s"Dropping datamap $dataMapName failed", ex)
       }
       // If datamap to be dropped in parent table then drop the datamap from metastore and remove
       // entry from parent table.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index c836584..c02ac4f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
 import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
@@ -67,8 +68,10 @@ case class CreatePreAggregateTableCommand(
     dmProperties.foreach(t => tableProperties.put(t._1, t._2))
 
     parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
-    assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table),
-      "Parent table name is different in select and create")
+    if (!parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table)) {
+      throw new MalformedDataMapCommandException(
+        "Parent table name is different in select and create")
+    }
     var neworder = Seq[String]()
     val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
     parentOrder.foreach(parentcol =>
@@ -131,23 +134,19 @@ case class CreatePreAggregateTableCommand(
     dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
 
     // updating the parent table about child table
-    try {
-      PreAggregateUtil.updateMainTable(
-        CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession),
-        parentTableIdentifier.table,
-        childSchema,
-        sparkSession)
-    } catch {
-      case ex: Exception =>
-        undoMetadata(sparkSession, ex)
-        throw ex
-    }
+    PreAggregateUtil.updateMainTable(
+      parentTable,
+      childSchema,
+      sparkSession)
+    // After updating the parent carbon table with the datamap entry, fetch the latest
+    // table object to use in the rest of the create process.
+    parentTable = CarbonEnv.getCarbonTable(parentTableIdentifier.database,
+      parentTableIdentifier.table)(sparkSession)
     val updatedLoadQuery = if (timeSeriesFunction.isDefined) {
       PreAggregateUtil.createTimeSeriesSelectQueryFromMain(childSchema.getChildSchema,
         parentTable.getTableName,
         parentTable.getDatabaseName)
-    }
-    else {
+    } else {
       queryString
     }
     val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(

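Turning the assert into a thrown MalformedDataMapCommandException matters
because Scala's assert raises AssertionError, which reads as an internal bug
and can be elided entirely under -Xdisable-assertions; invalid user input
should surface as a command error instead. A sketch of the pattern
(IllegalArgumentException standing in for the Carbon exception):

    object ParentCheckSketch {
      def requireSameParent(selectTable: String, createTable: String): Unit = {
        if (!selectTable.equalsIgnoreCase(createTable)) {
          throw new IllegalArgumentException(
            "Parent table name is different in select and create")
        }
      }
    }
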
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index a5d256c..dd42b50 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types.DataType
 
+import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -406,21 +407,20 @@ object PreAggregateUtil {
    * Below method will be used to update the main table about the pre aggregate table information
    * in case of any exception it will throw error so pre aggregate table creation will fail
    *
-   * @param dbName
-   * @param tableName
    * @param childSchema
    * @param sparkSession
    */
-  def updateMainTable(dbName: String, tableName: String,
+  def updateMainTable(carbonTable: CarbonTable,
       childSchema: DataMapSchema, sparkSession: SparkSession): Unit = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
       LockUsage.DROP_TABLE_LOCK)
     var locks = List.empty[ICarbonLock]
-    var carbonTable: CarbonTable = null
+    var numberOfCurrentChild: Int = 0
+    val dbName = carbonTable.getDatabaseName
+    val tableName = carbonTable.getTableName
     try {
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
       // get the latest carbon table and check for column existence
       // read the latest schema file
@@ -433,7 +433,7 @@ object PreAggregateUtil {
         carbonTable.getTablePath)
       if (wrapperTableInfo.getDataMapSchemaList.asScala.
         exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) {
-        throw new Exception("Duplicate datamap")
+        throw new MetadataProcessException("DataMap name already exists")
       }
       wrapperTableInfo.getDataMapSchemaList.add(childSchema)
       val thriftTable = schemaConverter

http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index f6d5e25..e82c9d7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -614,7 +614,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         CastExpressionOptimization.checkIfCastCanBeRemove(c)
       case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
         CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case StartsWith(a: Attribute, Literal(v, t)) =>
+      case s@StartsWith(a: Attribute, Literal(v, t)) =>
         Some(sources.StringStartsWith(a.name, v.toString))
       case c@EndsWith(a: Attribute, Literal(v, t)) =>
         Some(CarbonEndsWith(c))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index e5eb53c..38c5146 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -41,6 +41,10 @@ import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonCol
 import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
 import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
@@ -50,6 +54,7 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
  */
 object CarbonFilters {
 
+  val carbonProperties = CarbonProperties.getInstance()
   /**
    * Converts data sources filters to carbon filter predicates.
    */
@@ -117,25 +122,20 @@ object CarbonFilters {
             new OrExpression(lhsFilter, rhsFilter)
           }
         case sources.StringStartsWith(name, value) if value.length > 0 =>
-          val l = new GreaterThanEqualToExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value))
-          val maxValueLimit = value.substring(0, value.length - 1) +
-                              (value.charAt(value.length - 1).toInt + 1).toChar
-          val r = new LessThanExpression(
-            getCarbonExpression(name), getCarbonLiteralExpression(name, maxValueLimit))
-          Some(new AndExpression(l, r))
+          Some(new StartsWithExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
         case CarbonEndsWith(expr: Expression) =>
           Some(new SparkUnknownExpression(expr.transform {
             case AttributeReference(name, dataType, _, _) =>
               CarbonBoundReference(new CarbonColumnExpression(name.toString,
                 CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable)
-          }))
+          }, ExpressionType.ENDSWITH))
         case CarbonContainsWith(expr: Expression) =>
           Some(new SparkUnknownExpression(expr.transform {
             case AttributeReference(name, dataType, _, _) =>
               CarbonBoundReference(new CarbonColumnExpression(name.toString,
                 CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable)
-          }))
+          }, ExpressionType.CONTAINSWITH))
         case CastExpr(expr: Expression) =>
           Some(transformExpression(expr))
         case FalseExpr() =>
@@ -266,7 +266,7 @@ object CarbonFilters {
           CastExpressionOptimization.checkIfCastCanBeRemove(c)
         case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
           CastExpressionOptimization.checkIfCastCanBeRemove(c)
-        case StartsWith(a: Attribute, Literal(v, t)) =>
+        case s@StartsWith(a: Attribute, Literal(v, t)) =>
           Some(sources.StringStartsWith(a.name, v.toString))
         case c@EndsWith(a: Attribute, Literal(v, t)) =>
           Some(CarbonEndsWith(c))

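The deleted StringStartsWith branch implemented prefix matching as a range
scan: the strings starting with "abc" are exactly those in ["abc", "abd").
Wrapping the prefix in a dedicated StartsWithExpression instead keeps the
original intent visible to the new datamap chooser. For reference, a
self-contained sketch of the old range trick:

    object PrefixRangeSketch {
      // assumes a non-empty prefix, as the matched case above guarantees;
      // PrefixRangeSketch.prefixRange("abc") == ("abc", "abd")
      def prefixRange(value: String): (String, String) = {
        val upper = value.substring(0, value.length - 1) +
                    (value.charAt(value.length - 1) + 1).toChar
        (value, upper)
      }
    }
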
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 66f8bc5..f514074 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.processing.store.TablePage;
 
 /**
@@ -50,9 +50,9 @@ public class DataMapWriterListener {
   /**
    * register all datamap writer for specified table and segment
    */
-  public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId,
+  public void registerAllWriter(CarbonTable carbonTable, String segmentId,
       String dataWritePath) {
-    List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(identifier);
+    List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
     if (tableDataMaps != null) {
       for (TableDataMap tableDataMap : tableDataMaps) {
         DataMapFactory factory = tableDataMap.getDataMapFactory();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 0ee77da..21a4b3e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -263,7 +263,7 @@ public final class DataLoadProcessBuilder {
     if (carbonTable.isHivePartitionTable()) {
       configuration.setWritingCoresCount((short) 1);
     }
-    TableSpec tableSpec = new TableSpec(dimensions, measures);
+    TableSpec tableSpec = new TableSpec(carbonTable);
     configuration.setTableSpec(tableSpec);
     return configuration;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index d6af747..8aa5bde 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -260,8 +260,8 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
 
     DataMapWriterListener listener = new DataMapWriterListener();
-    listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId(),
-        storeLocation[new Random().nextInt(storeLocation.length)]);
+    listener.registerAllWriter(configuration.getTableSpec().getCarbonTable(),
+        configuration.getSegmentId(), storeLocation[new Random().nextInt(storeLocation.length)]);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
 
@@ -320,13 +320,11 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
     carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
 
-    carbonFactDataHandlerModel.tableSpec = new TableSpec(
-        segmentProperties.getDimensions(),
-        segmentProperties.getMeasures());
-
+    carbonFactDataHandlerModel.tableSpec =
+        new TableSpec(loadModel.getCarbonDataLoadSchema().getCarbonTable());
     DataMapWriterListener listener = new DataMapWriterListener();
     listener.registerAllWriter(
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(),
+        loadModel.getCarbonDataLoadSchema().getCarbonTable(),
         loadModel.getSegmentId(),
         tempStoreLocation[new Random().nextInt(tempStoreLocation.length)]);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;