You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/25 12:54:37 UTC

[03/10] carbondata git commit: [CARBONDATA-1543] Supported DataMap chooser and expression for supporting multiple datamaps in single query

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index 5eb274d..86fd240 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -18,18 +18,6 @@
 
 package org.apache.carbondata.cluster.sdv.generated
 
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.common.util._
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
-
 /**
  * Test Class for AlterTableTestCase to verify all scenerios
  */
@@ -52,7 +40,7 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phoneP
 ADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
 
     sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSy
 sVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
-    assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2)
+    assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") <= 2)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("DROP TABLE IF EXISTS carbon_automation_merge")
@@ -75,8 +63,8 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSy
 sVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
     sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSy
 sVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
     val rows = sql("""Select count(*) from carbon_automation_nonmerge""").collect()
-    assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2)
-    assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2)
+    assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") <= 2)
+    assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") <= 2)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'SEGMENT_INDEX'").collect()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index b3bf93d..0d33797 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
@@ -272,7 +272,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
   test("test pre agg create table 22: using invalid datamap provider") {
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
 
-    val e: Exception = intercept[Exception] {
+    val e = intercept[MalformedDataMapCommandException] {
       sql(
         """
           | CREATE DATAMAP agg0 ON TABLE mainTable
@@ -282,8 +282,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
           | GROUP BY column3,column5,column2
         """.stripMargin)
     }
-    assert(e.getMessage.contains(
-      s"Unknown data map type abc"))
+    assert(e.getMessage.contains("DataMap class 'abc' not found"))
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index 97aa056..49cabea 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -200,7 +200,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
           | GROUP BY dataTime
         """.stripMargin)
     }
-    assert(e.getMessage.equals("Unknown data map type abc"))
+    assert(e.getMessage.equals("DataMap class 'abc' not found"))
   }
 
   test("test timeseries create table: USING and catch MalformedCarbonCommandException") {
@@ -215,10 +215,11 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
           | GROUP BY dataTime
         """.stripMargin)
     }
-    assert(e.getMessage.equals("Unknown data map type abc"))
+    assert(e.getMessage.equals("DataMap class 'abc' not found"))
   }
 
   test("test timeseries create table: Only one granularity level can be defined 1") {
+    sql("drop datamap if exists agg0_second on table mainTable")
     val e: Exception = intercept[MalformedCarbonCommandException] {
       sql(
         s"""
@@ -235,6 +236,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
            | GROUP BY dataTime
        """.stripMargin)
     }
+    e.printStackTrace()
     assert(e.getMessage.equals("Only one granularity level can be defined"))
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
index c522c1e..e31896f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
@@ -113,7 +113,7 @@ object CompactionSupportGlobalSortBigFileTest {
     try {
       val write = new PrintWriter(fileName);
       for (i <- start until (start + line)) {
-        write.println(i + "," + "n" + i + "," + "c" + Random.nextInt(line) + "," + Random.nextInt(80))
+        write.println(i + "," + "n" + i + "," + "c" + (i % 10000) + "," + Random.nextInt(80))
       }
       write.close()
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 6f03493..47ef192 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -86,18 +86,18 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     buildTestData
   }
 
-test("test the boolean data type"){
-  booldf.write
-    .format("carbondata")
-    .option("tableName", "carbon10")
-    .option("tempCSV", "true")
-    .option("compress", "true")
-    .mode(SaveMode.Overwrite)
-    .save()
-  checkAnswer(
-    sql("SELECT * FROM CARBON10"),
-    Seq(Row("anubhav", true), Row("prince", false)))
-}
+  test("test the boolean data type"){
+    booldf.write
+      .format("carbondata")
+      .option("tableName", "carbon0")
+      .option("tempCSV", "true")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(
+      sql("SELECT * FROM CARBON0"),
+      Seq(Row("anubhav", true), Row("prince", false)))
+  }
 
   test("test load dataframe with saving compressed csv files") {
     // save dataframe to carbon file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 4b6f231..d4c49d2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.indexstore.Blocklet
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
@@ -48,30 +49,28 @@ import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlo
 
 class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
   var identifier: AbsoluteTableIdentifier = _
-  var dataMapName: String = _
+  var dataMapSchema: DataMapSchema = _
 
   /**
    * Initialization of Datamap factory with the identifier and datamap name
    */
-  override def init(identifier: AbsoluteTableIdentifier,
-      dataMapName: String): Unit = {
+  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
     this.identifier = identifier
-    this.dataMapName = dataMapName
+    this.dataMapSchema = dataMapSchema
   }
 
   /**
    * Return a new write for this datamap
    */
   override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
-    new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+    new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
   }
 
   /**
    * Get the datamap for segmentid
    */
-  override def getDataMaps(segmentId: String): java.util.List[AbstractCoarseGrainDataMap] = {
-    val file = FileFactory.getCarbonFile(
-      CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+  override def getDataMaps(segmentId: String) = {
+    val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
 
     val files = file.listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -108,9 +107,8 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
    *
    * @return
    */
-  override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
-    val file = FileFactory.getCarbonFile(
-      CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+  override def toDistributable(segmentId: String) = {
+    val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
 
     val files = file.listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -140,7 +138,8 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
    * Return metadata of this datamap
    */
   override def getMeta: DataMapMeta = {
-    new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava)
+    new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+      List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
   }
 }
 
@@ -198,12 +197,16 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
   }
 
   private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
-    if (expression.getChildren != null) {
-      expression.getChildren.asScala.map { f =>
-        if (f.isInstanceOf[EqualToExpression]) {
-          buffer += f
+    if (expression.isInstanceOf[EqualToExpression]) {
+      buffer += expression
+    } else {
+      if (expression.getChildren != null) {
+        expression.getChildren.asScala.map { f =>
+          if (f.isInstanceOf[EqualToExpression]) {
+            buffer += f
+          }
+          getEqualToExpression(f, buffer)
         }
-        getEqualToExpression(f, buffer)
       }
     }
   }
@@ -221,12 +224,12 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
 class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
     segmentId: String,
     dataWritePath: String,
-    dataMapName: String)
+    dataMapSchema: DataMapSchema)
   extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
 
   var currentBlockId: String = null
   val cgwritepath = dataWritePath + "/" +
-                    dataMapName + System.nanoTime() + ".datamap"
+                    dataMapSchema.getDataMapName + System.nanoTime() + ".datamap"
   lazy val stream: DataOutputStream = FileFactory
     .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
   val blockletList = new ArrayBuffer[Array[Byte]]()
@@ -345,14 +348,29 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
     val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
     // register datamap writer
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      table.getAbsoluteTableIdentifier,
-      classOf[CGDataMapFactory].getName, "cgdatamap")
+    sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
     checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
       sql("select * from normal_test where name='n502670'"))
   }
 
+  test("test cg datamap with 2 datamaps ") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
+    sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+      sql("select * from normal_test where name='n502670' and city='c2670'"))
+  }
+
   override protected def afterAll(): Unit = {
     CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
     sql("DROP TABLE IF EXISTS normal_test")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index f694a6b..903610a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Da
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.util.CarbonProperties
@@ -41,7 +42,7 @@ class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory {
   var identifier: AbsoluteTableIdentifier = _
 
   override def init(identifier: AbsoluteTableIdentifier,
-      dataMapName: String): Unit = {
+      dataMapSchema: DataMapSchema): Unit = {
     this.identifier = identifier
   }
 
@@ -89,12 +90,9 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test write datamap 2 pages") {
+    sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
     // register datamap writer
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      AbsoluteTableIdentifier.from(storeLocation + "/carbon1", "default", "carbon1"),
-      classOf[C2DataMapFactory].getName,
-      "test")
-
+    sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'")
     val df = buildTestData(33000)
 
     // save dataframe to carbon file
@@ -119,11 +117,8 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test write datamap 2 blocklet") {
-    // register datamap writer
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      AbsoluteTableIdentifier.from(storeLocation + "/carbon2", "default", "carbon2"),
-      classOf[C2DataMapFactory].getName,
-      "test")
+    sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+    sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2DataMapFactory].getName}'")
 
     CarbonProperties.getInstance()
       .addProperty("carbon.blockletgroup.size.in.mb", "1")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index d1bb65f..8031dc2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.indexstore.FineGrainBlocklet
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
@@ -48,22 +49,21 @@ import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlo
 
 class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
   var identifier: AbsoluteTableIdentifier = _
-  var dataMapName: String = _
+  var dataMapSchema: DataMapSchema = _
 
   /**
    * Initialization of Datamap factory with the identifier and datamap name
    */
-  override def init(identifier: AbsoluteTableIdentifier,
-      dataMapName: String): Unit = {
+  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
     this.identifier = identifier
-    this.dataMapName = dataMapName
+    this.dataMapSchema = dataMapSchema
   }
 
   /**
    * Return a new write for this datamap
    */
   override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
-    new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+    new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
   }
 
   /**
@@ -137,7 +137,8 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
    * Return metadata of this datamap
    */
   override def getMeta: DataMapMeta = {
-    new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava)
+    new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+      List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
   }
 }
 
@@ -228,12 +229,16 @@ class FGDataMap extends AbstractFineGrainDataMap {
   }
 
   def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
-    if (expression.getChildren != null) {
-      expression.getChildren.asScala.map { f =>
-        if (f.isInstanceOf[EqualToExpression]) {
-          buffer += f
+    if (expression.isInstanceOf[EqualToExpression]) {
+      buffer += expression
+    } else {
+      if (expression.getChildren != null) {
+        expression.getChildren.asScala.map { f =>
+          if (f.isInstanceOf[EqualToExpression]) {
+            buffer += f
+          }
+          getEqualToExpression(f, buffer)
         }
-        getEqualToExpression(f, buffer)
       }
     }
   }
@@ -249,11 +254,12 @@ class FGDataMap extends AbstractFineGrainDataMap {
 }
 
 class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
-    segmentId: String, dataWriterPath: String, dataMapName: String)
+    segmentId: String, dataWriterPath: String, dataMapSchema: DataMapSchema)
   extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) {
 
   var currentBlockId: String = null
-  val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap"
+  val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() +
+                    ".datamap"
   val stream: DataOutputStream = FileFactory
     .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
   val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
@@ -424,14 +430,44 @@ class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
     val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
     // register datamap writer
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      table.getAbsoluteTableIdentifier,
-      classOf[FGDataMapFactory].getName, "fgdatamap")
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap ON TABLE datamap_test
+         | USING '${classOf[FGDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='name')
+       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
     checkAnswer(sql("select * from datamap_test where name='n502670'"),
       sql("select * from normal_test where name='n502670'"))
   }
 
+  test("test fg datamap with 2 datamaps ") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test
+         | USING '${classOf[FGDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='name')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test
+         | USING '${classOf[FGDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='city')
+       """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+      sql("select * from normal_test where name='n502670' and city='c2670'"))
+  }
+
   override protected def afterAll(): Unit = {
     CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
     sql("DROP TABLE IF EXISTS normal_test")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 37007ed..b2ab977 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -23,7 +23,8 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.common.exceptions.MetadataProcessException
+import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
@@ -42,39 +43,22 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
 
   val newClass = "org.apache.spark.sql.CarbonSource"
 
-  test("test datamap create: don't support using class, only support short name") {
-    intercept[MalformedDataMapCommandException] {
+  test("test datamap create: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
-      val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
-      assert(table != null)
-      val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-      assert(dataMapSchemaList.size() == 1)
-      assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap1"))
-      assert(dataMapSchemaList.get(0).getClassName.equals(newClass))
     }
   }
 
-  test("test datamap create with dmproperties: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test datamap create with dmproperties: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
-      val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
-      assert(table != null)
-      val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-      assert(dataMapSchemaList.size() == 2)
-      assert(dataMapSchemaList.get(1).getDataMapName.equals("datamap2"))
-      assert(dataMapSchemaList.get(1).getClassName.equals(newClass))
-      assert(dataMapSchemaList.get(1).getProperties.get("key").equals("value"))
     }
   }
 
-  test("test datamap create with existing name: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test datamap create with existing name: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql(
         s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
-      val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
-      assert(table != null)
-      val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-      assert(dataMapSchemaList.size() == 2)
     }
   }
 
@@ -106,8 +90,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
       sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
       checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
 
-    }
-    finally {
+    } finally {
       sql("drop table hiveMetaStoreTable")
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
@@ -133,8 +116,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
       sql("drop table hiveMetaStoreTable_1")
 
       checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
-    }
-    finally {
+    } finally {
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
           CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
@@ -142,17 +124,17 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test datamap create with preagg with duplicate name") {
-    intercept[Exception] {
-      sql(
-        s"""
-           | CREATE DATAMAP datamap2 ON TABLE datamaptest
-           | USING 'preaggregate'
-           | DMPROPERTIES('key'='value')
-           | AS SELECT COUNT(a) FROM datamaptest
+    sql(
+      s"""
+         | CREATE DATAMAP datamap10 ON TABLE datamaptest
+         | USING 'preaggregate'
+         | DMPROPERTIES('key'='value')
+         | AS SELECT COUNT(a) FROM datamaptest
          """.stripMargin)
+    intercept[MalformedDataMapCommandException] {
       sql(
         s"""
-           | CREATE DATAMAP datamap2 ON TABLE datamaptest
+           | CREATE DATAMAP datamap10 ON TABLE datamaptest
            | USING 'preaggregate'
            | DMPROPERTIES('key'='value')
            | AS SELECT COUNT(a) FROM datamaptest
@@ -164,10 +146,9 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     assert(dataMapSchemaList.size() == 2)
   }
 
-  test("test datamap drop with preagg") {
-    intercept[Exception] {
-      sql("drop table datamap3")
-
+  test("test drop non-exist datamap") {
+    intercept[NoSuchDataMapException] {
+      sql("drop datamap nonexist on table datamaptest")
     }
     val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
     assert(table != null)
@@ -175,8 +156,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     assert(dataMapSchemaList.size() == 2)
   }
 
-  test("test show datamap without preaggregate: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test show datamap without preaggregate: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql("drop table if exists datamapshowtest")
       sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
       sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')")
@@ -185,8 +166,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("test show datamap with preaggregate: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test show datamap with preaggregate: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql("drop table if exists datamapshowtest")
       sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
       sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
@@ -203,8 +184,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     assert(sql("show datamap on table datamapshowtest").collect().length == 0)
   }
 
-  test("test show datamap after dropping datamap: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test show datamap after dropping datamap: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql("drop table if exists datamapshowtest")
       sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
       sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
index c200b1b..84b59e6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -46,10 +47,7 @@ class InsertOverwriteConcurrentTest extends QueryTest with BeforeAndAfterAll wit
     buildTestData()
 
     // register hook to the table to sleep, thus the other command will be executed
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      AbsoluteTableIdentifier.from(storeLocation + "/orders", "default", "orders"),
-      classOf[WaitingDataMap].getName,
-      "test")
+    sql(s"create datamap test on table orders using '${classOf[WaitingDataMap].getName}' as select count(a) from hiveMetaStoreTable_1")
   }
 
   private def buildTestData(): Unit = {
@@ -166,7 +164,7 @@ object Global {
 
 class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
 
-  override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { }
+  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { }
 
   override def fireEvent(event: Event): Unit = ???
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 60052f0..8d2f9ee 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -37,8 +37,8 @@ import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
 class SparkDataMapJob extends DataMapJob {
 
   override def execute(dataMapFormat: DistributableDataMapFormat,
-      resolverIntf: FilterResolverIntf): util.List[ExtendedBlocklet] = {
-    new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, resolverIntf).collect().toList
+      filter: FilterResolverIntf): util.List[ExtendedBlocklet] = {
+    new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, filter).collect().toList
       .asJava
   }
 }
@@ -53,7 +53,6 @@ class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) exte
  * RDD to prune the datamaps across spark cluster
  * @param sc
  * @param dataMapFormat
- * @param resolverIntf
  */
 class DataMapPruneRDD(sc: SparkContext,
     dataMapFormat: DistributableDataMapFormat,
@@ -70,7 +69,6 @@ class DataMapPruneRDD(sc: SparkContext,
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
     val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit
-    DistributableDataMapFormat.setFilterExp(attemptContext.getConfiguration, resolverIntf)
     val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext)
     reader.initialize(inputSplit, attemptContext)
     val iter = new Iterator[ExtendedBlocklet] {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 5dcca6d..ec26c34 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -95,8 +95,12 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         CarbonException.analysisException(s"table path already exists.")
       case (SaveMode.Overwrite, true) =>
         val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
-        sqlContext.sparkSession
-          .sql(s"DROP TABLE IF EXISTS $dbName.${options.tableName}")
+        // In order to overwrite, delete all segments in the table
+        sqlContext.sparkSession.sql(
+          s"""
+             | DELETE FROM TABLE $dbName.${options.tableName}
+             | WHERE SEGMENT.STARTTIME BEFORE '2099-06-01 01:00:00'
+           """.stripMargin)
         (true, false)
       case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
         (true, false)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index d536746..1de66c1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -32,7 +32,9 @@ import org.apache.carbondata.core.scan.filter.intf.{ExpressionType, RowIntf}
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 
-class SparkUnknownExpression(var sparkExp: SparkExpression)
+class SparkUnknownExpression(
+    var sparkExp: SparkExpression,
+    expressionType: ExpressionType = ExpressionType.UNKNOWN)
   extends UnknownExpression with ConditionalExpression {
 
   private var evaluateExpression: (InternalRow) => Any = sparkExp.eval
@@ -64,7 +66,7 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
   }
 
   override def getFilterExpressionType: ExpressionType = {
-    ExpressionType.UNKNOWN
+    expressionType
   }
 
   override def getString: String = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index e93ab25..3d3f83b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -16,15 +16,21 @@
  */
 package org.apache.spark.sql.execution.command.datamap
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand
+import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.hive.CarbonRelation
 
+import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 
 /**
  * Below command class will be used to create datamap on table
@@ -48,7 +54,7 @@ case class CarbonCreateDataMapCommand(
     if (carbonTable.isStreamingTable) {
       throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
     }
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    validateDataMapName(carbonTable)
 
     if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
       dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
@@ -72,20 +78,44 @@ case class CarbonCreateDataMapCommand(
           queryString.get
         )
       }
-      createPreAggregateTableCommands.processMetadata(sparkSession)
+      try {
+        createPreAggregateTableCommands.processMetadata(sparkSession)
+      } catch {
+        case e: Throwable => throw new MetadataProcessException(s"Failed to create datamap " +
+                                                                s"'$dataMapName'", e)
+      }
     } else {
-      throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName)
+      val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
+      dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava))
+      val dbName = CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession)
+      val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
+        Some(dbName),
+        tableIdentifier.table)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
+      DataMapStoreManager.getInstance().createAndRegisterDataMap(
+        carbonTable.getAbsoluteTableIdentifier, dataMapSchema)
+      // Save DataMapSchema in the schema file of main table
+      PreAggregateUtil.updateMainTable(carbonTable, dataMapSchema, sparkSession)
     }
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${tableIdentifier.table}")
     Seq.empty
   }
 
+  private def validateDataMapName(carbonTable: CarbonTable) = {
+    val existingDataMaps = carbonTable.getTableInfo.getDataMapSchemaList
+    existingDataMaps.asScala.foreach { dataMapSchema =>
+      if (dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName)) {
+        throw new MalformedDataMapCommandException(s"DataMap name '$dataMapName' already exist")
+      }
+    }
+  }
+
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
       dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
       createPreAggregateTableCommands.processData(sparkSession)
     } else {
-      throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName)
+      Seq.empty
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index e5db286..f410f52 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
+import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException}
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -72,7 +73,7 @@ case class CarbonDropDataMapCommand(
         Some(CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession))
       } catch {
         case ex: NoSuchTableException =>
-          throw ex
+          throw new MetadataProcessException(s"Dropping datamap $dataMapName failed", ex)
       }
       if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
         val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index bf72325..6d4822b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -64,8 +65,10 @@ case class CreatePreAggregateTableCommand(
     dmProperties.foreach(t => tableProperties.put(t._1, t._2))
 
     parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
-    assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table),
-      "Parent table name is different in select and create")
+    if (!parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table)) {
+      throw new MalformedDataMapCommandException(
+        "Parent table name is different in select and create")
+    }
     var neworder = Seq[String]()
     val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
     parentOrder.foreach(parentcol =>
@@ -131,8 +134,7 @@ case class CreatePreAggregateTableCommand(
 
     // updating the parent table about child table
     PreAggregateUtil.updateMainTable(
-      CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession),
-      parentTableIdentifier.table,
+      parentTable,
       childSchema,
       sparkSession)
     // After updating the parent carbon table with data map entry extract the latest table object
@@ -147,8 +149,7 @@ case class CreatePreAggregateTableCommand(
       PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
         parentTable.getTableName,
         parentTable.getDatabaseName)
-    }
-    else {
+    } else {
       queryString
     }
     val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 7e3b80e..1073f63 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types.DataType
 
+import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -404,22 +405,20 @@ object PreAggregateUtil {
    * Below method will be used to update the main table about the pre aggregate table information
    * in case of any exception it will throw error so pre aggregate table creation will fail
    *
-   * @param dbName
-   * @param tableName
    * @param childSchema
    * @param sparkSession
    */
-  def updateMainTable(dbName: String, tableName: String,
+  def updateMainTable(carbonTable: CarbonTable,
       childSchema: DataMapSchema, sparkSession: SparkSession): Unit = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
       LockUsage.DROP_TABLE_LOCK)
     var locks = List.empty[ICarbonLock]
-    var carbonTable: CarbonTable = null
     var numberOfCurrentChild: Int = 0
+    val dbName = carbonTable.getDatabaseName
+    val tableName = carbonTable.getTableName
     try {
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
       // get the latest carbon table and check for column existence
       // read the latest schema file
@@ -433,7 +432,7 @@ object PreAggregateUtil {
       numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size
       if (wrapperTableInfo.getDataMapSchemaList.asScala.
         exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) {
-        throw new Exception("Duplicate datamap")
+        throw new MetadataProcessException("DataMap name already exist")
       }
       wrapperTableInfo.getDataMapSchemaList.add(childSchema)
       val thriftTable = schemaConverter

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index f38304e..79ff15e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
-import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.MetadataCommand
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 46e24dd..4b2bfa9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -610,7 +610,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         CastExpressionOptimization.checkIfCastCanBeRemove(c)
       case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
         CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case StartsWith(a: Attribute, Literal(v, t)) =>
+      case s@StartsWith(a: Attribute, Literal(v, t)) =>
         Some(sources.StringStartsWith(a.name, v.toString))
       case c@EndsWith(a: Attribute, Literal(v, t)) =>
         Some(CarbonEndsWith(c))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 4d91375..c062cfb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -36,8 +36,10 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
 import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
@@ -47,6 +49,7 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
  */
 object CarbonFilters {
 
+  val carbonProperties = CarbonProperties.getInstance()
   /**
    * Converts data sources filters to carbon filter predicates.
    */
@@ -114,25 +117,20 @@ object CarbonFilters {
             new OrExpression(lhsFilter, rhsFilter)
           }
         case sources.StringStartsWith(name, value) if value.length > 0 =>
-          val l = new GreaterThanEqualToExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value))
-          val maxValueLimit = value.substring(0, value.length - 1) +
-                              (value.charAt(value.length - 1).toInt + 1).toChar
-          val r = new LessThanExpression(
-            getCarbonExpression(name), getCarbonLiteralExpression(name, maxValueLimit))
-          Some(new AndExpression(l, r))
+          Some(new StartsWithExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
         case CarbonEndsWith(expr: Expression) =>
           Some(new SparkUnknownExpression(expr.transform {
             case AttributeReference(name, dataType, _, _) =>
               CarbonBoundReference(new CarbonColumnExpression(name.toString,
                 CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable)
-          }))
+          }, ExpressionType.ENDSWITH))
         case CarbonContainsWith(expr: Expression) =>
           Some(new SparkUnknownExpression(expr.transform {
             case AttributeReference(name, dataType, _, _) =>
               CarbonBoundReference(new CarbonColumnExpression(name.toString,
                 CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable)
-          }))
+          }, ExpressionType.CONTAINSWITH))
         case CastExpr(expr: Expression) =>
           Some(transformExpression(expr))
         case _ => None
@@ -261,7 +259,7 @@ object CarbonFilters {
           CastExpressionOptimization.checkIfCastCanBeRemove(c)
         case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
           CastExpressionOptimization.checkIfCastCanBeRemove(c)
-        case StartsWith(a: Attribute, Literal(v, t)) =>
+        case s@StartsWith(a: Attribute, Literal(v, t)) =>
           Some(sources.StringStartsWith(a.name, v.toString))
         case c@EndsWith(a: Attribute, Literal(v, t)) =>
           Some(CarbonEndsWith(c))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 31a6701..e817590 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.processing.store.TablePage;
 
 /**
@@ -49,9 +49,9 @@ public class DataMapWriterListener {
   /**
    * register all datamap writer for specified table and segment
    */
-  public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId,
+  public void registerAllWriter(CarbonTable carbonTable, String segmentId,
       String dataWritePath) {
-    List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(identifier);
+    List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
     if (tableDataMaps != null) {
       for (TableDataMap tableDataMap : tableDataMaps) {
         DataMapFactory factory = tableDataMap.getDataMapFactory();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index b8e9062..4410e2a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -233,7 +233,7 @@ public final class DataLoadProcessBuilder {
     if (carbonTable.isHivePartitionTable()) {
       configuration.setWritingCoresCount((short) 1);
     }
-    TableSpec tableSpec = new TableSpec(dimensions, measures);
+    TableSpec tableSpec = new TableSpec(carbonTable);
     configuration.setTableSpec(tableSpec);
     return configuration;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 0e9cbc5..dd59ed3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -260,8 +260,8 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
 
     DataMapWriterListener listener = new DataMapWriterListener();
-    listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId(),
-        storeLocation[new Random().nextInt(storeLocation.length)]);
+    listener.registerAllWriter(configuration.getTableSpec().getCarbonTable(),
+        configuration.getSegmentId(), storeLocation[new Random().nextInt(storeLocation.length)]);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
 
@@ -321,13 +321,11 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
     carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
 
-    carbonFactDataHandlerModel.tableSpec = new TableSpec(
-        segmentProperties.getDimensions(),
-        segmentProperties.getMeasures());
-
+    carbonFactDataHandlerModel.tableSpec =
+        new TableSpec(loadModel.getCarbonDataLoadSchema().getCarbonTable());
     DataMapWriterListener listener = new DataMapWriterListener();
     listener.registerAllWriter(
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(),
+        loadModel.getCarbonDataLoadSchema().getCarbonTable(),
         loadModel.getSegmentId(),
         tempStoreLocation[new Random().nextInt(tempStoreLocation.length)]);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;