Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/22 05:56:00 UTC

[1/2] carbondata git commit: [CARBONDATA-1543] Support DataMap chooser and expression for using multiple datamaps in a single query

Repository: carbondata
Updated Branches:
  refs/heads/datamap 88c0527f5 -> e616162c0
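
The new tests in this patch show the intended usage: datamaps are now created through SQL with CREATE DATAMAP ... ON TABLE ... USING '<factory class>' DMPROPERTIES(...), several datamaps can be registered on the same table, and the filter expression of a query is matched against each datamap's DataMapMeta so the chooser can pick the applicable ones. A minimal sketch, adapted from the CGDataMapTestCase changes further down (CGDataMapFactory and the 'indexcolumns' property are defined in that test code):

    sql("CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) " +
      "STORED BY 'org.apache.carbondata.format'")
    // one datamap per indexed column; the factory's getMeta reads 'indexcolumns'
    sql(s"CREATE DATAMAP ggdatamap1 ON TABLE datamap_test " +
      s"USING '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
    sql(s"CREATE DATAMAP ggdatamap2 ON TABLE datamap_test " +
      s"USING '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')")
    // a query filtering on both indexed columns can now be pruned by both datamaps
    sql("SELECT * FROM datamap_test WHERE name='n502670' AND city='c2670'")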


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index 5eb274d..86fd240 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -18,18 +18,6 @@
 
 package org.apache.carbondata.cluster.sdv.generated
 
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.common.util._
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
-
 /**
  * Test Class for MergeIndexTestCase to verify all scenarios
  */
@@ -52,7 +40,7 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
 
     sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
-    assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2)
+    assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") <= 2)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("DROP TABLE IF EXISTS carbon_automation_merge")
@@ -75,8 +63,8 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
     sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
     val rows = sql("""Select count(*) from carbon_automation_nonmerge""").collect()
-    assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2)
-    assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2)
+    assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") <= 2)
+    assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") <= 2)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'SEGMENT_INDEX'").collect()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index b3bf93d..0d33797 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
@@ -272,7 +272,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
   test("test pre agg create table 22: using invalid datamap provider") {
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
 
-    val e: Exception = intercept[Exception] {
+    val e = intercept[MalformedDataMapCommandException] {
       sql(
         """
           | CREATE DATAMAP agg0 ON TABLE mainTable
@@ -282,8 +282,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
           | GROUP BY column3,column5,column2
         """.stripMargin)
     }
-    assert(e.getMessage.contains(
-      s"Unknown data map type abc"))
+    assert(e.getMessage.contains("DataMap class 'abc' not found"))
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index 97aa056..49cabea 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -200,7 +200,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
           | GROUP BY dataTime
         """.stripMargin)
     }
-    assert(e.getMessage.equals("Unknown data map type abc"))
+    assert(e.getMessage.equals("DataMap class 'abc' not found"))
   }
 
   test("test timeseries create table: USING and catch MalformedCarbonCommandException") {
@@ -215,10 +215,11 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
           | GROUP BY dataTime
         """.stripMargin)
     }
-    assert(e.getMessage.equals("Unknown data map type abc"))
+    assert(e.getMessage.equals("DataMap class 'abc' not found"))
   }
 
   test("test timeseries create table: Only one granularity level can be defined 1") {
+    sql("drop datamap if exists agg0_second on table mainTable")
     val e: Exception = intercept[MalformedCarbonCommandException] {
       sql(
         s"""
@@ -235,6 +236,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
            | GROUP BY dataTime
        """.stripMargin)
     }
+    e.printStackTrace()
     assert(e.getMessage.equals("Only one granularity level can be defined"))
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
index c522c1e..e31896f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
@@ -113,7 +113,7 @@ object CompactionSupportGlobalSortBigFileTest {
     try {
       val write = new PrintWriter(fileName);
       for (i <- start until (start + line)) {
-        write.println(i + "," + "n" + i + "," + "c" + Random.nextInt(line) + "," + Random.nextInt(80))
+        write.println(i + "," + "n" + i + "," + "c" + (i % 10000) + "," + Random.nextInt(80))
       }
       write.close()
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 6f03493..47ef192 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -86,18 +86,18 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     buildTestData
   }
 
-test("test the boolean data type"){
-  booldf.write
-    .format("carbondata")
-    .option("tableName", "carbon10")
-    .option("tempCSV", "true")
-    .option("compress", "true")
-    .mode(SaveMode.Overwrite)
-    .save()
-  checkAnswer(
-    sql("SELECT * FROM CARBON10"),
-    Seq(Row("anubhav", true), Row("prince", false)))
-}
+  test("test the boolean data type"){
+    booldf.write
+      .format("carbondata")
+      .option("tableName", "carbon0")
+      .option("tempCSV", "true")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(
+      sql("SELECT * FROM CARBON0"),
+      Seq(Row("anubhav", true), Row("prince", false)))
+  }
 
   test("test load dataframe with saving compressed csv files") {
     // save dataframe to carbon file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 4b6f231..d4c49d2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.indexstore.Blocklet
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
@@ -48,30 +49,28 @@ import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlo
 
 class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
   var identifier: AbsoluteTableIdentifier = _
-  var dataMapName: String = _
+  var dataMapSchema: DataMapSchema = _
 
   /**
    * Initialization of Datamap factory with the identifier and datamap name
    */
-  override def init(identifier: AbsoluteTableIdentifier,
-      dataMapName: String): Unit = {
+  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
     this.identifier = identifier
-    this.dataMapName = dataMapName
+    this.dataMapSchema = dataMapSchema
   }
 
   /**
   * Return a new writer for this datamap
    */
   override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
-    new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+    new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
   }
 
   /**
    * Get the datamap for segmentid
    */
-  override def getDataMaps(segmentId: String): java.util.List[AbstractCoarseGrainDataMap] = {
-    val file = FileFactory.getCarbonFile(
-      CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+  override def getDataMaps(segmentId: String) = {
+    val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
 
     val files = file.listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -108,9 +107,8 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
    *
    * @return
    */
-  override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
-    val file = FileFactory.getCarbonFile(
-      CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+  override def toDistributable(segmentId: String) = {
+    val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
 
     val files = file.listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -140,7 +138,8 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
    * Return metadata of this datamap
    */
   override def getMeta: DataMapMeta = {
-    new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava)
+    new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+      List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
   }
 }
 
@@ -198,12 +197,16 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
   }
 
   private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
-    if (expression.getChildren != null) {
-      expression.getChildren.asScala.map { f =>
-        if (f.isInstanceOf[EqualToExpression]) {
-          buffer += f
+    if (expression.isInstanceOf[EqualToExpression]) {
+      buffer += expression
+    } else {
+      if (expression.getChildren != null) {
+        expression.getChildren.asScala.map { f =>
+          if (f.isInstanceOf[EqualToExpression]) {
+            buffer += f
+          }
+          getEqualToExpression(f, buffer)
         }
-        getEqualToExpression(f, buffer)
       }
     }
   }
@@ -221,12 +224,12 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
 class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
     segmentId: String,
     dataWritePath: String,
-    dataMapName: String)
+    dataMapSchema: DataMapSchema)
   extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
 
   var currentBlockId: String = null
   val cgwritepath = dataWritePath + "/" +
-                    dataMapName + System.nanoTime() + ".datamap"
+                    dataMapSchema.getDataMapName + System.nanoTime() + ".datamap"
   lazy val stream: DataOutputStream = FileFactory
     .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
   val blockletList = new ArrayBuffer[Array[Byte]]()
@@ -345,14 +348,29 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
     val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
     // register datamap writer
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      table.getAbsoluteTableIdentifier,
-      classOf[CGDataMapFactory].getName, "cgdatamap")
+    sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
     checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
       sql("select * from normal_test where name='n502670'"))
   }
 
+  test("test cg datamap with 2 datamaps ") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
+    sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+      sql("select * from normal_test where name='n502670' and city='c2670'"))
+  }
+
   override protected def afterAll(): Unit = {
     CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
     sql("DROP TABLE IF EXISTS normal_test")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index f694a6b..903610a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Da
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.util.CarbonProperties
@@ -41,7 +42,7 @@ class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory {
   var identifier: AbsoluteTableIdentifier = _
 
   override def init(identifier: AbsoluteTableIdentifier,
-      dataMapName: String): Unit = {
+      dataMapSchema: DataMapSchema): Unit = {
     this.identifier = identifier
   }
 
@@ -89,12 +90,9 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test write datamap 2 pages") {
+    sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
     // register datamap writer
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      AbsoluteTableIdentifier.from(storeLocation + "/carbon1", "default", "carbon1"),
-      classOf[C2DataMapFactory].getName,
-      "test")
-
+    sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'")
     val df = buildTestData(33000)
 
     // save dataframe to carbon file
@@ -119,11 +117,8 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test write datamap 2 blocklet") {
-    // register datamap writer
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      AbsoluteTableIdentifier.from(storeLocation + "/carbon2", "default", "carbon2"),
-      classOf[C2DataMapFactory].getName,
-      "test")
+    sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+    sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2DataMapFactory].getName}'")
 
     CarbonProperties.getInstance()
       .addProperty("carbon.blockletgroup.size.in.mb", "1")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index d1bb65f..8031dc2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.indexstore.FineGrainBlocklet
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
@@ -48,22 +49,21 @@ import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlo
 
 class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
   var identifier: AbsoluteTableIdentifier = _
-  var dataMapName: String = _
+  var dataMapSchema: DataMapSchema = _
 
   /**
    * Initialization of Datamap factory with the identifier and datamap name
    */
-  override def init(identifier: AbsoluteTableIdentifier,
-      dataMapName: String): Unit = {
+  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
     this.identifier = identifier
-    this.dataMapName = dataMapName
+    this.dataMapSchema = dataMapSchema
   }
 
   /**
    * Return a new write for this datamap
    */
   override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
-    new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+    new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
   }
 
   /**
@@ -137,7 +137,8 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
    * Return metadata of this datamap
    */
   override def getMeta: DataMapMeta = {
-    new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava)
+    new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+      List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
   }
 }
 
@@ -228,12 +229,16 @@ class FGDataMap extends AbstractFineGrainDataMap {
   }
 
   def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
-    if (expression.getChildren != null) {
-      expression.getChildren.asScala.map { f =>
-        if (f.isInstanceOf[EqualToExpression]) {
-          buffer += f
+    if (expression.isInstanceOf[EqualToExpression]) {
+      buffer += expression
+    } else {
+      if (expression.getChildren != null) {
+        expression.getChildren.asScala.map { f =>
+          if (f.isInstanceOf[EqualToExpression]) {
+            buffer += f
+          }
+          getEqualToExpression(f, buffer)
         }
-        getEqualToExpression(f, buffer)
       }
     }
   }
@@ -249,11 +254,12 @@ class FGDataMap extends AbstractFineGrainDataMap {
 }
 
 class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
-    segmentId: String, dataWriterPath: String, dataMapName: String)
+    segmentId: String, dataWriterPath: String, dataMapSchema: DataMapSchema)
   extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) {
 
   var currentBlockId: String = null
-  val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap"
+  val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() +
+                    ".datamap"
   val stream: DataOutputStream = FileFactory
     .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
   val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
@@ -424,14 +430,44 @@ class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
     val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
     // register datamap writer
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      table.getAbsoluteTableIdentifier,
-      classOf[FGDataMapFactory].getName, "fgdatamap")
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap ON TABLE datamap_test
+         | USING '${classOf[FGDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='name')
+       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
     checkAnswer(sql("select * from datamap_test where name='n502670'"),
       sql("select * from normal_test where name='n502670'"))
   }
 
+  test("test fg datamap with 2 datamaps ") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test
+         | USING '${classOf[FGDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='name')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test
+         | USING '${classOf[FGDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='city')
+       """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+      sql("select * from normal_test where name='n502670' and city='c2670'"))
+  }
+
   override protected def afterAll(): Unit = {
     CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
     sql("DROP TABLE IF EXISTS normal_test")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 37007ed..b2ab977 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -23,7 +23,8 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.common.exceptions.MetadataProcessException
+import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
@@ -42,39 +43,22 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
 
   val newClass = "org.apache.spark.sql.CarbonSource"
 
-  test("test datamap create: don't support using class, only support short name") {
-    intercept[MalformedDataMapCommandException] {
+  test("test datamap create: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
-      val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
-      assert(table != null)
-      val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-      assert(dataMapSchemaList.size() == 1)
-      assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap1"))
-      assert(dataMapSchemaList.get(0).getClassName.equals(newClass))
     }
   }
 
-  test("test datamap create with dmproperties: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test datamap create with dmproperties: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
-      val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
-      assert(table != null)
-      val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-      assert(dataMapSchemaList.size() == 2)
-      assert(dataMapSchemaList.get(1).getDataMapName.equals("datamap2"))
-      assert(dataMapSchemaList.get(1).getClassName.equals(newClass))
-      assert(dataMapSchemaList.get(1).getProperties.get("key").equals("value"))
     }
   }
 
-  test("test datamap create with existing name: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test datamap create with existing name: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql(
         s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
-      val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
-      assert(table != null)
-      val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-      assert(dataMapSchemaList.size() == 2)
     }
   }
 
@@ -106,8 +90,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
       sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
       checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
 
-    }
-    finally {
+    } finally {
       sql("drop table hiveMetaStoreTable")
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
@@ -133,8 +116,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
       sql("drop table hiveMetaStoreTable_1")
 
       checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
-    }
-    finally {
+    } finally {
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
           CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
@@ -142,17 +124,17 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test datamap create with preagg with duplicate name") {
-    intercept[Exception] {
-      sql(
-        s"""
-           | CREATE DATAMAP datamap2 ON TABLE datamaptest
-           | USING 'preaggregate'
-           | DMPROPERTIES('key'='value')
-           | AS SELECT COUNT(a) FROM datamaptest
+    sql(
+      s"""
+         | CREATE DATAMAP datamap10 ON TABLE datamaptest
+         | USING 'preaggregate'
+         | DMPROPERTIES('key'='value')
+         | AS SELECT COUNT(a) FROM datamaptest
          """.stripMargin)
+    intercept[MalformedDataMapCommandException] {
       sql(
         s"""
-           | CREATE DATAMAP datamap2 ON TABLE datamaptest
+           | CREATE DATAMAP datamap10 ON TABLE datamaptest
            | USING 'preaggregate'
            | DMPROPERTIES('key'='value')
            | AS SELECT COUNT(a) FROM datamaptest
@@ -164,10 +146,9 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     assert(dataMapSchemaList.size() == 2)
   }
 
-  test("test datamap drop with preagg") {
-    intercept[Exception] {
-      sql("drop table datamap3")
-
+  test("test drop non-exist datamap") {
+    intercept[NoSuchDataMapException] {
+      sql("drop datamap nonexist on table datamaptest")
     }
     val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
     assert(table != null)
@@ -175,8 +156,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     assert(dataMapSchemaList.size() == 2)
   }
 
-  test("test show datamap without preaggregate: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test show datamap without preaggregate: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql("drop table if exists datamapshowtest")
       sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
       sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')")
@@ -185,8 +166,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("test show datamap with preaggregate: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test show datamap with preaggregate: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql("drop table if exists datamapshowtest")
       sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
       sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
@@ -203,8 +184,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     assert(sql("show datamap on table datamapshowtest").collect().length == 0)
   }
 
-  test("test show datamap after dropping datamap: don't support using class") {
-    intercept[MalformedDataMapCommandException] {
+  test("test show datamap after dropping datamap: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
       sql("drop table if exists datamapshowtest")
       sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
       sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
index c200b1b..84b59e6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -46,10 +47,7 @@ class InsertOverwriteConcurrentTest extends QueryTest with BeforeAndAfterAll wit
     buildTestData()
 
     // register hook to the table to sleep, thus the other command will be executed
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      AbsoluteTableIdentifier.from(storeLocation + "/orders", "default", "orders"),
-      classOf[WaitingDataMap].getName,
-      "test")
+    sql(s"create datamap test on table orders using '${classOf[WaitingDataMap].getName}' as select count(a) from hiveMetaStoreTable_1")
   }
 
   private def buildTestData(): Unit = {
@@ -166,7 +164,7 @@ object Global {
 
 class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
 
-  override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { }
+  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { }
 
   override def fireEvent(event: Event): Unit = ???
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 60052f0..8d2f9ee 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -37,8 +37,8 @@ import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
 class SparkDataMapJob extends DataMapJob {
 
   override def execute(dataMapFormat: DistributableDataMapFormat,
-      resolverIntf: FilterResolverIntf): util.List[ExtendedBlocklet] = {
-    new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, resolverIntf).collect().toList
+      filter: FilterResolverIntf): util.List[ExtendedBlocklet] = {
+    new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, filter).collect().toList
       .asJava
   }
 }
@@ -53,7 +53,6 @@ class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) exte
  * RDD to prune the datamaps across spark cluster
  * @param sc
  * @param dataMapFormat
- * @param resolverIntf
  */
 class DataMapPruneRDD(sc: SparkContext,
     dataMapFormat: DistributableDataMapFormat,
@@ -70,7 +69,6 @@ class DataMapPruneRDD(sc: SparkContext,
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
     val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit
-    DistributableDataMapFormat.setFilterExp(attemptContext.getConfiguration, resolverIntf)
     val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext)
     reader.initialize(inputSplit, attemptContext)
     val iter = new Iterator[ExtendedBlocklet] {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 5dcca6d..ec26c34 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -95,8 +95,12 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         CarbonException.analysisException(s"table path already exists.")
       case (SaveMode.Overwrite, true) =>
         val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
-        sqlContext.sparkSession
-          .sql(s"DROP TABLE IF EXISTS $dbName.${options.tableName}")
+        // In order to overwrite, delete all segments in the table
+        sqlContext.sparkSession.sql(
+          s"""
+             | DELETE FROM TABLE $dbName.${options.tableName}
+             | WHERE SEGMENT.STARTTIME BEFORE '2099-06-01 01:00:00'
+           """.stripMargin)
         (true, false)
       case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
         (true, false)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index d536746..1de66c1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -32,7 +32,9 @@ import org.apache.carbondata.core.scan.filter.intf.{ExpressionType, RowIntf}
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 
-class SparkUnknownExpression(var sparkExp: SparkExpression)
+class SparkUnknownExpression(
+    var sparkExp: SparkExpression,
+    expressionType: ExpressionType = ExpressionType.UNKNOWN)
   extends UnknownExpression with ConditionalExpression {
 
   private var evaluateExpression: (InternalRow) => Any = sparkExp.eval
@@ -64,7 +66,7 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
   }
 
   override def getFilterExpressionType: ExpressionType = {
-    ExpressionType.UNKNOWN
+    expressionType
   }
 
   override def getString: String = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index e93ab25..3d3f83b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -16,15 +16,21 @@
  */
 package org.apache.spark.sql.execution.command.datamap
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand
+import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.hive.CarbonRelation
 
+import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 
 /**
  * Below command class will be used to create datamap on table
@@ -48,7 +54,7 @@ case class CarbonCreateDataMapCommand(
     if (carbonTable.isStreamingTable) {
       throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
     }
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    validateDataMapName(carbonTable)
 
     if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
       dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
@@ -72,20 +78,44 @@ case class CarbonCreateDataMapCommand(
           queryString.get
         )
       }
-      createPreAggregateTableCommands.processMetadata(sparkSession)
+      try {
+        createPreAggregateTableCommands.processMetadata(sparkSession)
+      } catch {
+        case e: Throwable => throw new MetadataProcessException(s"Failed to create datamap " +
+                                                                s"'$dataMapName'", e)
+      }
     } else {
-      throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName)
+      val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
+      dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava))
+      val dbName = CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession)
+      val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
+        Some(dbName),
+        tableIdentifier.table)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
+      DataMapStoreManager.getInstance().createAndRegisterDataMap(
+        carbonTable.getAbsoluteTableIdentifier, dataMapSchema)
+      // Save DataMapSchema in the  schema file of main table
+      PreAggregateUtil.updateMainTable(carbonTable, dataMapSchema, sparkSession)
     }
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${tableIdentifier.table}")
     Seq.empty
   }
 
+  private def validateDataMapName(carbonTable: CarbonTable) = {
+    val existingDataMaps = carbonTable.getTableInfo.getDataMapSchemaList
+    existingDataMaps.asScala.foreach { dataMapSchema =>
+      if (dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName)) {
+        throw new MalformedDataMapCommandException(s"DataMap name '$dataMapName' already exist")
+      }
+    }
+  }
+
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
       dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
       createPreAggregateTableCommands.processData(sparkSession)
     } else {
-      throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName)
+      Seq.empty
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index e5db286..f410f52 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
+import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException}
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -72,7 +73,7 @@ case class CarbonDropDataMapCommand(
         Some(CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession))
       } catch {
         case ex: NoSuchTableException =>
-          throw ex
+          throw new MetadataProcessException(s"Dropping datamap $dataMapName failed", ex)
       }
       if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
         val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index bf72325..6d4822b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -64,8 +65,10 @@ case class CreatePreAggregateTableCommand(
     dmProperties.foreach(t => tableProperties.put(t._1, t._2))
 
     parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
-    assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table),
-      "Parent table name is different in select and create")
+    if (!parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table)) {
+      throw new MalformedDataMapCommandException(
+        "Parent table name is different in select and create")
+    }
     var neworder = Seq[String]()
     val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
     parentOrder.foreach(parentcol =>
@@ -131,8 +134,7 @@ case class CreatePreAggregateTableCommand(
 
     // updating the parent table about child table
     PreAggregateUtil.updateMainTable(
-      CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession),
-      parentTableIdentifier.table,
+      parentTable,
       childSchema,
       sparkSession)
     // After updating the parent carbon table with data map entry extract the latest table object
@@ -147,8 +149,7 @@ case class CreatePreAggregateTableCommand(
       PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
         parentTable.getTableName,
         parentTable.getDatabaseName)
-    }
-    else {
+    } else {
       queryString
     }
     val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 7e3b80e..1073f63 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types.DataType
 
+import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -404,22 +405,20 @@ object PreAggregateUtil {
    * Below method will be used to update the main table about the pre aggregate table information
    * in case of any exception it will throw error so pre aggregate table creation will fail
    *
-   * @param dbName
-   * @param tableName
    * @param childSchema
    * @param sparkSession
    */
-  def updateMainTable(dbName: String, tableName: String,
+  def updateMainTable(carbonTable: CarbonTable,
       childSchema: DataMapSchema, sparkSession: SparkSession): Unit = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
       LockUsage.DROP_TABLE_LOCK)
     var locks = List.empty[ICarbonLock]
-    var carbonTable: CarbonTable = null
     var numberOfCurrentChild: Int = 0
+    val dbName = carbonTable.getDatabaseName
+    val tableName = carbonTable.getTableName
     try {
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
       // get the latest carbon table and check for column existence
       // read the latest schema file
@@ -433,7 +432,7 @@ object PreAggregateUtil {
       numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size
       if (wrapperTableInfo.getDataMapSchemaList.asScala.
         exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) {
-        throw new Exception("Duplicate datamap")
+        throw new MetadataProcessException("DataMap name already exists")
       }
       wrapperTableInfo.getDataMapSchemaList.add(childSchema)
       val thriftTable = schemaConverter

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index f38304e..79ff15e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
-import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.MetadataCommand
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 4b1d11b..f00917e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -610,7 +610,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         CastExpressionOptimization.checkIfCastCanBeRemove(c)
       case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
         CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case StartsWith(a: Attribute, Literal(v, t)) =>
+      case s@StartsWith(a: Attribute, Literal(v, t)) =>
         Some(sources.StringStartsWith(a.name, v.toString))
       case c@EndsWith(a: Attribute, Literal(v, t)) =>
         Some(CarbonEndsWith(c))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 4d91375..c062cfb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -36,8 +36,10 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
 import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
@@ -47,6 +49,7 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
  */
 object CarbonFilters {
 
+  val carbonProperties = CarbonProperties.getInstance()
   /**
    * Converts data sources filters to carbon filter predicates.
    */
@@ -114,25 +117,20 @@ object CarbonFilters {
             new OrExpression(lhsFilter, rhsFilter)
           }
         case sources.StringStartsWith(name, value) if value.length > 0 =>
-          val l = new GreaterThanEqualToExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value))
-          val maxValueLimit = value.substring(0, value.length - 1) +
-                              (value.charAt(value.length - 1).toInt + 1).toChar
-          val r = new LessThanExpression(
-            getCarbonExpression(name), getCarbonLiteralExpression(name, maxValueLimit))
-          Some(new AndExpression(l, r))
+          Some(new StartsWithExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
         case CarbonEndsWith(expr: Expression) =>
           Some(new SparkUnknownExpression(expr.transform {
             case AttributeReference(name, dataType, _, _) =>
               CarbonBoundReference(new CarbonColumnExpression(name.toString,
                 CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable)
-          }))
+          }, ExpressionType.ENDSWITH))
         case CarbonContainsWith(expr: Expression) =>
           Some(new SparkUnknownExpression(expr.transform {
             case AttributeReference(name, dataType, _, _) =>
               CarbonBoundReference(new CarbonColumnExpression(name.toString,
                 CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable)
-          }))
+          }, ExpressionType.CONTAINSWITH))
         case CastExpr(expr: Expression) =>
           Some(transformExpression(expr))
         case _ => None
@@ -261,7 +259,7 @@ object CarbonFilters {
           CastExpressionOptimization.checkIfCastCanBeRemove(c)
         case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
           CastExpressionOptimization.checkIfCastCanBeRemove(c)
-        case StartsWith(a: Attribute, Literal(v, t)) =>
+        case s@StartsWith(a: Attribute, Literal(v, t)) =>
           Some(sources.StringStartsWith(a.name, v.toString))
         case c@EndsWith(a: Attribute, Literal(v, t)) =>
           Some(CarbonEndsWith(c))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 31a6701..e817590 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.processing.store.TablePage;
 
 /**
@@ -49,9 +49,9 @@ public class DataMapWriterListener {
   /**
    * register all datamap writer for specified table and segment
    */
-  public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId,
+  public void registerAllWriter(CarbonTable carbonTable, String segmentId,
       String dataWritePath) {
-    List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(identifier);
+    List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
     if (tableDataMaps != null) {
       for (TableDataMap tableDataMap : tableDataMaps) {
         DataMapFactory factory = tableDataMap.getDataMapFactory();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index cf045a4..98298ab 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -226,7 +226,7 @@ public final class DataLoadProcessBuilder {
     if (carbonTable.isHivePartitionTable()) {
       configuration.setWritingCoresCount((short) 1);
     }
-    TableSpec tableSpec = new TableSpec(dimensions, measures);
+    TableSpec tableSpec = new TableSpec(carbonTable);
     configuration.setTableSpec(tableSpec);
     return configuration;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 0e9cbc5..dd59ed3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -260,8 +260,8 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
 
     DataMapWriterListener listener = new DataMapWriterListener();
-    listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId(),
-        storeLocation[new Random().nextInt(storeLocation.length)]);
+    listener.registerAllWriter(configuration.getTableSpec().getCarbonTable(),
+        configuration.getSegmentId(), storeLocation[new Random().nextInt(storeLocation.length)]);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
 
@@ -321,13 +321,11 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
     carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
 
-    carbonFactDataHandlerModel.tableSpec = new TableSpec(
-        segmentProperties.getDimensions(),
-        segmentProperties.getMeasures());
-
+    carbonFactDataHandlerModel.tableSpec =
+        new TableSpec(loadModel.getCarbonDataLoadSchema().getCarbonTable());
     DataMapWriterListener listener = new DataMapWriterListener();
     listener.registerAllWriter(
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(),
+        loadModel.getCarbonDataLoadSchema().getCarbonTable(),
         loadModel.getSegmentId(),
         tempStoreLocation[new Random().nextInt(tempStoreLocation.length)]);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;


[2/2] carbondata git commit: [CARBONDATA-1543] Supported DataMap chooser and expression for supporting multiple datamaps in single query

Posted by ja...@apache.org.
[CARBONDATA-1543] Supported DataMap chooser and expression for supporting multiple datamaps in single query

This PR adds 3 features.

1. Load datamaps from the DataMapSchema entries created through DDL.
2. DataMap chooser: it selects a datamap from the available datamaps using a simple rule. For example, if there is a filter condition on column1 and 2 datamaps (1. column1, 2. column1+column2) support that column, it chooses the datamap with fewer indexed columns, i.e. the first one.
3. Expression support: the filter expressions are converted into the matching DataMap expressions, and that expression tree is applied during pruning.
For example, suppose 2 datamaps are available on table1:
Datamap1 : column1
Datamap2 : column2
Query: select * from table1 where column1 = 'a' and column2 = 'b'
For this query the chooser builds the datamap expression AndDataMapExpression(Datamap1, Datamap2), so both datamaps are consulted and their outputs are intersected (AND) to improve pruning performance. A hedged usage sketch follows below.
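
A minimal sketch (not part of this commit) of how the chooser could be driven once a table and a resolved filter are available. The wrapping class and method below are hypothetical; DataMapChooser.choose and DataMapExprWrapper.prune are taken from the code added in this commit.

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.datamap.DataMapChooser;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

// Hypothetical helper, only for illustrating the new API.
public class DataMapChooserSketch {

  // The chooser picks FG datamaps first, then CG, then falls back to the default
  // BlockletDataMap; the returned wrapper is used to prune the given segments.
  static List<ExtendedBlocklet> pruneWithChosenDataMap(
      CarbonTable carbonTable, FilterResolverIntf resolver,
      List<String> segmentIds, List<String> partitionsToPrune) throws IOException {
    DataMapExprWrapper wrapper = DataMapChooser.get().choose(carbonTable, resolver);
    // For "column1 = 'a' and column2 = 'b'" with Datamap1(column1) and Datamap2(column2),
    // the wrapper is an AndDataMapExprWrapper and the two prune results are intersected.
    return wrapper.prune(segmentIds, partitionsToPrune);
  }
}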

This closes #1510


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e616162c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e616162c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e616162c

Branch: refs/heads/datamap
Commit: e616162c023f055189bf6685471fc448b54c20ef
Parents: 88c0527
Author: Ravindra Pesala <ra...@gmail.com>
Authored: Tue Nov 21 15:49:11 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Feb 22 13:55:31 2018 +0800

----------------------------------------------------------------------
 .../exceptions/MetadataProcessException.java    |  37 +++
 .../carbondata/core/datamap/DataMapChooser.java | 284 +++++++++++++++++++
 .../core/datamap/DataMapDistributable.java      |  21 +-
 .../core/datamap/DataMapStoreManager.java       | 147 +++++++---
 .../carbondata/core/datamap/TableDataMap.java   |  23 +-
 .../core/datamap/dev/DataMapFactory.java        |   3 +-
 .../datamap/dev/expr/AndDataMapExprWrapper.java |  97 +++++++
 .../dev/expr/DataMapDistributableWrapper.java   |  56 ++++
 .../datamap/dev/expr/DataMapExprWrapper.java    |  77 +++++
 .../dev/expr/DataMapExprWrapperImpl.java        |  86 ++++++
 .../datamap/dev/expr/OrDataMapExprWrapper.java  |  94 ++++++
 .../carbondata/core/datastore/TableSpec.java    |  14 +-
 .../carbondata/core/indexstore/Blocklet.java    |  20 ++
 .../core/indexstore/ExtendedBlocklet.java       |  33 ++-
 .../core/indexstore/FineGrainBlocklet.java      |   8 +
 .../blockletindex/BlockletDataMap.java          |   2 -
 .../blockletindex/BlockletDataMapFactory.java   |   8 +-
 .../conditional/StartsWithExpression.java       |  72 +++++
 .../scan/filter/FilterExpressionProcessor.java  |  20 +-
 .../core/scan/filter/intf/ExpressionType.java   |   6 +-
 .../statusmanager/SegmentStatusManager.java     |   5 +-
 .../datamap/examples/MinMaxDataMapFactory.java  |   3 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  30 +-
 .../carbondata/hadoop/api/DataMapJob.java       |   2 +-
 .../hadoop/api/DistributableDataMapFormat.java  |  32 ++-
 .../sdv/generated/MergeIndexTestCase.scala      |  18 +-
 .../preaggregate/TestPreAggCreateCommand.scala  |   7 +-
 .../timeseries/TestTimeSeriesCreateTable.scala  |   6 +-
 ...CompactionSupportGlobalSortBigFileTest.scala |   2 +-
 .../testsuite/dataload/TestLoadDataFrame.scala  |  24 +-
 .../testsuite/datamap/CGDataMapTestCase.scala   |  62 ++--
 .../testsuite/datamap/DataMapWriterSuite.scala  |  17 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |  68 +++--
 .../testsuite/datamap/TestDataMapCommand.scala  |  73 ++---
 .../iud/InsertOverwriteConcurrentTest.scala     |   8 +-
 .../carbondata/spark/rdd/SparkDataMapJob.scala  |   6 +-
 .../org/apache/spark/sql/CarbonSource.scala     |   8 +-
 .../spark/sql/SparkUnknownExpression.scala      |   6 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |  40 ++-
 .../datamap/CarbonDropDataMapCommand.scala      |   3 +-
 .../CreatePreAggregateTableCommand.scala        |  13 +-
 .../preaaggregate/PreAggregateUtil.scala        |  11 +-
 .../table/CarbonCreateTableCommand.scala        |   2 +-
 .../strategy/CarbonLateDecodeStrategy.scala     |   2 +-
 .../spark/sql/optimizer/CarbonFilters.scala     |  20 +-
 .../datamap/DataMapWriterListener.java          |   6 +-
 .../loading/DataLoadProcessBuilder.java         |   2 +-
 .../store/CarbonFactDataHandlerModel.java       |  12 +-
 48 files changed, 1294 insertions(+), 302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/common/src/main/java/org/apache/carbondata/common/exceptions/MetadataProcessException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/MetadataProcessException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/MetadataProcessException.java
new file mode 100644
index 0000000..aaeee5e
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/MetadataProcessException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This exception will be thrown when metadata processing fails while executing a
+ * carbon command
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class MetadataProcessException extends RuntimeException {
+  public MetadataProcessException(String message) {
+    super(message);
+  }
+
+  public MetadataProcessException(String message, Throwable cause) {
+    super(message + ": " + cause.getMessage(), cause);
+  }
+}
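
For reference, a small hedged example (not part of the commit) of how this exception might be raised; note that the two-argument constructor appends the cause's message to the supplied message. The surrounding main method is only for illustration.

import org.apache.carbondata.common.exceptions.MetadataProcessException;

public class MetadataProcessExceptionSketch {
  public static void main(String[] args) {
    try {
      throw new IllegalStateException("schema file not readable");
    } catch (IllegalStateException cause) {
      // Resulting message: "failed to update table schema: schema file not readable"
      MetadataProcessException wrapped =
          new MetadataProcessException("failed to update table schema", cause);
      System.out.println(wrapped.getMessage());
    }
  }
}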

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
new file mode 100644
index 0000000..5155009
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datamap.dev.expr.AndDataMapExprWrapper;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapperImpl;
+import org.apache.carbondata.core.datamap.dev.expr.OrDataMapExprWrapper;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.TrueConditionalResolverImpl;
+
+/**
+ * This chooser does 2 jobs.
+ * 1. Based on the filter expression it converts the available datamaps to a datamap expression.
+ *   For example, suppose 2 datamaps are available on table1:
+ *   Datamap1 : column1
+ *   Datamap2 : column2
+ *   Query: select * from table1 where column1 = 'a' and column2 = 'b'
+ *   For this query the chooser creates the datamap expression
+ *   AndDataMapExpression(Datamap1, Datamap2), so both datamaps are included and their
+ *   outputs are intersected (AND) to improve pruning performance.
+ *
+ * 2. It chooses one datamap out of the available datamaps using a simple rule.
+ *   For example, if there is a filter condition on column1 and 2 datamaps
+ *   (1. column1, 2. column1+column2) support this column, it chooses the datamap
+ *   with fewer indexed columns, i.e. the first one.
+ */
+@InterfaceAudience.Internal
+public class DataMapChooser {
+
+  private static DataMapChooser INSTANCE;
+
+  private DataMapChooser() { }
+
+  public static DataMapChooser get() {
+    if (INSTANCE == null) {
+      INSTANCE = new DataMapChooser();
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Return a chosen datamap based on input filter. See {@link DataMapChooser}
+   */
+  public DataMapExprWrapper choose(CarbonTable carbonTable, FilterResolverIntf resolverIntf) {
+    if (resolverIntf != null) {
+      Expression expression = resolverIntf.getFilterExpression();
+      // First check for FG datamaps if any exist
+      List<TableDataMap> allDataMapFG =
+          DataMapStoreManager.getInstance().getAllDataMap(carbonTable, DataMapType.FG);
+      ExpressionTuple tuple = selectDataMap(expression, allDataMapFG);
+      if (tuple.dataMapExprWrapper == null) {
+        // Check for CG datamap
+        List<TableDataMap> allDataMapCG =
+            DataMapStoreManager.getInstance().getAllDataMap(carbonTable, DataMapType.CG);
+        tuple = selectDataMap(expression, allDataMapCG);
+      }
+      if (tuple.dataMapExprWrapper != null) {
+        return tuple.dataMapExprWrapper;
+      }
+    }
+    // Return the default datamap if no other datamap exists.
+    return new DataMapExprWrapperImpl(DataMapStoreManager.getInstance()
+        .getDefaultDataMap(carbonTable.getAbsoluteTableIdentifier()), resolverIntf);
+  }
+
+  private ExpressionTuple selectDataMap(Expression expression, List<TableDataMap> allDataMap) {
+    switch (expression.getFilterExpressionType()) {
+      case AND:
+        if (expression instanceof AndExpression) {
+          AndExpression andExpression = (AndExpression) expression;
+          ExpressionTuple left = selectDataMap(andExpression.getLeft(), allDataMap);
+          ExpressionTuple right = selectDataMap(andExpression.getRight(), allDataMap);
+          Set<ExpressionType> filterExpressionTypes = new HashSet<>();
+          // If both left and right have a datamap then we can either merge them into a single
+          // datamap if possible. Otherwise apply AND expression.
+          if (left.dataMapExprWrapper != null && right.dataMapExprWrapper != null) {
+            filterExpressionTypes.add(
+                left.dataMapExprWrapper.getFilterResolverIntf().getFilterExpression()
+                    .getFilterExpressionType());
+            filterExpressionTypes.add(
+                right.dataMapExprWrapper.getFilterResolverIntf().getFilterExpression()
+                    .getFilterExpressionType());
+            List<ColumnExpression> columnExpressions = new ArrayList<>();
+            columnExpressions.addAll(left.columnExpressions);
+            columnExpressions.addAll(right.columnExpressions);
+            // Check if we can merge them to single datamap.
+            TableDataMap dataMap =
+                chooseDataMap(allDataMap, columnExpressions, filterExpressionTypes);
+            if (dataMap != null) {
+              ExpressionTuple tuple = new ExpressionTuple();
+              tuple.columnExpressions = columnExpressions;
+              tuple.dataMapExprWrapper = new DataMapExprWrapperImpl(dataMap,
+                  new TrueConditionalResolverImpl(expression, false, false));
+              return tuple;
+            } else {
+              // Apply AND expression.
+              ExpressionTuple tuple = new ExpressionTuple();
+              tuple.columnExpressions = columnExpressions;
+              tuple.dataMapExprWrapper =
+                  new AndDataMapExprWrapper(left.dataMapExprWrapper, right.dataMapExprWrapper,
+                      new TrueConditionalResolverImpl(expression, false, false));
+              return tuple;
+            }
+          } else if (left.dataMapExprWrapper != null && right.dataMapExprWrapper == null) {
+            return left;
+          } else if (left.dataMapExprWrapper == null && right.dataMapExprWrapper != null) {
+            return right;
+          } else {
+            return left;
+          }
+        }
+        break;
+      case OR:
+        if (expression instanceof OrExpression) {
+          OrExpression orExpression = (OrExpression) expression;
+          ExpressionTuple left = selectDataMap(orExpression.getLeft(), allDataMap);
+          ExpressionTuple right = selectDataMap(orExpression.getRight(), allDataMap);
+          Set<ExpressionType> filterExpressionTypes = new HashSet<>();
+          // If both left and right have a datamap then we can either merge them into a single
+          // datamap if possible. Otherwise apply OR expression.
+          if (left.dataMapExprWrapper != null && right.dataMapExprWrapper != null) {
+            filterExpressionTypes.add(
+                left.dataMapExprWrapper.getFilterResolverIntf().getFilterExpression()
+                    .getFilterExpressionType());
+            filterExpressionTypes.add(
+                right.dataMapExprWrapper.getFilterResolverIntf().getFilterExpression()
+                    .getFilterExpressionType());
+            List<ColumnExpression> columnExpressions = new ArrayList<>();
+            columnExpressions.addAll(left.columnExpressions);
+            columnExpressions.addAll(right.columnExpressions);
+            TableDataMap dataMap =
+                chooseDataMap(allDataMap, columnExpressions, filterExpressionTypes);
+            if (dataMap != null) {
+              ExpressionTuple tuple = new ExpressionTuple();
+              tuple.columnExpressions = columnExpressions;
+              tuple.dataMapExprWrapper = new DataMapExprWrapperImpl(dataMap,
+                  new TrueConditionalResolverImpl(expression, false, false));
+              return tuple;
+            } else {
+              ExpressionTuple tuple = new ExpressionTuple();
+              tuple.columnExpressions = columnExpressions;
+              tuple.dataMapExprWrapper =
+                  new OrDataMapExprWrapper(left.dataMapExprWrapper, right.dataMapExprWrapper,
+                      new TrueConditionalResolverImpl(expression, false, false));
+              return tuple;
+            }
+          } else {
+            left.dataMapExprWrapper = null;
+            return left;
+          }
+        }
+        break;
+      default:
+        ExpressionTuple tuple = new ExpressionTuple();
+        extractColumnExpression(expression, tuple.columnExpressions);
+        Set<ExpressionType> filterExpressionTypes = new HashSet<>();
+        filterExpressionTypes.add(expression.getFilterExpressionType());
+        TableDataMap dataMap =
+            chooseDataMap(allDataMap, tuple.columnExpressions, filterExpressionTypes);
+        if (dataMap != null) {
+          tuple.dataMapExprWrapper = new DataMapExprWrapperImpl(dataMap,
+              new TrueConditionalResolverImpl(expression, false, false));
+        }
+        return tuple;
+    }
+    return new ExpressionTuple();
+  }
+
+  private void extractColumnExpression(Expression expression,
+      List<ColumnExpression> columnExpressions) {
+    if (expression instanceof ColumnExpression) {
+      columnExpressions.add((ColumnExpression) expression);
+    } else if (expression != null) {
+      List<Expression> children = expression.getChildren();
+      if (children != null && children.size() > 0) {
+        for (Expression exp : children) {
+          extractColumnExpression(exp, columnExpressions);
+        }
+      }
+    }
+  }
+
+  private TableDataMap chooseDataMap(List<TableDataMap> allDataMap,
+      List<ColumnExpression> columnExpressions, Set<ExpressionType> expressionTypes) {
+    List<DataMapTuple> tuples = new ArrayList<>();
+    for (TableDataMap dataMap : allDataMap) {
+      if (contains(dataMap.getDataMapFactory().getMeta(), columnExpressions, expressionTypes)) {
+        tuples.add(
+            new DataMapTuple(dataMap.getDataMapFactory().getMeta().getIndexedColumns().size(),
+                dataMap));
+      }
+    }
+    if (tuples.size() > 0) {
+      Collections.sort(tuples);
+      return tuples.get(0).dataMap;
+    }
+    return null;
+  }
+
+  private boolean contains(DataMapMeta mapMeta, List<ColumnExpression> columnExpressions,
+      Set<ExpressionType> expressionTypes) {
+    if (mapMeta.getIndexedColumns().size() == 0 || columnExpressions.size() == 0) {
+      return false;
+    }
+    boolean contains = true;
+    for (ColumnExpression expression : columnExpressions) {
+      if (!mapMeta.getIndexedColumns().contains(expression.getColumnName()) || !mapMeta
+          .getOptimizedOperation().containsAll(expressionTypes)) {
+        contains = false;
+        break;
+      }
+    }
+    return contains;
+  }
+
+  private static class ExpressionTuple {
+
+    DataMapExprWrapper dataMapExprWrapper;
+
+    List<ColumnExpression> columnExpressions = new ArrayList<>();
+
+  }
+
+  private static class DataMapTuple implements Comparable<DataMapTuple> {
+
+    int order;
+
+    TableDataMap dataMap;
+
+    public DataMapTuple(int order, TableDataMap dataMap) {
+      this.order = order;
+      this.dataMap = dataMap;
+    }
+
+    @Override public int compareTo(DataMapTuple o) {
+      return order - o.order;
+    }
+
+    @Override public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      DataMapTuple that = (DataMapTuple) o;
+
+      if (order != that.order) return false;
+      return dataMap != null ? dataMap.equals(that.dataMap) : that.dataMap == null;
+    }
+
+    @Override public int hashCode() {
+      int result = order;
+      result = 31 * result + (dataMap != null ? dataMap.hashCode() : 0);
+      return result;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
index 50af789..828cdbb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.Serializable;
 
 import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
 import org.apache.hadoop.mapreduce.InputSplit;
 
@@ -33,11 +34,9 @@ public abstract class DataMapDistributable extends InputSplit
 
   private String segmentId;
 
-  private String dataMapName;
-
   private String[] locations;
 
-  private String dataMapFactoryClass;
+  private DataMapSchema dataMapSchema;
 
   public String getTablePath() {
     return tablePath;
@@ -55,20 +54,12 @@ public abstract class DataMapDistributable extends InputSplit
     this.segmentId = segmentId;
   }
 
-  public String getDataMapName() {
-    return dataMapName;
-  }
-
-  public void setDataMapName(String dataMapName) {
-    this.dataMapName = dataMapName;
-  }
-
-  public String getDataMapFactoryClass() {
-    return dataMapFactoryClass;
+  public DataMapSchema getDataMapSchema() {
+    return dataMapSchema;
   }
 
-  public void setDataMapFactoryClass(String dataMapFactoryClass) {
-    this.dataMapFactoryClass = dataMapFactoryClass;
+  public void setDataMapSchema(DataMapSchema dataMapSchema) {
+    this.dataMapSchema = dataMapSchema;
   }
 
   public void setLocations(String[] locations) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 8d80b4d..ca5da1e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -22,14 +22,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.carbondata.common.exceptions.MetadataProcessException;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
@@ -55,48 +59,76 @@ public final class DataMapStoreManager {
 
   }
 
-  public List<TableDataMap> getAllDataMap(AbsoluteTableIdentifier identifier) {
-    return allDataMaps.get(identifier.getCarbonTableIdentifier().getTableUniqueName());
+  /**
+   * It gives all datamaps of type @mapType except the default datamap.
+   *
+   */
+  public List<TableDataMap> getAllDataMap(CarbonTable carbonTable, DataMapType mapType) {
+    List<TableDataMap> dataMaps = new ArrayList<>();
+    List<TableDataMap> tableDataMaps = getAllDataMap(carbonTable);
+    if (tableDataMaps != null) {
+      for (TableDataMap dataMap : tableDataMaps) {
+        if (mapType == dataMap.getDataMapFactory().getDataMapType()) {
+          dataMaps.add(dataMap);
+        }
+      }
+    }
+    return dataMaps;
   }
 
-  // TODO its a temporary method till chooser is implemented
-  public TableDataMap chooseDataMap(AbsoluteTableIdentifier identifier) {
-    List<TableDataMap> tableDataMaps = getAllDataMap(identifier);
-    if (tableDataMaps != null && tableDataMaps.size() > 0) {
-      for (TableDataMap dataMap: tableDataMaps) {
-        if (!dataMap.getDataMapName().equalsIgnoreCase(BlockletDataMap.NAME)) {
-          return dataMap;
+  /**
+   * It gives all datamaps except the default datamap.
+   *
+   * @return
+   */
+  public List<TableDataMap> getAllDataMap(CarbonTable carbonTable) {
+    List<DataMapSchema> dataMapSchemaList = carbonTable.getTableInfo().getDataMapSchemaList();
+    List<TableDataMap> dataMaps = new ArrayList<>();
+    if (dataMapSchemaList != null) {
+      for (DataMapSchema dataMapSchema : dataMapSchemaList) {
+        if (!dataMapSchema.getClassName()
+            .equalsIgnoreCase(CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA)) {
+          dataMaps.add(getDataMap(carbonTable.getAbsoluteTableIdentifier(), dataMapSchema));
         }
       }
-      return tableDataMaps.get(0);
-    } else {
-      return getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
     }
+    return dataMaps;
   }
 
   /**
-   * Get the datamap for reading data.
+   * It gives the default datamap of the table. Default datamap of any table is BlockletDataMap
    *
-   * @param dataMapName
-   * @param factoryClass
+   * @param identifier
    * @return
    */
-  public TableDataMap getDataMap(AbsoluteTableIdentifier identifier,
-      String dataMapName, String factoryClass) {
+  public TableDataMap getDefaultDataMap(AbsoluteTableIdentifier identifier) {
+    return getDataMap(identifier, BlockletDataMapFactory.DATA_MAP_SCHEMA);
+  }
+
+  /**
+   * Get the datamap for reading data.
+   */
+  public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
     String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
     List<TableDataMap> tableDataMaps = allDataMaps.get(table);
-    TableDataMap dataMap;
-    if (tableDataMaps == null) {
+    TableDataMap dataMap = null;
+    if (tableDataMaps != null) {
+      dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableDataMaps);
+    }
+    if (dataMap == null) {
       synchronized (table.intern()) {
         tableDataMaps = allDataMaps.get(table);
-        if (tableDataMaps == null) {
-          dataMap = createAndRegisterDataMap(identifier, factoryClass, dataMapName);
-        } else {
-          dataMap = getTableDataMap(dataMapName, tableDataMaps);
+        if (tableDataMaps != null) {
+          dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableDataMaps);
+        }
+        if (dataMap == null) {
+          try {
+            dataMap = createAndRegisterDataMap(identifier, dataMapSchema);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
         }
       }
-    } else {
-      dataMap = getTableDataMap(dataMapName, tableDataMaps);
     }
 
     if (dataMap == null) {
@@ -110,7 +142,8 @@ public final class DataMapStoreManager {
    * The datamap is created using datamap name, datamap factory class and table identifier.
    */
   public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
-      String factoryClassName, String dataMapName) {
+      DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
     String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
     // Just update the segmentRefreshMap with the table if not added.
     getTableSegmentRefresher(identifier);
@@ -118,16 +151,20 @@ public final class DataMapStoreManager {
     if (tableDataMaps == null) {
       tableDataMaps = new ArrayList<>();
     }
+    String dataMapName = dataMapSchema.getDataMapName();
     TableDataMap dataMap = getTableDataMap(dataMapName, tableDataMaps);
-    if (dataMap != null && dataMap.getDataMapName().equalsIgnoreCase(dataMapName)) {
-      throw new RuntimeException("Already datamap exists in that path with type " + dataMapName);
+    if (dataMap != null && dataMap.getDataMapSchema().getDataMapName()
+        .equalsIgnoreCase(dataMapName)) {
+      throw new MalformedDataMapCommandException("Datamap already exists in that path with type " +
+          dataMapName);
     }
 
     try {
+      // try to create datamap by reflection to test whether it is a valid DataMapFactory class
       Class<? extends DataMapFactory> factoryClass =
-          (Class<? extends DataMapFactory>) Class.forName(factoryClassName);
+          (Class<? extends DataMapFactory>) Class.forName(dataMapSchema.getClassName());
       DataMapFactory dataMapFactory = factoryClass.newInstance();
-      dataMapFactory.init(identifier, dataMapName);
+      dataMapFactory.init(identifier, dataMapSchema);
       BlockletDetailsFetcher blockletDetailsFetcher;
       SegmentPropertiesFetcher segmentPropertiesFetcher = null;
       if (dataMapFactory instanceof BlockletDetailsFetcher) {
@@ -136,11 +173,14 @@ public final class DataMapStoreManager {
         blockletDetailsFetcher = getBlockletDetailsFetcher(identifier);
       }
       segmentPropertiesFetcher = (SegmentPropertiesFetcher) blockletDetailsFetcher;
-      dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory, blockletDetailsFetcher,
+      dataMap = new TableDataMap(identifier, dataMapSchema, dataMapFactory, blockletDetailsFetcher,
           segmentPropertiesFetcher);
-    } catch (Exception e) {
-      LOGGER.error(e);
-      throw new RuntimeException(e);
+    } catch (ClassNotFoundException e) {
+      throw new MalformedDataMapCommandException("DataMap class '" +
+          dataMapSchema.getClassName() + "' not found");
+    } catch (Throwable e) {
+      throw new MetadataProcessException(
+          "failed to create DataMap instance for '" + dataMapSchema.getClassName() + "'", e);
     }
     tableDataMaps.add(dataMap);
     allDataMaps.put(table, tableDataMaps);
@@ -150,8 +190,7 @@ public final class DataMapStoreManager {
   private TableDataMap getTableDataMap(String dataMapName, List<TableDataMap> tableDataMaps) {
     TableDataMap dataMap = null;
     for (TableDataMap tableDataMap : tableDataMaps) {
-      if (tableDataMap.getDataMapName().equals(dataMapName) || (!tableDataMap.getDataMapName()
-          .equals(""))) {
+      if (tableDataMap.getDataMapSchema().getDataMapName().equals(dataMapName)) {
         dataMap = tableDataMap;
         break;
       }
@@ -160,16 +199,30 @@ public final class DataMapStoreManager {
   }
 
   /**
+   * Clear the invalid segments from all the datamaps of the table
+   * @param carbonTable
+   * @param segments
+   */
+  public void clearInvalidSegments(CarbonTable carbonTable, List<String> segments) {
+    getDefaultDataMap(carbonTable.getAbsoluteTableIdentifier()).clear(segments);
+    List<TableDataMap> allDataMap = getAllDataMap(carbonTable);
+    for (TableDataMap dataMap: allDataMap) {
+      dataMap.clear(segments);
+    }
+
+  }
+
+  /**
    * Clear the datamap/datamaps of a table from memory
+   *
    * @param identifier Table identifier
    */
   public void clearDataMaps(AbsoluteTableIdentifier identifier) {
     String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
-    List<TableDataMap> tableDataMaps =
-        allDataMaps.get(tableUniqueName);
+    List<TableDataMap> tableDataMaps = allDataMaps.get(tableUniqueName);
     segmentRefreshMap.remove(identifier.uniqueName());
     if (tableDataMaps != null) {
-      for (TableDataMap tableDataMap: tableDataMaps) {
+      for (TableDataMap tableDataMap : tableDataMaps) {
         if (tableDataMap != null) {
           tableDataMap.clear();
           break;
@@ -181,6 +234,7 @@ public final class DataMapStoreManager {
 
   /**
    * Clear the datamap/datamaps of a table from memory
+   *
    * @param identifier Table identifier
    */
   public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
@@ -188,8 +242,9 @@ public final class DataMapStoreManager {
         allDataMaps.get(identifier.getCarbonTableIdentifier().getTableUniqueName());
     if (tableDataMaps != null) {
       int i = 0;
-      for (TableDataMap tableDataMap: tableDataMaps) {
-        if (tableDataMap != null && dataMapName.equalsIgnoreCase(tableDataMap.getDataMapName())) {
+      for (TableDataMap tableDataMap : tableDataMaps) {
+        if (tableDataMap != null && dataMapName
+            .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
           tableDataMap.clear();
           tableDataMaps.remove(i);
           break;
@@ -201,17 +256,18 @@ public final class DataMapStoreManager {
 
   /**
    * Get the blocklet datamap factory to get the detail information of blocklets
+   *
    * @param identifier
    * @return
    */
   private BlockletDetailsFetcher getBlockletDetailsFetcher(AbsoluteTableIdentifier identifier) {
-    TableDataMap blockletMap =
-        getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
+    TableDataMap blockletMap = getDataMap(identifier, BlockletDataMapFactory.DATA_MAP_SCHEMA);
     return (BlockletDetailsFetcher) blockletMap.getDataMapFactory();
   }
 
   /**
    * Returns the singleton instance
+   *
    * @return
    */
   public static DataMapStoreManager getInstance() {
@@ -271,7 +327,7 @@ public final class DataMapStoreManager {
     }
 
     public void refreshSegments(List<String> segmentIds) {
-      for (String segmentId: segmentIds) {
+      for (String segmentId : segmentIds) {
         manualSegmentRefresh.put(segmentId, true);
       }
     }
@@ -286,5 +342,4 @@ public final class DataMapStoreManager {
     }
   }
 
-
 }
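
As an illustration only (not part of this commit), the reworked store manager can now be asked for datamaps by level or for the default datamap. The helper class below is hypothetical; the DataMapStoreManager calls are the ones introduced above.

import java.util.List;

import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.DataMapType;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

// Hypothetical helper for illustration.
public class DataMapLookupSketch {

  // Prints the fine-grain datamaps registered for a table plus the default BlockletDataMap.
  static void listDataMaps(CarbonTable carbonTable) {
    List<TableDataMap> fgDataMaps =
        DataMapStoreManager.getInstance().getAllDataMap(carbonTable, DataMapType.FG);
    TableDataMap defaultDataMap = DataMapStoreManager.getInstance()
        .getDefaultDataMap(carbonTable.getAbsoluteTableIdentifier());
    System.out.println("FG datamaps: " + fgDataMaps.size()
        + ", default datamap: " + defaultDataMap.getDataMapSchema().getDataMapName());
  }
}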

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 42fc702..d4a8a22 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.events.Event;
 import org.apache.carbondata.events.OperationContext;
@@ -45,7 +46,7 @@ public final class TableDataMap extends OperationEventListener {
 
   private AbsoluteTableIdentifier identifier;
 
-  private String dataMapName;
+  private DataMapSchema dataMapSchema;
 
   private DataMapFactory dataMapFactory;
 
@@ -56,11 +57,11 @@ public final class TableDataMap extends OperationEventListener {
   /**
    * It is called to initialize and load the required table datamap metadata.
    */
-  public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+  public TableDataMap(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema,
       DataMapFactory dataMapFactory, BlockletDetailsFetcher blockletDetailsFetcher,
       SegmentPropertiesFetcher segmentPropertiesFetcher) {
     this.identifier = identifier;
-    this.dataMapName = dataMapName;
+    this.dataMapSchema = dataMapSchema;
     this.dataMapFactory = dataMapFactory;
     this.blockletDetailsFetcher = blockletDetailsFetcher;
     this.segmentPropertiesFetcher = segmentPropertiesFetcher;
@@ -115,10 +116,9 @@ public final class TableDataMap extends OperationEventListener {
     for (String segmentsId : segmentIds) {
       List<DataMapDistributable> list = dataMapFactory.toDistributable(segmentsId);
       for (DataMapDistributable distributable: list) {
-        distributable.setDataMapName(dataMapName);
+        distributable.setDataMapSchema(dataMapSchema);
         distributable.setSegmentId(segmentsId);
         distributable.setTablePath(identifier.getTablePath());
-        distributable.setDataMapFactoryClass(dataMapFactory.getClass().getName());
       }
       distributables.addAll(list);
     }
@@ -147,7 +147,8 @@ public final class TableDataMap extends OperationEventListener {
     }
     BlockletSerializer serializer = new BlockletSerializer();
     String writePath =
-        identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + dataMapName;
+        identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + dataMapSchema
+            .getDataMapName();
     if (dataMapFactory.getDataMapType() == DataMapType.FG) {
       FileFactory.mkdirs(writePath, FileFactory.getFileType(writePath));
     }
@@ -182,13 +183,9 @@ public final class TableDataMap extends OperationEventListener {
   public void clear() {
     dataMapFactory.clear();
   }
-  /**
-   * Get the unique name of datamap
-   *
-   * @return
-   */
-  public String getDataMapName() {
-    return dataMapName;
+
+  public DataMapSchema getDataMapSchema() {
+    return dataMapSchema;
   }
 
   public DataMapFactory getDataMapFactory() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index e900f8a..0aebe9b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.DataMapType;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.events.Event;
 
 /**
@@ -33,7 +34,7 @@ public interface DataMapFactory<T extends DataMap> {
   /**
    * Initialization of Datamap factory with the identifier and datamap name
    */
-  void init(AbsoluteTableIdentifier identifier, String dataMapName);
+  void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema);
 
   /**
    * Return a new write for this datamap

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
new file mode 100644
index 0000000..458772f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.expr;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * And expression for datamaps
+ */
+public class AndDataMapExprWrapper implements DataMapExprWrapper {
+
+  private DataMapExprWrapper left;
+
+  private DataMapExprWrapper right;
+
+  private FilterResolverIntf resolverIntf;
+
+  public AndDataMapExprWrapper(DataMapExprWrapper left, DataMapExprWrapper right,
+      FilterResolverIntf resolverIntf) {
+    this.left = left;
+    this.right = right;
+    this.resolverIntf = resolverIntf;
+  }
+
+  @Override public List<ExtendedBlocklet> prune(List<String> segments,
+      List<String> partitionsToPrune) throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune);
+    List<ExtendedBlocklet> rightPrune = right.prune(segments, partitionsToPrune);
+    List<ExtendedBlocklet> andBlocklets = new ArrayList<>();
+    for (ExtendedBlocklet blocklet : leftPrune) {
+      if (rightPrune.contains(blocklet)) {
+        andBlocklets.add(blocklet);
+      }
+    }
+    return andBlocklets;
+  }
+
+  @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)
+      throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.pruneBlocklets(blocklets);
+    List<ExtendedBlocklet> rightPrune = right.pruneBlocklets(blocklets);
+    List<ExtendedBlocklet> andBlocklets = new ArrayList<>();
+    for (ExtendedBlocklet blocklet : leftPrune) {
+      if (rightPrune.contains(blocklet)) {
+        andBlocklets.add(blocklet);
+      }
+    }
+    return andBlocklets;
+  }
+
+  @Override public FilterResolverIntf getFilterResolverIntf() {
+    return resolverIntf;
+  }
+
+  @Override public FilterResolverIntf getFilterResolverIntf(String uniqueId) {
+    FilterResolverIntf leftExp = left.getFilterResolverIntf(uniqueId);
+    FilterResolverIntf rightExp = right.getFilterResolverIntf(uniqueId);
+    if (leftExp != null) {
+      return leftExp;
+    } else if (rightExp != null) {
+      return rightExp;
+    }
+    return null;
+  }
+
+  @Override public List<DataMapDistributableWrapper> toDistributable(List<String> segments)
+      throws IOException {
+    List<DataMapDistributableWrapper> wrappers = new ArrayList<>();
+    wrappers.addAll(left.toDistributable(segments));
+    wrappers.addAll(right.toDistributable(segments));
+    return wrappers;
+  }
+
+  @Override public DataMapType getDataMapType() {
+    return left.getDataMapType();
+  }
+}
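
A hedged sketch of how these wrappers are meant to be composed: each leaf (DataMapExprWrapperImpl) pairs one TableDataMap with the part of the filter it can serve, and the AND wrapper intersects their pruning results. The datamaps and resolvers below are placeholders supplied by the caller; in the real flow the DataMapChooser builds this tree.

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datamap.dev.expr.AndDataMapExprWrapper;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapperImpl;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

public class AndPruneSketch {

  static List<ExtendedBlocklet> pruneWithAnd(TableDataMap leftDataMap, FilterResolverIntf leftFilter,
      TableDataMap rightDataMap, FilterResolverIntf rightFilter, FilterResolverIntf fullFilter,
      List<String> segments, List<String> partitionsToPrune) throws IOException {
    DataMapExprWrapper left = new DataMapExprWrapperImpl(leftDataMap, leftFilter);
    DataMapExprWrapper right = new DataMapExprWrapperImpl(rightDataMap, rightFilter);
    DataMapExprWrapper andExpr = new AndDataMapExprWrapper(left, right, fullFilter);
    // Only blocklets returned by both children survive the intersection.
    return andExpr.prune(segments, partitionsToPrune);
  }
}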

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapDistributableWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapDistributableWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapDistributableWrapper.java
new file mode 100644
index 0000000..9075032
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapDistributableWrapper.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.expr;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+public class DataMapDistributableWrapper extends InputSplit implements Serializable {
+
+  private String uniqueId;
+
+  private DataMapDistributable distributable;
+
+  public DataMapDistributableWrapper(String uniqueId, DataMapDistributable distributable) {
+    this.uniqueId = uniqueId;
+    this.distributable = distributable;
+  }
+
+  public String getUniqueId() {
+    return uniqueId;
+  }
+
+  public DataMapDistributable getDistributable() {
+    return distributable;
+  }
+
+  public void setDistributable(DataMapDistributable distributable) {
+    this.distributable = distributable;
+  }
+
+  @Override public long getLength() throws IOException, InterruptedException {
+    return distributable.getLength();
+  }
+
+  @Override public String[] getLocations() throws IOException, InterruptedException {
+    return distributable.getLocations();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
new file mode 100644
index 0000000..36c2472
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.expr;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * A wrapper around a datamap and its related filter expression. It lets the caller apply
+ * datamaps in an expression (AND/OR) style.
+ */
+public interface DataMapExprWrapper extends Serializable {
+
+  /**
+   * Gets the blocklets from each leaf node datamap and applies the expression on those
+   * blocklets using the given list of segments. Used for non-distributable datamaps.
+   */
+  List<ExtendedBlocklet> prune(List<String> segments, List<String> partitionsToPrune)
+      throws IOException;
+
+  /**
+   * Used for distributable datamaps. A job first collects all blocklets from all related
+   * datamaps; those blocklets are then passed to this method to apply the expression.
+   * @param blocklets blocklets collected by the distributed datamap job
+   * @return blocklets that satisfy this expression
+   * @throws IOException
+   */
+  List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets) throws IOException;
+
+  /**
+   * Get the underlying filter expression.
+   * @return the filter resolver for the whole expression
+   */
+  FilterResolverIntf getFilterResolverIntf();
+
+  /**
+   * Convert to distributable objects so that pruning can be executed as a job.
+   * @param segments valid segments to prune
+   * @return distributable wrappers, one per underlying datamap distributable
+   * @throws IOException
+   */
+  List<DataMapDistributableWrapper> toDistributable(List<String> segments) throws IOException;
+
+  /**
+   * Each leaf node is identified by a unique id, so this method returns the underlying filter
+   * expression of the leaf node with the given unique id.
+   * @param uniqueId unique id of the leaf node
+   * @return the filter expression of that leaf node, or null if no leaf matches
+   */
+  FilterResolverIntf getFilterResolverIntf(String uniqueId);
+
+  /**
+   * Get the datamap type.
+   * @return
+   */
+  DataMapType getDataMapType();
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
new file mode 100644
index 0000000..a66d31b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.expr;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+public class DataMapExprWrapperImpl implements DataMapExprWrapper {
+
+  private static final long serialVersionUID = -6240385328696074171L;
+
+  private transient TableDataMap dataMap;
+
+  private FilterResolverIntf expression;
+
+  private String uniqueId;
+
+  public DataMapExprWrapperImpl(TableDataMap dataMap, FilterResolverIntf expression) {
+    this.dataMap = dataMap;
+    this.expression = expression;
+    this.uniqueId = UUID.randomUUID().toString();
+  }
+
+  @Override public List<ExtendedBlocklet> prune(List<String> segments,
+      List<String> partitionsToPrune) throws IOException {
+    return dataMap.prune(segments, expression, partitionsToPrune);
+  }
+
+  @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)
+      throws IOException {
+    List<ExtendedBlocklet> blockletList = new ArrayList<>();
+    for (ExtendedBlocklet blocklet: blocklets) {
+      if (blocklet.getDataMapUniqueId().equals(uniqueId)) {
+        blockletList.add(blocklet);
+      }
+    }
+    return blockletList;
+  }
+
+  @Override public FilterResolverIntf getFilterResolverIntf() {
+    return expression;
+  }
+
+  @Override public FilterResolverIntf getFilterResolverIntf(String uniqueId) {
+    if (this.uniqueId.equals(uniqueId)) {
+      return expression;
+    }
+    return null;
+  }
+
+  @Override public List<DataMapDistributableWrapper> toDistributable(List<String> segments)
+      throws IOException {
+    List<DataMapDistributable> dataMapDistributables = dataMap.toDistributable(segments);
+    List<DataMapDistributableWrapper> wrappers = new ArrayList<>();
+    for (DataMapDistributable distributable : dataMapDistributables) {
+      wrappers.add(new DataMapDistributableWrapper(uniqueId, distributable));
+    }
+    return wrappers;
+  }
+
+  @Override public DataMapType getDataMapType() {
+    return dataMap.getDataMapFactory().getDataMapType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
new file mode 100644
index 0000000..1d90f0d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.expr;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * OR expression for datamaps: returns the union of the blocklets from both children
+ */
+public class OrDataMapExprWrapper implements DataMapExprWrapper {
+
+  private DataMapExprWrapper left;
+
+  private DataMapExprWrapper right;
+
+  private FilterResolverIntf resolverIntf;
+
+  public OrDataMapExprWrapper(DataMapExprWrapper left, DataMapExprWrapper right,
+      FilterResolverIntf resolverIntf) {
+    this.left = left;
+    this.right = right;
+    this.resolverIntf = resolverIntf;
+  }
+
+  @Override public List<ExtendedBlocklet> prune(List<String> segments,
+      List<String> partitionsToPrune) throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune);
+    List<ExtendedBlocklet> rightPrune = right.prune(segments, partitionsToPrune);
+    Set<ExtendedBlocklet> orBlocklets = new HashSet<>();
+    orBlocklets.addAll(leftPrune);
+    orBlocklets.addAll(rightPrune);
+    return new ArrayList<>(orBlocklets);
+  }
+
+  @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)
+      throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.pruneBlocklets(blocklets);
+    List<ExtendedBlocklet> rightPrune = right.pruneBlocklets(blocklets);
+    Set<ExtendedBlocklet> orBlocklets = new HashSet<>();
+    orBlocklets.addAll(leftPrune);
+    orBlocklets.addAll(rightPrune);
+    return new ArrayList<>(orBlocklets);
+  }
+
+  @Override public List<DataMapDistributableWrapper> toDistributable(List<String> segments)
+      throws IOException {
+    List<DataMapDistributableWrapper> wrappers = new ArrayList<>();
+    wrappers.addAll(left.toDistributable(segments));
+    wrappers.addAll(right.toDistributable(segments));
+    return wrappers;
+  }
+
+  @Override public FilterResolverIntf getFilterResolverIntf() {
+    return resolverIntf;
+  }
+
+  @Override public FilterResolverIntf getFilterResolverIntf(String uniqueId) {
+    FilterResolverIntf leftExp = left.getFilterResolverIntf(uniqueId);
+    FilterResolverIntf rightExp = right.getFilterResolverIntf(uniqueId);
+    if (leftExp != null) {
+      return leftExp;
+    } else if (rightExp != null) {
+      return rightExp;
+    }
+    return null;
+  }
+
+
+  @Override public DataMapType getDataMapType() {
+    return left.getDataMapType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index eb36c8d..93da81e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
@@ -38,7 +39,14 @@ public class TableSpec {
   // number of simple dimensions
   private int numSimpleDimensions;
 
-  public TableSpec(List<CarbonDimension> dimensions, List<CarbonMeasure> measures) {
+  private CarbonTable carbonTable;
+
+  public TableSpec(CarbonTable carbonTable) {
+    this.carbonTable = carbonTable;
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
     // first calculate total number of columnar field considering column group and complex column
     numSimpleDimensions = 0;
     for (CarbonDimension dimension : dimensions) {
@@ -112,6 +120,10 @@ public class TableSpec {
     return measureSpec.length;
   }
 
+  public CarbonTable getCarbonTable() {
+    return carbonTable;
+  }
+
   public static class ColumnSpec implements Writable {
     // field name of this column
     private String fieldName;

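TableSpec is now constructed from the CarbonTable itself, so callers no longer pass the dimension and measure lists separately, and downstream code can get the table back via getCarbonTable(). A small sketch of the new construction:

import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

public class TableSpecUsageSketch {
  // The spec derives dimensions and measures from the table internally.
  static TableSpec buildSpec(CarbonTable carbonTable) {
    return new TableSpec(carbonTable);
  }
}
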
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index c731e07..052d269 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -58,4 +58,24 @@ public class Blocklet implements Writable,Serializable {
     blockId = in.readUTF();
     blockletId = in.readUTF();
   }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    Blocklet blocklet = (Blocklet) o;
+
+    if (blockId != null ? !blockId.equals(blocklet.blockId) : blocklet.blockId != null) {
+      return false;
+    }
+    return blockletId != null ?
+        blockletId.equals(blocklet.blockletId) :
+        blocklet.blockletId == null;
+  }
+
+  @Override public int hashCode() {
+    int result = blockId != null ? blockId.hashCode() : 0;
+    result = 31 * result + (blockletId != null ? blockletId.hashCode() : 0);
+    return result;
+  }
 }
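
The equals/hashCode added here (based on blockId and blockletId) are what make the set operations in the expression wrappers behave correctly: the AND wrapper's rightPrune.contains(blocklet) intersection and the OR wrapper's HashSet union both rely on value equality across blocklets coming from different datamaps. A quick illustration with made-up ids:

import org.apache.carbondata.core.indexstore.Blocklet;

public class BlockletEqualitySketch {
  public static void main(String[] args) {
    // Two instances referring to the same block and blocklet id are now equal,
    // so contains()/HashSet de-duplication across datamaps works as expected.
    Blocklet fromDataMapA = new Blocklet("part-0-0_batchno0-0-1.carbondata", "0");
    Blocklet fromDataMapB = new Blocklet("part-0-0_batchno0-0-1.carbondata", "0");
    System.out.println(fromDataMapA.equals(fromDataMapB));                  // true
    System.out.println(fromDataMapA.hashCode() == fromDataMapB.hashCode()); // true
  }
}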

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index 58a9344..d2af5cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -29,13 +29,12 @@ public class ExtendedBlocklet extends Blocklet {
 
   private String[] location;
 
-  private String path;
-
   private String dataMapWriterPath;
 
+  private String dataMapUniqueId;
+
   public ExtendedBlocklet(String path, String blockletId) {
     super(path, blockletId);
-    this.path = path;
   }
 
   public BlockletDetailInfo getDetailInfo() {
@@ -67,7 +66,7 @@ public class ExtendedBlocklet extends Blocklet {
   }
 
   public String getPath() {
-    return path;
+    return getBlockId();
   }
 
   public String getDataMapWriterPath() {
@@ -77,4 +76,30 @@ public class ExtendedBlocklet extends Blocklet {
   public void setDataMapWriterPath(String dataMapWriterPath) {
     this.dataMapWriterPath = dataMapWriterPath;
   }
+
+  public String getDataMapUniqueId() {
+    return dataMapUniqueId;
+  }
+
+  public void setDataMapUniqueId(String dataMapUniqueId) {
+    this.dataMapUniqueId = dataMapUniqueId;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) {
+      return false;
+    }
+
+    ExtendedBlocklet that = (ExtendedBlocklet) o;
+
+    return segmentId != null ? segmentId.equals(that.segmentId) : that.segmentId == null;
+  }
+
+  @Override public int hashCode() {
+    int result = super.hashCode();
+    result = 31 * result + (segmentId != null ? segmentId.hashCode() : 0);
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
index 266120e..229e5bf 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
@@ -117,4 +117,12 @@ public class FineGrainBlocklet extends Blocklet implements Serializable {
       pages.add(page);
     }
   }
+
+  @Override public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  @Override public int hashCode() {
+    return super.hashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 99a47ff..aceb372 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -78,8 +78,6 @@ public class BlockletDataMap extends AbstractCoarseGrainDataMap implements Cache
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMap.class.getName());
 
-  public static final String NAME = "clustered.btree.blocklet";
-
   private static int KEY_INDEX = 0;
 
   private static int MIN_VALUES_INDEX = 1;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index f6b8165..664242b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -55,6 +56,11 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
     implements BlockletDetailsFetcher,
     SegmentPropertiesFetcher {
 
+  private static final String NAME = "clustered.btree.blocklet";
+
+  public static final DataMapSchema DATA_MAP_SCHEMA =
+      new DataMapSchema(NAME, BlockletDataMapFactory.class.getName());
+
   private AbsoluteTableIdentifier identifier;
 
   // segmentId -> list of index file
@@ -63,7 +69,7 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
   private Cache<TableBlockIndexUniqueIdentifier, AbstractCoarseGrainDataMap> cache;
 
   @Override
-  public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+  public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
     this.identifier = identifier;
     cache = CacheProvider.getInstance()
         .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);

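With the name constant now private and exposed through DATA_MAP_SCHEMA, lookups go through the schema object (or the new getDefaultDataMap shortcut used later in this patch) rather than name and class-name strings. A small sketch of the schema-based lookup:

import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

public class BlockletDataMapLookupSketch {
  static TableDataMap lookupDefault(AbsoluteTableIdentifier identifier) {
    // Resolve the blocklet datamap through its schema instead of NAME + factory class name.
    return DataMapStoreManager.getInstance()
        .getDataMap(identifier, BlockletDataMapFactory.DATA_MAP_SCHEMA);
  }
}
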
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java
new file mode 100644
index 0000000..18c7374
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.expression.conditional;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.ExpressionResult;
+import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+
+public class StartsWithExpression extends BinaryConditionalExpression {
+  private static final long serialVersionUID = -5319109756575539219L;
+
+  public StartsWithExpression(Expression left, Expression right) {
+    super(left, right);
+  }
+
+  @Override public ExpressionResult evaluate(RowIntf value)
+      throws FilterUnsupportedException, FilterIllegalMemberException {
+    ExpressionResult exprLeftRes = left.evaluate(value);
+    ExpressionResult exprRightRes = right.evaluate(value);
+    ExpressionResult val1 = exprLeftRes;
+    if (exprLeftRes.isNull() || exprRightRes.isNull()) {
+      exprLeftRes.set(DataTypes.BOOLEAN, false);
+      return exprLeftRes;
+    }
+    if (exprLeftRes.getDataType() != exprRightRes.getDataType()) {
+      if (exprLeftRes.getDataType().getPrecedenceOrder() < exprRightRes.getDataType()
+          .getPrecedenceOrder()) {
+        val1 = exprRightRes;
+      }
+
+    }
+    boolean result = false;
+    DataType dataType = val1.getDataType();
+    if (dataType == DataTypes.STRING) {
+      result = exprLeftRes.getString().startsWith(exprRightRes.getString());
+    } else {
+      throw new FilterUnsupportedException(
+          "DataType: " + val1.getDataType() + " not supported for the filter expression");
+    }
+    val1.set(DataTypes.BOOLEAN, result);
+    return val1;
+  }
+
+  @Override public ExpressionType getFilterExpressionType() {
+    return ExpressionType.STARTSWITH;
+  }
+
+  @Override public String getString() {
+    return "StartsWith(" + left.getString() + ',' + right.getString() + ')';
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index 72ca1a4..b96603a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -46,7 +46,9 @@ import org.apache.carbondata.core.scan.expression.conditional.InExpression;
 import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression;
 import org.apache.carbondata.core.scan.expression.conditional.LessThanExpression;
 import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
+import org.apache.carbondata.core.scan.expression.conditional.StartsWithExpression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
@@ -369,7 +371,23 @@ public class FilterExpressionProcessor implements FilterProcessor {
       case LESSTHAN_EQUALTO:
         return getFilterResolverBasedOnExpressionType(ExpressionType.EQUALS, true, expressionTree,
             tableIdentifier, expressionTree);
-
+      case STARTSWITH:
+        assert (expressionTree instanceof StartsWithExpression);
+        currentExpression = (StartsWithExpression) expressionTree;
+        Expression re = currentExpression.getRight();
+        assert (re instanceof LiteralExpression);
+        LiteralExpression literal = (LiteralExpression) re;
+        String value = literal.getLiteralExpValue().toString();
+        Expression left = new GreaterThanEqualToExpression(currentExpression.getLeft(), literal);
+        String maxValueLimit = value.substring(0, value.length() - 1) + (char) (
+            ((int) value.charAt(value.length() - 1)) + 1);
+        Expression right = new LessThanExpression(currentExpression.getLeft(),
+            new LiteralExpression(maxValueLimit, literal.getLiteralExpDataType()));
+        currentExpression = new AndExpression(left, right);
+        return new LogicalFilterResolverImpl(
+            createFilterResolverTree(currentExpression.getLeft(), tableIdentifier),
+            createFilterResolverTree(currentExpression.getRight(), tableIdentifier),
+            currentExpression);
       case NOT_EQUALS:
       case NOT_IN:
         return getFilterResolverBasedOnExpressionType(ExpressionType.NOT_EQUALS, false,

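The STARTSWITH branch above rewrites a prefix filter into a closed-open range so that the existing resolvers and min/max pruning can be reused: a filter for values starting with 'abc' becomes col >= 'abc' AND col < 'abd', where the exclusive upper bound is the prefix with its last character incremented. A standalone sketch of that bound computation (mirroring the logic above, not the production code):

public class StartsWithBoundSketch {

  // Increment the last character of the prefix to get the exclusive upper bound of the range.
  static String upperBound(String prefix) {
    char last = prefix.charAt(prefix.length() - 1);
    return prefix.substring(0, prefix.length() - 1) + (char) (last + 1);
  }

  public static void main(String[] args) {
    // startsWith("abc") is evaluated as: value >= "abc" AND value < "abd"
    System.out.println(upperBound("abc")); // prints: abd
  }
}
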
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
index a3f9199..831acc8 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
@@ -39,6 +39,8 @@ public enum ExpressionType {
   LITERAL,
   RANGE,
   FALSE,
-  TRUE
-
+  TRUE,
+  STARTSWITH,
+  ENDSWITH,
+  CONTAINSWITH
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 2e73aef..439df0d 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -360,9 +360,8 @@ public class SegmentStatusManager {
         LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
 
         if (!FileFactory.isFileExist(dataLoadLocation, FileFactory.getFileType(dataLoadLocation))) {
-          // log error.
-          LOG.error("Error message: " + "Load metadata file is not present.");
-          invalidLoadTimestamps.add(loadDate);
+          // Table status file is not present; the table may be empty, so ignore this operation
+          LOG.warn("Trying to update table metadata file which is not present.");
           return invalidLoadTimestamps;
         }
         // read existing metadata details in load metadata.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
index 5203cb3..925731a 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataM
 import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMapFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.events.Event;
 
@@ -41,7 +42,7 @@ public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory {
 
   private AbsoluteTableIdentifier identifier;
 
-  @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+  @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
     this.identifier = identifier;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index ad5a1c9..04902f9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -33,14 +33,15 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapChooser;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.DataMapType;
 import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -338,9 +339,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");
     }
-    TableDataMap blockletMap =
-        DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME,
-            BlockletDataMapFactory.class.getName());
     List<String> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
     List<String> streamSegments = null;
@@ -368,7 +366,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
       }
       if (invalidSegments.size() > 0) {
-        blockletMap.clear(invalidSegments);
+        DataMapStoreManager.getInstance()
+            .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()), invalidSegments);
       }
     }
 
@@ -392,7 +391,11 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         toBeCleanedSegments.add(segment);
       }
     }
-    blockletMap.clear(toBeCleanedSegments);
+    if (toBeCleanedSegments.size() > 0) {
+      DataMapStoreManager.getInstance()
+          .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
+              toBeCleanedSegments);
+    }
 
     // process and resolve the expression
     Expression filter = getFilterPredicates(job.getConfiguration());
@@ -720,19 +723,21 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
             CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
-    TableDataMap blockletMap =
-        DataMapStoreManager.getInstance().chooseDataMap(absoluteTableIdentifier);
+    DataMapExprWrapper dataMapExprWrapper =
+        DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver);
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
     List<String> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets;
-    if (distributedCG || blockletMap.getDataMapFactory().getDataMapType() == DataMapType.FG) {
+    if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapType.FG) {
       DistributableDataMapFormat datamapDstr =
-          new DistributableDataMapFormat(absoluteTableIdentifier, blockletMap.getDataMapName(),
+          new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper,
               segmentIds, partitionsToPrune,
               BlockletDataMapFactory.class.getName());
       prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+      // Apply expression on the blocklets.
+      prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
     } else {
-      prunedBlocklets = blockletMap.prune(segmentIds, resolver, partitionsToPrune);
+      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
     }
 
     List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
@@ -897,8 +902,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
    */
   public BlockMappingVO getBlockRowCount(Job job, AbsoluteTableIdentifier identifier,
       List<String> partitions) throws IOException {
-    TableDataMap blockletMap = DataMapStoreManager.getInstance()
-        .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
+    TableDataMap blockletMap = DataMapStoreManager.getInstance().getDefaultDataMap(identifier);
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
         new SegmentStatusManager(identifier).getValidAndInvalidSegments();

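Driver-side pruning in this class now goes through the DataMapChooser instead of fetching the blocklet datamap by name: the chooser builds a DataMapExprWrapper over the resolved filter, and that wrapper either prunes locally or is shipped as distributables and post-filtered with pruneBlocklets. A condensed sketch of the new flow; the job submission itself is elided and the helper below is a placeholder:

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.datamap.DataMapChooser;
import org.apache.carbondata.core.datamap.DataMapType;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

public class PruningFlowSketch {

  static List<ExtendedBlocklet> prune(CarbonTable table, FilterResolverIntf resolver,
      List<String> segments, List<String> partitions, boolean distributedCG) throws IOException {
    DataMapExprWrapper wrapper = DataMapChooser.get().choose(table, resolver);
    if (distributedCG || wrapper.getDataMapType() == DataMapType.FG) {
      // Distributable path: a job collects blocklets from all involved datamaps,
      // then the expression is applied on the collected result.
      List<ExtendedBlocklet> collected = runDataMapJob(wrapper, segments, partitions);
      return wrapper.pruneBlocklets(collected);
    }
    // Local path: prune directly through the wrapper.
    return wrapper.prune(segments, partitions);
  }

  private static List<ExtendedBlocklet> runDataMapJob(DataMapExprWrapper wrapper,
      List<String> segments, List<String> partitions) {
    throw new UnsupportedOperationException("placeholder for the distributed datamap job");
  }
}
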
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
index fad2336..64936aa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
@@ -29,6 +29,6 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 public interface DataMapJob extends Serializable {
 
   List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
-      FilterResolverIntf resolverIntf);
+      FilterResolverIntf filter);
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
index 96eec6f..596f013 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
@@ -22,9 +22,10 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -47,7 +48,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
   private AbsoluteTableIdentifier identifier;
 
-  private String dataMapName;
+  private DataMapExprWrapper dataMapExprWrapper;
 
   private List<String> validSegments;
 
@@ -55,10 +56,11 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
   private List<String> partitions;
 
-  public DistributableDataMapFormat(AbsoluteTableIdentifier identifier,
-      String dataMapName, List<String> validSegments, List<String> partitions, String className) {
+  DistributableDataMapFormat(AbsoluteTableIdentifier identifier,
+      DataMapExprWrapper dataMapExprWrapper, List<String> validSegments, List<String> partitions,
+      String className) {
     this.identifier = identifier;
-    this.dataMapName = dataMapName;
+    this.dataMapExprWrapper = dataMapExprWrapper;
     this.validSegments = validSegments;
     this.className = className;
     this.partitions = partitions;
@@ -83,9 +85,8 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
-    TableDataMap dataMap =
-        DataMapStoreManager.getInstance().getDataMap(identifier, dataMapName, className);
-    List<DataMapDistributable> distributables = dataMap.toDistributable(validSegments);
+    List<DataMapDistributableWrapper> distributables =
+        dataMapExprWrapper.toDistributable(validSegments);
     List<InputSplit> inputSplits = new ArrayList<>(distributables.size());
     inputSplits.addAll(distributables);
     return inputSplits;
@@ -101,13 +102,16 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
       @Override
       public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
           throws IOException, InterruptedException {
-        DataMapDistributable distributable = (DataMapDistributable)inputSplit;
+        DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit;
         TableDataMap dataMap = DataMapStoreManager.getInstance()
-            .getDataMap(identifier, distributable.getDataMapName(),
-                distributable.getDataMapFactoryClass());
-        blockletIterator = dataMap.prune(
-            distributable, getFilterExp(taskAttemptContext.getConfiguration()), partitions)
-            .iterator();
+            .getDataMap(identifier, distributable.getDistributable().getDataMapSchema());
+        List<ExtendedBlocklet> blocklets = dataMap.prune(
+            distributable.getDistributable(),
+            dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
+        for (ExtendedBlocklet blocklet: blocklets) {
+          blocklet.setDataMapUniqueId(distributable.getUniqueId());
+        }
+        blockletIterator = blocklets.iterator();
       }
 
       @Override