You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/04/18 07:59:26 UTC

[1/2] carbondata git commit: [CARBONDATA-2347][LUCENE_DATAMAP] load issue in lucene datamap, make multiple directories based on taskId

Repository: carbondata
Updated Branches:
  refs/heads/master ceac8abf6 -> 860e144d4


http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index be5287f..f64299c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.command._
 
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.DataMapProvider
+import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
@@ -69,6 +69,29 @@ case class CarbonCreateDataMapCommand(
     }
 
     dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
+    // TODO: move this if logic inside lucene module
+    if (dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.LUCENE.toString)) {
+      val datamaps = DataMapStoreManager.getInstance().getAllDataMap(mainTable).asScala
+      if (datamaps.nonEmpty) {
+        datamaps.foreach(datamap => {
+          val dmColumns = datamap.getDataMapSchema.getProperties.get("text_columns")
+          val existingColumns = dmProperties("text_columns")
+
+          def getAllSubString(columns: String): Set[String] = {
+            columns.inits.flatMap(_.tails).toSet
+          }
+
+          val existingClmSets = getAllSubString(existingColumns)
+          val dmColumnsSets = getAllSubString(dmColumns)
+          val duplicateDMColumn = existingClmSets.intersect(dmColumnsSets).maxBy(_.length)
+          if (!duplicateDMColumn.isEmpty) {
+            throw new MalformedDataMapCommandException(
+              s"Create lucene datamap $dataMapName failed, datamap already exists on column(s) " +
+              s"$duplicateDMColumn")
+          }
+        })
+      }
+    }
     if (mainTable != null &&
         mainTable.isStreamingTable &&
         !(dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.toString)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index 21aba7d..613c8b2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -28,10 +28,12 @@ import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.types.StringType
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 
 /**
  * Show the datamaps on the table
+ *
  * @param tableIdentifier
  */
 case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
@@ -44,20 +46,22 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val dataMapSchemaList: util.List[DataMapSchema] = new util.ArrayList[DataMapSchema]()
     tableIdentifier match {
       case Some(table) =>
         Checker.validateTableExists(table.database, table.table, sparkSession)
         val carbonTable = CarbonEnv.getCarbonTable(table)(sparkSession)
         if (carbonTable.hasDataMapSchema) {
-          val schemaList = carbonTable.getTableInfo.getDataMapSchemaList
-          convertToRow(schemaList)
-        } else {
-          convertToRow(DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable))
+          dataMapSchemaList.addAll(carbonTable.getTableInfo.getDataMapSchemaList)
+        }
+        val indexSchemas = DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable)
+        if (!indexSchemas.isEmpty) {
+          dataMapSchemaList.addAll(indexSchemas)
         }
+        convertToRow(dataMapSchemaList)
       case _ =>
         convertToRow(DataMapStoreManager.getInstance().getAllDataMapSchemas)
     }
-
   }
 
   private def convertToRow(schemaList: util.List[DataMapSchema]) = {
@@ -65,9 +69,7 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
       schemaList.asScala.map { s =>
         var table = "(NA)"
         val relationIdentifier = s.getRelationIdentifier
-        if (relationIdentifier != null) {
           table = relationIdentifier.getDatabaseName + "." + relationIdentifier.getTableName
-        }
         Row(s.getDataMapName, s.getProviderName, table)
       }
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index ca9a6a1..1b087bd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command.mutation
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 458bc8d..07cdf7c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -110,9 +110,10 @@ case class CarbonDropTableCommand(
             dropCommand
           }
         childDropCommands.foreach(_.processMetadata(sparkSession))
-      } else {
-        val schemas = DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable)
-        childDropDataMapCommands = schemas.asScala.map{ schema =>
+      }
+      val indexDatamapSchemas = DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable)
+      if (!indexDatamapSchemas.isEmpty) {
+        childDropDataMapCommands = indexDatamapSchemas.asScala.map { schema =>
           val command = CarbonDropDataMapCommand(schema.getDataMapName,
             ifExistsSet,
             Some(TableIdentifier(tableName, Some(dbName))),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index ad3ad2e..c58d02d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.execution.command.schema._
 import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
-import org.apache.spark.sql.execution.command.datamap.CarbonDataMapRefreshCommand
 import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
@@ -37,7 +36,6 @@ import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.merger.CompactionType
 
 /**
  * Carbon strategies for ddl commands

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 2e39c91..14950eb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -85,10 +85,10 @@ public class DataMapWriterListener {
     LOG.info("DataMapWriter " + writer + " added");
   }
 
-  public void onBlockStart(String blockId, String blockPath) throws IOException {
+  public void onBlockStart(String blockId, String blockPath, long taskId) throws IOException {
     for (List<DataMapWriter> writers : registry.values()) {
       for (DataMapWriter writer : writers) {
-        writer.onBlockStart(blockId);
+        writer.onBlockStart(blockId, taskId);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 5783fe5..94ade87 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -250,7 +250,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   private void notifyDataMapBlockStart() {
     if (listener != null) {
       try {
-        listener.onBlockStart(carbonDataFileName, carbonDataFileHdfsPath);
+        listener.onBlockStart(carbonDataFileName, carbonDataFileHdfsPath,
+            model.getCarbonDataFileAttributes().getTaskId());
       } catch (IOException e) {
         throw new CarbonDataWriterException("Problem while writing datamap", e);
       }


[2/2] carbondata git commit: [CARBONDATA-2347][LUCENE_DATAMAP] load issue in lucene datamap, make multiple directories based on taskId

Posted by qi...@apache.org.
[CARBONDATA-2347][LUCENE_DATAMAP] load issue in lucene datamap, make multiple directories based on taskId

Make the datamap distributable object based on the lucene index path written during load.
Added Lucene Listener and Fixed Show Datamap

This closes #2113


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/860e144d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/860e144d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/860e144d

Branch: refs/heads/master
Commit: 860e144d4ac87a5d19a4572b2ffa7d2f192e4162
Parents: ceac8ab
Author: akashrn5 <ak...@gmail.com>
Authored: Thu Mar 29 19:59:36 2018 +0530
Committer: QiangCai <qi...@qq.com>
Committed: Wed Apr 18 15:56:44 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  13 ++
 .../core/datamap/DataMapStoreManager.java       |   8 +-
 .../core/datamap/IndexDataMapProvider.java      |  34 ++--
 .../carbondata/core/datamap/TableDataMap.java   |   7 +
 .../core/datamap/dev/DataMapFactory.java        |  11 +-
 .../core/datamap/dev/DataMapWriter.java         |   2 +-
 .../blockletindex/BlockletDataMapFactory.java   |   9 +-
 .../core/metadata/SegmentFileStore.java         |  10 +-
 .../schema/datamap/DataMapClassProvider.java    |  17 +-
 .../metadata/schema/table/DataMapSchema.java    |   1 -
 .../table/DataMapSchemaStorageProvider.java     |   3 +-
 .../table/DiskBasedDMSchemaStorageProvider.java |  27 +++-
 .../datamap/examples/MinMaxDataWriter.java      |   2 +-
 .../examples/MinMaxIndexDataMapFactory.java     |  14 +-
 .../lucene/LuceneCoarseGrainDataMapFactory.java |   4 +
 .../lucene/LuceneDataMapDistributable.java      |  13 +-
 .../lucene/LuceneDataMapFactoryBase.java        |  93 ++++++++---
 .../datamap/lucene/LuceneDataMapWriter.java     |  80 +++++++--
 .../datamap/lucene/LuceneFineGrainDataMap.java  |  20 ++-
 .../lucene/LuceneFineGrainDataMapFactory.java   |  12 +-
 .../lucene/LuceneCoarseGrainDataMapSuite.scala  |  70 --------
 .../lucene/LuceneFineGrainDataMapSuite.scala    | 148 -----------------
 .../lucene/LuceneCoarseGrainDataMapSuite.scala  |  74 +++++++++
 .../lucene/LuceneFineGrainDataMapSuite.scala    | 161 +++++++++++++++++++
 .../testsuite/datamap/CGDataMapTestCase.scala   |  32 ++--
 .../testsuite/datamap/DataMapWriterSuite.scala  |  13 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |  15 +-
 .../testsuite/datamap/TestDataMapStatus.scala   |  15 +-
 .../TestInsertAndOtherCommandConcurrent.scala   |  14 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |  25 ++-
 .../datamap/CarbonDataMapShowCommand.scala      |  16 +-
 .../CarbonProjectForDeleteCommand.scala         |   1 -
 .../command/table/CarbonDropTableCommand.scala  |   7 +-
 .../sql/execution/strategy/DDLStrategy.scala    |   2 -
 .../datamap/DataMapWriterListener.java          |   4 +-
 .../store/writer/AbstractFactDataWriter.java    |   3 +-
 36 files changed, 631 insertions(+), 349 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index df995e0..6ab1ce5 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1653,6 +1653,19 @@ public final class CarbonCommonConstants {
       "carbon.compaction.prefetch.enable";
   public static final String CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT = "false";
 
+  /**
+   * compression mode used by lucene for index writing, this conf will be passed to lucene writer
+   * while writing index files.
+   */
+  public static final String CARBON_LUCENE_COMPRESSION_MODE = "carbon.lucene.compression.mode";
+
+  /**
+   * default lucene index compression mode, in this mode writing speed will be less and speed is
+   * given priority, another mode is compression mode, where the index size is given importance to
+   * make it less and not the index writing speed.
+   */
+  public static final String CARBON_LUCENE_COMPRESSION_MODE_DEFAULT = "speed";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 02cf2a5..169cbde 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -252,8 +252,9 @@ public final class DataMapStoreManager {
           (Class<? extends DataMapFactory>) Class.forName(dataMapSchema.getProviderName());
       dataMapFactory = factoryClass.newInstance();
     } catch (ClassNotFoundException e) {
-      throw new MalformedDataMapCommandException(
-          "DataMap '" + dataMapSchema.getProviderName() + "' not found");
+      // try to create DataMapClassProvider instance by taking providerName as short name
+      dataMapFactory =
+          IndexDataMapProvider.getDataMapFactoryByShortName(dataMapSchema.getProviderName());
     } catch (Throwable e) {
       throw new MetadataProcessException(
           "failed to create DataMap '" + dataMapSchema.getProviderName() + "'", e);
@@ -272,7 +273,7 @@ public final class DataMapStoreManager {
       tableIndices = new ArrayList<>();
     }
 
-    dataMapFactory.init(table.getAbsoluteTableIdentifier(), dataMapSchema);
+    dataMapFactory.init(table, dataMapSchema);
     BlockletDetailsFetcher blockletDetailsFetcher;
     SegmentPropertiesFetcher segmentPropertiesFetcher = null;
     if (dataMapFactory instanceof BlockletDetailsFetcher) {
@@ -348,6 +349,7 @@ public final class DataMapStoreManager {
         if (tableDataMap != null && dataMapName
             .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
           tableDataMap.clear();
+          tableDataMap.deleteDatamapData();
           tableIndices.remove(i);
           break;
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
index 02ff1a1..85a4341 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.MetadataProcessException;
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
@@ -46,10 +47,12 @@ public class IndexDataMapProvider implements DataMapProvider {
           "Parent table is required to create index datamap");
     }
     ArrayList<RelationIdentifier> relationIdentifiers = new ArrayList<>();
-    dataMapSchema.setParentTables(relationIdentifiers);
-    relationIdentifiers.add(
+    RelationIdentifier relationIdentifier =
         new RelationIdentifier(mainTable.getDatabaseName(), mainTable.getTableName(),
-            mainTable.getTableInfo().getFactTable().getTableId()));
+            mainTable.getTableInfo().getFactTable().getTableId());
+    relationIdentifiers.add(relationIdentifier);
+    dataMapSchema.setRelationIdentifier(relationIdentifier);
+    dataMapSchema.setParentTables(relationIdentifiers);
     DataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema);
     DataMapStoreManager.getInstance().registerDataMap(mainTable, dataMapSchema, dataMapFactory);
     storageProvider.saveSchema(dataMapSchema);
@@ -62,7 +65,8 @@ public class IndexDataMapProvider implements DataMapProvider {
 
   @Override
   public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema) throws IOException {
-    storageProvider.dropSchema(dataMapSchema.getDataMapName());
+    storageProvider.dropSchema(dataMapSchema.getDataMapName(),
+        dataMapSchema.getParentTables().get(0).getTableName(), dataMapSchema.getProviderName());
   }
 
   @Override
@@ -99,25 +103,29 @@ public class IndexDataMapProvider implements DataMapProvider {
     return dataMapFactory;
   }
 
-  private DataMapFactory getDataMapFactoryByShortName(String providerName)
+  public static DataMapFactory getDataMapFactoryByShortName(String providerName)
       throws MalformedDataMapCommandException {
+    try {
+      DataMapRegistry.registerDataMap(
+          DataMapClassProvider.getDataMapProviderOnName(providerName).getClassName(),
+          DataMapClassProvider.getDataMapProviderOnName(providerName).getShortName());
+    } catch (UnsupportedOperationException ex) {
+      throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found", ex);
+    }
     DataMapFactory dataMapFactory;
-    String className = DataMapRegistry.getDataMapClassName(providerName);
+    String className = DataMapRegistry.getDataMapClassName(providerName.toLowerCase());
     if (className != null) {
       try {
         Class<? extends DataMapFactory> datamapClass =
-            (Class<? extends DataMapFactory>) Class.forName(providerName);
+            (Class<? extends DataMapFactory>) Class.forName(className);
         dataMapFactory = datamapClass.newInstance();
       } catch (ClassNotFoundException ex) {
-        throw new MalformedDataMapCommandException(
-            "DataMap '" + providerName + "' not found", ex);
+        throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found", ex);
       } catch (Throwable ex) {
-        throw new MetadataProcessException(
-            "failed to create DataMap '" + providerName + "'", ex);
+        throw new MetadataProcessException("failed to create DataMap '" + providerName + "'", ex);
       }
     } else {
-      throw new MalformedDataMapCommandException(
-          "DataMap '" + providerName + "' not found");
+      throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found");
     }
     return dataMapFactory;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 6689e3b..8143b1c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -197,6 +197,13 @@ public final class TableDataMap extends OperationEventListener {
     dataMapFactory.clear();
   }
 
+  /**
+   * delete datamap data if any
+   */
+  public void deleteDatamapData() {
+    dataMapFactory.deleteDatamapData();
+  }
+
   public DataMapSchema getDataMapSchema() {
     return dataMapSchema;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index d27b255..70f2772 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -24,7 +24,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.events.Event;
@@ -35,9 +35,9 @@ import org.apache.carbondata.events.Event;
 public interface DataMapFactory<T extends DataMap> {
 
   /**
-   * Initialization of Datamap factory with the identifier and datamap name
+   * Initialization of Datamap factory with the carbonTable and datamap name
    */
-  void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema)
+  void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
       throws IOException, MalformedDataMapCommandException;
 
   /**
@@ -87,4 +87,9 @@ public interface DataMapFactory<T extends DataMap> {
    *  Type of datamap whether it is FG or CG
    */
   DataMapLevel getDataMapType();
+
+  /**
+   * delete datamap data if any
+   */
+  void deleteDatamapData();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
index 6a3ee18..29670a1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -54,7 +54,7 @@ public abstract class DataMapWriter {
    *
    * @param blockId file name of the carbondata file
    */
-  public abstract void onBlockStart(String blockId) throws IOException;
+  public abstract void onBlockStart(String blockId, long taskId) throws IOException;
 
   /**
    * End of block notification

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 2425c4c..4c8ac0c 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -72,8 +73,8 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   private Cache<TableBlockIndexUniqueIdentifier, CoarseGrainDataMap> cache;
 
   @Override
-  public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
-    this.identifier = identifier;
+  public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
+    this.identifier = carbonTable.getAbsoluteTableIdentifier();
     cache = CacheProvider.getInstance()
         .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
   }
@@ -262,6 +263,10 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     return null;
   }
 
+  @Override public void deleteDatamapData() {
+
+  }
+
   @Override public SegmentProperties getSegmentProperties(Segment segment,
       ReadCommittedScope readCommittedScope) throws IOException {
     List<CoarseGrainDataMap> dataMaps = getDataMaps(segment, readCommittedScope);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 5902148..d609c56 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -672,7 +672,7 @@ public class SegmentFileStore {
   }
 
   private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
-      Map<String, List<String>> locationMap) {
+      Map<String, List<String>> locationMap) throws IOException {
     for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) {
       Path location = new Path(entry.getKey()).getParent();
       if (partitionSpecs != null) {
@@ -681,10 +681,10 @@ public class SegmentFileStore {
           FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
         }
       } else {
-        // delete the segment folder if it is empty
-        CarbonFile file = FileFactory.getCarbonFile(location.toString());
-        if (file.listFiles().length == 0) {
-          file.delete();
+        // delete the segment folder
+        CarbonFile segmentPath = FileFactory.getCarbonFile(location.toString());
+        if (null != segmentPath) {
+          FileFactory.deleteAllCarbonFilesOfDir(segmentPath);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
index 3934444..4ab400d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
@@ -28,7 +28,8 @@ package org.apache.carbondata.core.metadata.schema.datamap;
 
 public enum DataMapClassProvider {
   PREAGGREGATE("org.apache.carbondata.core.datamap.AggregateDataMap", "preaggregate"),
-  TIMESERIES("org.apache.carbondata.core.datamap.TimeSeriesDataMap", "timeseries");
+  TIMESERIES("org.apache.carbondata.core.datamap.TimeSeriesDataMap", "timeseries"),
+  LUCENE("org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory","lucene");
 
   /**
    * Fully qualified class name of datamap
@@ -64,8 +65,22 @@ public enum DataMapClassProvider {
       return TIMESERIES;
     } else if (PREAGGREGATE.isEqual(dataMapClass)) {
       return PREAGGREGATE;
+    } else if (LUCENE.isEqual(dataMapClass)) {
+      return LUCENE;
     } else {
       throw new UnsupportedOperationException("Unknown datamap provider/class " + dataMapClass);
     }
   }
+
+  public static DataMapClassProvider getDataMapProviderOnName(String dataMapShortname) {
+    if (TIMESERIES.isEqual(dataMapShortname)) {
+      return TIMESERIES;
+    } else if (PREAGGREGATE.isEqual(dataMapShortname)) {
+      return PREAGGREGATE;
+    } else if (LUCENE.isEqual(dataMapShortname)) {
+      return LUCENE;
+    } else {
+      throw new UnsupportedOperationException("Unknown datamap provider " + dataMapShortname);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index 6b592fb..d9f83b3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -70,7 +70,6 @@ public class DataMapSchema implements Serializable, Writable {
    */
   protected TableSchema childSchema;
 
-
   public DataMapSchema(String dataMapName, String providerName) {
     this.dataMapName = dataMapName;
     this.providerName = providerName;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
index 6b9bca5..ed13201 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
@@ -63,6 +63,7 @@ public interface DataMapSchemaStorageProvider {
    * Drop the schema from the storage by using dataMapName.
    * @param dataMapName
    */
-  void dropSchema(String dataMapName) throws IOException;
+  void dropSchema(String dataMapName, String tableName, String dataMapProviderName)
+      throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
index d49a9ae..9e34131 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
@@ -51,9 +51,8 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro
     BufferedWriter brWriter = null;
     DataOutputStream dataOutputStream = null;
     Gson gsonObjectToWrite = new Gson();
-    String schemaPath =
-        storePath + CarbonCommonConstants.FILE_SEPARATOR + dataMapSchema.getDataMapName()
-            + ".dmschema";
+    String schemaPath = getSchemaPath(storePath, dataMapSchema.getDataMapName(),
+        dataMapSchema.relationIdentifier.getTableName(), dataMapSchema.getProviderName());
     FileFactory.FileType fileType = FileFactory.getFileType(schemaPath);
     if (FileFactory.isFileExist(schemaPath, fileType)) {
       throw new IOException(
@@ -129,9 +128,9 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro
     return dataMapSchemas;
   }
 
-  @Override public void dropSchema(String dataMapName) throws IOException {
-    String schemaPath =
-        storePath + CarbonCommonConstants.FILE_SEPARATOR + dataMapName + ".dmschema";
+  @Override public void dropSchema(String dataMapName, String tableName, String dataMapProviderName)
+      throws IOException {
+    String schemaPath = getSchemaPath(storePath, dataMapName, tableName, dataMapProviderName);
     if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
       throw new IOException("DataMap with name " + dataMapName + " does not exists in storage");
     }
@@ -140,4 +139,20 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro
       throw new IOException("DataMap with name " + dataMapName + " cannot be deleted");
     }
   }
+
+  /**
+   * Builds the on-disk schema file path of a datamap:
+   * {storePath}/{tableName}_{dataMapName}_{providerName}.dmschema
+   * @param storePath store path where datamap schema files are kept
+   * @param dataMapName name of the datamap
+   * @param tableName name of the parent table
+   * @param dataMapProviderName short provider name (e.g. lucene, preaggregate)
+   * @return absolute path of the .dmschema file
+   */
+  public static String getSchemaPath(String storePath, String dataMapName, String tableName,
+      String dataMapProviderName) {
+    String schemaPath = storePath + CarbonCommonConstants.FILE_SEPARATOR + tableName
+        + CarbonCommonConstants.UNDERSCORE + dataMapName + CarbonCommonConstants.UNDERSCORE
+        + dataMapProviderName + ".dmschema";
+    return schemaPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index d2dbaa5..e68b481 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -90,7 +90,7 @@ public class MinMaxDataWriter extends DataMapWriter {
     }
   }
 
-  @Override public void onBlockStart(String blockId) {
+  @Override public void onBlockStart(String blockId, long taskId) {
     blockMinMaxMap = new HashMap<Integer, BlockletMinMax>();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index 758a67c..01aaffa 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -58,17 +58,11 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
 
   // this is an example for datamap, we can choose the columns and operations that
   // will be supported by this datamap. Furthermore, we can add cache-support for this datamap.
-  @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema)
+  @Override public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
       throws IOException, MalformedDataMapCommandException {
-    this.identifier = identifier;
+    this.identifier = carbonTable.getAbsoluteTableIdentifier();
     this.dataMapName = dataMapSchema.getDataMapName();
 
-    String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName);
-    if (null == carbonTable) {
-      throw new IOException("Failed to get carbon table with name " + tableUniqueName);
-    }
-
     // columns that will be indexed
     List<CarbonColumn> allColumns = carbonTable.getCreateOrderColumn(identifier.getTableName());
     List<String> minMaxCols = (List) CollectionUtils.collect(allColumns, new Transformer() {
@@ -163,4 +157,8 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
   @Override public DataMapMeta getMeta() {
     return this.dataMapMeta;
   }
+
+  @Override public void deleteDatamapData() {
+
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
index 7308841..f63656b 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
@@ -75,4 +75,8 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
     return DataMapLevel.CG;
   }
 
+  @Override public void deleteDatamapData() {
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
index 19e4035..1d47ee8 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
@@ -26,11 +26,22 @@ class LuceneDataMapDistributable extends DataMapDistributable {
   // TODO: seems no one use this?
   private String dataPath;
 
-  LuceneDataMapDistributable(String dataPath) {
+  private String indexPath;
+
+  LuceneDataMapDistributable(String dataPath, String indexPath) {
     this.dataPath = dataPath;
+    this.indexPath = indexPath;
   }
 
   public String getDataPath() {
     return dataPath;
   }
+
+  public String getIndexPath() {
+    return indexPath;
+  }
+
+  public void setIndexPath(String indexPath) {
+    this.indexPath = indexPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index ab4e9ee..672880f 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.datamap.lucene;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -32,13 +33,17 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -79,29 +84,21 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
    */
   AbsoluteTableIdentifier tableIdentifier = null;
 
+  /**
+   * indexed carbon columns for lucene
+   */
+  List<String> indexedCarbonColumns = null;
+
+
   @Override
-  public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema)
+  public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
       throws IOException, MalformedDataMapCommandException {
-    Objects.requireNonNull(identifier);
+    Objects.requireNonNull(carbonTable.getAbsoluteTableIdentifier());
     Objects.requireNonNull(dataMapSchema);
 
-    this.tableIdentifier = identifier;
+    this.tableIdentifier = carbonTable.getAbsoluteTableIdentifier();
     this.dataMapName = dataMapSchema.getDataMapName();
 
-    // get carbonmetadata from carbonmetadata instance
-    CarbonMetadata carbonMetadata = CarbonMetadata.getInstance();
-
-    String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
-
-    // get carbon table
-    CarbonTable carbonTable = carbonMetadata.getCarbonTable(tableUniqueName);
-    if (carbonTable == null) {
-      String errorMessage =
-          String.format("failed to get carbon table with name %s", tableUniqueName);
-      LOGGER.error(errorMessage);
-      throw new IOException(errorMessage);
-    }
-
     // validate DataMapSchema and get index columns
     List<String> indexedColumns =  validateAndGetIndexedColumns(dataMapSchema, carbonTable);
 
@@ -151,7 +148,7 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
         }
       }
     }
-    List<String> textColumnList = new ArrayList<String>(textColumns.length);
+    indexedCarbonColumns = new ArrayList<>(textColumns.length);
     for (int i = 0; i < textColumns.length; i++) {
       CarbonColumn column = carbonTable.getColumnByName(carbonTable.getTableName(), textColumns[i]);
       if (null == column) {
@@ -161,10 +158,41 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
         throw new MalformedDataMapCommandException(
             "TEXT_COLUMNS only supports String column. " + "Unsupported column: " + textColumns[i]
                 + ", DataType: " + column.getDataType());
+      } else if (column.getEncoder().contains(Encoding.DICTIONARY)) {
+        throw new MalformedDataMapCommandException(
+            "TEXT_COLUMNS cannot contain dictionary column " + column.getColName());
+      }
+      indexedCarbonColumns.add(column.getColName());
+    }
+    return indexedCarbonColumns;
+  }
+
+  /**
+   * this method will delete the datamap folders during drop datamap
+   * @throws MalformedDataMapCommandException
+   */
+  private void deleteDatamap() throws MalformedDataMapCommandException {
+    SegmentStatusManager ssm = new SegmentStatusManager(tableIdentifier);
+    try {
+      List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
+      for (Segment segment : validSegments) {
+        String segmentId = segment.getSegmentNo();
+        String datamapPath =
+            CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segmentId)
+                + File.separator + dataMapName;
+        if (FileFactory.isFileExist(datamapPath)) {
+          CarbonFile file =
+              FileFactory.getCarbonFile(datamapPath, FileFactory.getFileType(datamapPath));
+          CarbonUtil.deleteFoldersAndFilesSilent(file);
+        }
       }
-      textColumnList.add(column.getColName());
+    } catch (IOException | InterruptedException ex) {
+      throw new MalformedDataMapCommandException(
+          "drop datamap failed, failed to delete datamap directory");
     }
-    return textColumnList;
   }
 
   /**
@@ -173,8 +201,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
   @Override
   public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
     LOGGER.info("lucene data write to " + writeDirectoryPath);
-    return new LuceneDataMapWriter(
-        tableIdentifier, dataMapName, segment, writeDirectoryPath, true);
+    return new LuceneDataMapWriter(tableIdentifier, dataMapName, segment, writeDirectoryPath, true,
+        indexedCarbonColumns);
   }
 
   /**
@@ -183,9 +211,14 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
   @Override
   public List<DataMapDistributable> toDistributable(Segment segment) {
     List<DataMapDistributable> lstDataMapDistribute = new ArrayList<DataMapDistributable>();
-    DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable(
-        CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segment.getSegmentNo()));
-    lstDataMapDistribute.add(luceneDataMapDistributable);
+    CarbonFile[] indexDirs = LuceneDataMapWriter
+        .getAllIndexDirs(tableIdentifier.getTablePath(), segment.getSegmentNo(), dataMapName);
+    for (CarbonFile indexDir : indexDirs) {
+      DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable(
+          CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segment.getSegmentNo()),
+          indexDir.getAbsolutePath());
+      lstDataMapDistribute.add(luceneDataMapDistributable);
+    }
     return lstDataMapDistribute;
   }
 
@@ -210,6 +243,14 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
 
   }
 
+  @Override public void deleteDatamapData() {
+    try {
+      deleteDatamap();
+    } catch (MalformedDataMapCommandException ex) {
+      LOGGER.error(ex, "failed to delete datamap directory ");
+    }
+  }
+
   /**
    * Return metadata of this datamap
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 4286e5a..83454b3 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -20,23 +20,30 @@ package org.apache.carbondata.datamap.lucene;
 import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
+import org.apache.lucene.codecs.lucene62.Lucene62Codec;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.DoublePoint;
 import org.apache.lucene.document.Field;
@@ -77,6 +84,8 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
   private boolean isFineGrain = true;
 
+  private List<String> indexedCarbonColumns = null;
+
   private static final String BLOCKID_NAME = "blockId";
 
   private static final String BLOCKLETID_NAME = "blockletId";
@@ -86,30 +95,31 @@ public class LuceneDataMapWriter extends DataMapWriter {
   private static final String ROWID_NAME = "rowId";
 
   LuceneDataMapWriter(AbsoluteTableIdentifier identifier, String dataMapName, Segment segment,
-      String writeDirectoryPath, boolean isFineGrain) {
+      String writeDirectoryPath, boolean isFineGrain, List<String> indexedCarbonColumns) {
     super(identifier, segment, writeDirectoryPath);
     this.dataMapName = dataMapName;
     this.isFineGrain = isFineGrain;
+    this.indexedCarbonColumns = indexedCarbonColumns;
   }
 
-  private String getIndexPath() {
+  private String getIndexPath(long taskId) {
     if (isFineGrain) {
-      return genDataMapStorePath(identifier.getTablePath(), segmentId, dataMapName);
+      return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, dataMapName, taskId);
     } else {
       // TODO: where write data in coarse grain data map
-      return genDataMapStorePath(identifier.getTablePath(), segmentId, dataMapName);
+      return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, dataMapName, taskId);
     }
   }
 
   /**
    * Start of new block notification.
    */
-  public void onBlockStart(String blockId) throws IOException {
+  public void onBlockStart(String blockId, long taskId) throws IOException {
     // save this block id for lucene index , used in onPageAdd function
     this.blockId = blockId;
 
     // get index path, put index data into segment's path
-    String strIndexPath = getIndexPath();
+    String strIndexPath = getIndexPath(taskId);
     Path indexPath = FileFactory.getPath(strIndexPath);
     FileSystem fs = FileFactory.getFileSystem(indexPath);
 
@@ -124,6 +134,18 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
     // create a index writer
     Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
+
+    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
+    if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
+            CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
+        .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) {
+      indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
+    } else {
+      indexWriterConfig
+          .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
+    }
+
-    indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
+    indexWriter = new IndexWriter(indexDir, indexWriterConfig);
 
   }
@@ -202,8 +224,10 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
       // add other fields
       for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
-        if (!pages[colIdx].getNullBits().get(rowId)) {
-          addField(doc, pages[colIdx], rowId, Field.Store.NO);
+        if (indexedCarbonColumns.contains(pages[colIdx].getColumnSpec().getFieldName())) {
+          if (!pages[colIdx].getNullBits().get(rowId)) {
+            addField(doc, pages[colIdx], rowId, Field.Store.NO);
+          }
         }
       }
 
@@ -226,7 +250,7 @@ public class LuceneDataMapWriter extends DataMapWriter {
     String fieldName = page.getColumnSpec().getFieldName();
 
     //get field type
-    DataType type = page.getDataType();
+    DataType type = page.getColumnSpec().getSchemaDataType();
 
     if (type == DataTypes.BYTE) {
       // byte type , use int range to deal with byte, lucene has no byte type
@@ -322,8 +346,44 @@ public class LuceneDataMapWriter extends DataMapWriter {
   /**
    * Return store path for datamap
    */
-  static String genDataMapStorePath(String tablePath, String segmentId, String dataMapName) {
+  public static String genDataMapStorePath(String tablePath, String segmentId, String dataMapName) {
     return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName;
   }
 
+  /**
+   * Return store path for datamap based on the taskId. If three tasks get launched during
+   * loading, then three folders will be created based on the three task ids and the lucene
+   * index files will be written into those folders.
+   * @param tablePath table path
+   * @param segmentId segment number
+   * @param dataMapName datamap name
+   * @param taskId id of the loading task
+   * @return store path based on taskId
+   */
+  private static String genDataMapStorePathOnTaskId(String tablePath, String segmentId,
+      String dataMapName, long taskId) {
+    return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName
+        + File.separator + dataMapName + CarbonCommonConstants.UNDERSCORE + taskId
+        + CarbonCommonConstants.UNDERSCORE + System.currentTimeMillis();
+  }
+
+  /**
+   * Returns all the directories of lucene index files for query.
+   * @param tablePath table path
+   * @param segmentId segment number
+   * @param dataMapName datamap whose per-task index directories are listed
+   * @return index directories (named {dataMapName}_{taskId}_{timestamp}) under the segment path
+   */
+  public static CarbonFile[] getAllIndexDirs(String tablePath, String segmentId,
+      final String dataMapName) {
+    String dmPath =
+        CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName;
+    FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
+    final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
+    return dirPath.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        if (file.isDirectory() && file.getName().startsWith(dataMapName)) {
+          return true;
+        } else {
+          return false;
+        }
+      }
+    });
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
index faa4cbe..3caefd2 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
@@ -67,14 +67,14 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
       LogServiceFactory.getLogService(LuceneFineGrainDataMap.class.getName());
 
   /**
-   * searcher object for this datamap
+   * index Reader object to create searcher object
    */
-  private IndexSearcher indexSearcher = null;
+  private IndexReader indexReader = null;
 
   /**
-   * default max values to return
+   * searcher object for this datamap
    */
-  private static int MAX_RESULT_NUMBER = 100;
+  private IndexSearcher indexSearcher = null;
 
   /**
    * analyzer for lucene index
@@ -113,7 +113,7 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
     // open this index path , use HDFS default configuration
     Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
 
-    IndexReader indexReader = DirectoryReader.open(indexDir);
+    indexReader = DirectoryReader.open(indexDir);
     if (indexReader == null) {
       throw new RuntimeException("failed to create index reader object");
     }
@@ -153,6 +153,10 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
     // only for test , query all data
     String strQuery = getQueryString(filterExp.getFilterExpression());
 
+    if (null == strQuery) {
+      return null;
+    }
+
     String[] sFields = new String[fields.size()];
     fields.toArray(sFields);
 
@@ -163,9 +167,11 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
 
     // use MultiFieldQueryParser to parser query
     QueryParser queryParser = new MultiFieldQueryParser(sFields, analyzer);
+    queryParser.setAllowLeadingWildcard(true);
     Query query;
     try {
-      query = queryParser.parse(strQuery);
+      // always send lowercase string to lucene as it is case sensitive
+      query = queryParser.parse(strQuery.toLowerCase());
     } catch (ParseException e) {
       String errorMessage = String.format(
           "failed to filter block with query %s, detail is %s", strQuery, e.getMessage());
@@ -176,7 +182,7 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
     // execute index search
     TopDocs result;
     try {
-      result = indexSearcher.search(query, MAX_RESULT_NUMBER);
+      result = indexSearcher.search(query, indexReader.maxDoc());
     } catch (IOException e) {
       String errorMessage =
           String.format("failed to search lucene data, detail is %s", e.getMessage());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index 23c9928..151e674 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -62,7 +62,17 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
   public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable,
       ReadCommittedScope readCommittedScope)
       throws IOException {
-    return getDataMaps(distributable.getSegment(), readCommittedScope);
+    List<FineGrainDataMap> lstDataMap = new ArrayList<>();
+    FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer);
+    String indexPath = ((LuceneDataMapDistributable) distributable).getIndexPath();
+    try {
+      dataMap.init(new DataMapModel(indexPath));
+    } catch (MemoryException e) {
+      LOGGER.error(String.format("failed to get lucene datamap , detail is %s", e.getMessage()));
+      return lstDataMap;
+    }
+    lstDataMap.add(dataMap);
+    return lstDataMap;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
deleted file mode 100644
index f128afe..0000000
--- a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.lucene
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-class LuceneCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
-
-  val file2 = resourcesPath + "/datamap_input.csv"
-
-  override protected def beforeAll(): Unit = {
-    //n should be about 5000000 of reset if size is default 1024
-    val n = 15000
-    LuceneFineGrainDataMapSuite.createFile(file2, n * 4, n)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql(
-      """
-        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
-  }
-
-  test("test lucene coarse grain data map") {
-    sql("DROP TABLE IF EXISTS datamap_test")
-    sql(
-      """
-        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-
-    sql(
-      s"""
-         | CREATE DATAMAP dm ON TABLE datamap_test
-         | USING '${classOf[LuceneCoarseGrainDataMapFactory].getName}'
-         | DMProperties('TEXT_COLUMNS'='name,city')
-      """.stripMargin)
-
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
-
-    checkAnswer(sql("select * from datamap_test where name='n502670'"),
-      sql("select * from normal_test where name='n502670'"))
-  }
-
-  override protected def afterAll(): Unit = {
-    LuceneFineGrainDataMapSuite.deleteFile(file2)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql("DROP TABLE IF EXISTS datamap_test")
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
deleted file mode 100644
index bfcfa67..0000000
--- a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.lucene
-
-import java.io.{File, PrintWriter}
-
-import scala.util.Random
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
-
-class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
-
-  val file2 = resourcesPath + "/datamap_input.csv"
-
-  override protected def beforeAll(): Unit = {
-    //n should be about 5000000 of reset if size is default 1024
-    val n = 15000
-    LuceneFineGrainDataMapSuite.createFile(file2)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql(
-      """
-        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
-
-    sql("DROP TABLE IF EXISTS datamap_test")
-    sql(
-      """
-        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'carbondata'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-  }
-
-  test("validate TEXT_COLUMNS DataMap property") {
-    // require TEXT_COLUMNS
-    var exception = intercept[MalformedDataMapCommandException](sql(
-      s"""
-         | CREATE DATAMAP dm1 ON TABLE datamap_test
-         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
-      """.stripMargin))
-
-    assertResult("Lucene DataMap require proper TEXT_COLUMNS property.")(exception.getMessage)
-
-    // illegal argumnet.
-    exception = intercept[MalformedDataMapCommandException](sql(
-      s"""
-         | CREATE DATAMAP dm1 ON TABLE datamap_test
-         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
-         | DMProperties('text_COLUMNS'='name, ')
-      """.stripMargin))
-
-    assertResult("TEXT_COLUMNS contains illegal argument.")(exception.getMessage)
-
-    // not exists
-    exception = intercept[MalformedDataMapCommandException](sql(
-      s"""
-         | CREATE DATAMAP dm1 ON TABLE datamap_test
-         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
-         | DMProperties('text_COLUMNS'='city,school')
-    """.stripMargin))
-
-    assertResult("TEXT_COLUMNS: school does not exist in table. Please check create DataMap statement.")(exception.getMessage)
-
-    // duplicate columns
-    exception = intercept[MalformedDataMapCommandException](sql(
-      s"""
-         | CREATE DATAMAP dm1 ON TABLE datamap_test
-         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
-         | DMProperties('text_COLUMNS'='name,city,name')
-      """.stripMargin))
-
-    assertResult("TEXT_COLUMNS has duplicate columns :name")(exception.getMessage)
-
-    // only support String DataType
-    exception = intercept[MalformedDataMapCommandException](sql(
-    s"""
-         | CREATE DATAMAP dm1 ON TABLE datamap_test
-         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
-         | DMProperties('text_COLUMNS'='city,id')
-      """.stripMargin))
-
-    assertResult("TEXT_COLUMNS only supports String column. Unsupported column: id, DataType: INT")(exception.getMessage)
-  }
-
-  test("test lucene fine grain data map") {
-    sql(
-      s"""
-         | CREATE DATAMAP dm ON TABLE datamap_test
-         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
-         | DMProperties('TEXT_COLUMNS'='Name , cIty')
-      """.stripMargin)
-
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
-
-    //    sql("select * from normal_test where name='n34000'").show
-    checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test where name='n10'"))
-//    checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10*')"), sql(s"SELECT * FROM datamap_test WHERE name like 'n10%'"))
-    checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('city:c020')"), sql(s"SELECT * FROM datamap_test WHERE city='c020'"))
-
-    //    checkAnswer(
-    //      sql("select * from datamap_test where match('name:n34000')"),
-    //      sql("select * from normal_test where name='n34000'"))
-  }
-
-  override protected def afterAll(): Unit = {
-    LuceneFineGrainDataMapSuite.deleteFile(file2)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql("DROP TABLE IF EXISTS datamap_test")
-  }
-}
-
-object LuceneFineGrainDataMapSuite {
-  def createFile(fileName: String, line: Int = 10000, start: Int = 0) = {
-    val write = new PrintWriter(new File(fileName))
-    for (i <- start until (start + line)) {
-      write.println(i + "," + "n" + i + "," + "c0" + i + "," + Random.nextInt(80))
-    }
-    write.close()
-  }
-
-  def deleteFile(fileName: String): Unit = {
-      val file = new File(fileName)
-      if (file.exists()) {
-        file.delete()
-      }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
new file mode 100644
index 0000000..245d147
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, Ignore}
+
+/**
+ * Ignored test class as CG datamap is not supported yet
+ */
+@Ignore
+class LuceneCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/datamap_input.csv"
+
+  override protected def beforeAll(): Unit = {
+    //n should be about 5000000 of reset if size is default 1024
+    val n = 15000
+    LuceneFineGrainDataMapSuite.createFile(file2, n * 4, n)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+  }
+
+  test("test lucene coarse grain data map") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+
+      sql(
+        s"""
+           | CREATE DATAMAP dm ON TABLE datamap_test
+           | USING 'lucene'
+           | DMProperties('TEXT_COLUMNS'='name,city')
+      """.stripMargin)
+
+     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+
+     checkAnswer(sql("select * from datamap_test where name='n502670'"),
+     sql("select * from normal_test where name='n502670'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    LuceneFineGrainDataMapSuite.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test")
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
new file mode 100644
index 0000000..c5ea2c7
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene
+
+import java.io.{File, PrintWriter}
+
+import scala.util.Random
+
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/datamap_input.csv"
+
+  override protected def beforeAll(): Unit = {
+    //n should be about 5000000 of reset if size is default 1024
+    val n = 15000
+    LuceneFineGrainDataMapSuite.createFile(file2)
+    sql("create database if not exists lucene")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
+        CarbonEnv.getDatabaseLocation("lucene", sqlContext.sparkSession))
+    sql("use lucene")
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+  }
+
+  test("validate TEXT_COLUMNS DataMap property") {
+    // require TEXT_COLUMNS
+    var exception = intercept[MalformedDataMapCommandException](sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'lucene'
+      """.stripMargin))
+
+    assertResult("Lucene DataMap require proper TEXT_COLUMNS property.")(exception.getMessage)
+
+    // illegal argumnet.
+    exception = intercept[MalformedDataMapCommandException](sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'lucene'
+         | DMProperties('text_COLUMNS'='name, ')
+      """.stripMargin))
+
+    assertResult("TEXT_COLUMNS contains illegal argument.")(exception.getMessage)
+
+    // not exists
+    exception = intercept[MalformedDataMapCommandException](sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'lucene'
+         | DMProperties('text_COLUMNS'='city,school')
+    """.stripMargin))
+
+    assertResult("TEXT_COLUMNS: school does not exist in table. Please check create DataMap statement.")(exception.getMessage)
+
+    // duplicate columns
+    exception = intercept[MalformedDataMapCommandException](sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'lucene'
+         | DMProperties('text_COLUMNS'='name,city,name')
+      """.stripMargin))
+
+    assertResult("TEXT_COLUMNS has duplicate columns :name")(exception.getMessage)
+
+    // only support String DataType
+    exception = intercept[MalformedDataMapCommandException](sql(
+    s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'lucene'
+         | DMProperties('text_COLUMNS'='city,id')
+      """.stripMargin))
+
+    assertResult("TEXT_COLUMNS only supports String column. Unsupported column: id, DataType: INT")(exception.getMessage)
+  }
+
+  test("test lucene fine grain data map") {
+    sql(
+      s"""
+         | CREATE DATAMAP dm ON TABLE datamap_test
+         | USING 'lucene'
+         | DMProperties('TEXT_COLUMNS'='Name , cIty')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+
+    //    sql("select * from normal_test where name='n34000'").show
+    checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test where name='n10'"))
+//    checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10*')"), sql(s"SELECT * FROM datamap_test WHERE name like 'n10%'"))
+    checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('city:c020')"), sql(s"SELECT * FROM datamap_test WHERE city='c020'"))
+
+    //    checkAnswer(
+    //      sql("select * from datamap_test where match('name:n34000')"),
+    //      sql("select * from normal_test where name='n34000'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    LuceneFineGrainDataMapSuite.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql("use default")
+    sql("drop database if exists lucene cascade")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
+        CarbonProperties.getStorePath)
+  }
+}
+
+object LuceneFineGrainDataMapSuite {
+  def createFile(fileName: String, line: Int = 10000, start: Int = 0) = {
+    val write = new PrintWriter(new File(fileName))
+    for (i <- start until (start + line)) {
+      write.println(i + "," + "n" + i + "," + "c0" + i + "," + Random.nextInt(80))
+    }
+    write.close()
+  }
+
+  def deleteFile(fileName: String): Unit = {
+      val file = new File(fileName)
+      if (file.exists()) {
+        file.delete()
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 0c96247..20e01ff 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, DiskBasedDMSchemaStorageProvider}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.expression.Expression
@@ -55,8 +55,8 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
   /**
    * Initialization of Datamap factory with the identifier and datamap name
    */
-  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
-    this.identifier = identifier
+  override def init(carbonTable: CarbonTable, dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = carbonTable.getAbsoluteTableIdentifier
     this.dataMapSchema = dataMapSchema
   }
 
@@ -143,6 +143,12 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
     new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
       List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
   }
+
+  /**
+   * delete datamap data if any
+   */
+  override def deleteDatamapData(): Unit = {
+  }
 }
 
 class CGDataMap extends CoarseGrainDataMap {
@@ -243,7 +249,7 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
    *
    * @param blockId file name of the carbondata file
    */
-  override def onBlockStart(blockId: String): Unit = {
+  override def onBlockStart(blockId: String, taskId: Long): Unit = {
     currentBlockId = blockId
   }
 
@@ -326,6 +332,8 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
 class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
 
   val file2 = resourcesPath + "/compaction/fil2.csv"
+  val systemFolderStoreLocation = CarbonProperties.getInstance().getSystemFolderLocation
+
   override protected def beforeAll(): Unit = {
     //n should be about 5000000 of reset if size is default 1024
     val n = 150000
@@ -384,9 +392,10 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
 
-    sql(s"create datamap test_cg_datamap on table datamap_store_test using '${classOf[CGDataMapFactory].getName}' as select  id, name from datamap_store_test")
+    val dataMapProvider = classOf[CGDataMapFactory].getName
+    sql(s"create datamap test_cg_datamap on table datamap_store_test using '$dataMapProvider' as select  id, name from datamap_store_test")
 
-    val loc = CarbonProperties.getInstance().getSystemFolderLocation + "/test_cg_datamap.dmschema"
+    val loc = DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation, "test_cg_datamap", "datamap_store_test", dataMapProvider)
 
     assert(FileFactory.isFileExist(loc))
   }
@@ -400,9 +409,10 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
 
-    sql(s"create datamap test_cg_datamap1 on table datamap_store_test1 using '${classOf[CGDataMapFactory].getName}' as select  id, name from datamap_store_test")
+    val dataMapProvider = classOf[CGDataMapFactory].getName
+    sql(s"create datamap test_cg_datamap1 on table datamap_store_test1 using '$dataMapProvider' as select  id, name from datamap_store_test")
 
-    val loc = CarbonProperties.getInstance().getSystemFolderLocation + "/test_cg_datamap1.dmschema"
+    val loc = DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation, "test_cg_datamap1", "datamap_store_test1", dataMapProvider)
 
     assert(FileFactory.isFileExist(loc))
 
@@ -420,9 +430,10 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
 
-    sql(s"create datamap test_cg_datamap2 on table datamap_store_test2 using '${classOf[CGDataMapFactory].getName}' as select  id, name from datamap_store_test")
+    val dataMapProvider = classOf[CGDataMapFactory].getName
+    sql(s"create datamap test_cg_datamap2 on table datamap_store_test2 using '$dataMapProvider' as select  id, name from datamap_store_test")
 
-    val loc = CarbonProperties.getInstance().getSystemFolderLocation + "/test_cg_datamap2.dmschema"
+    val loc = DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation,"test_cg_datamap2", "datamap_store_test2", dataMapProvider)
 
     assert(FileFactory.isFileExist(loc))
 
@@ -442,4 +453,5 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS datamap_store_test1")
     sql("DROP TABLE IF EXISTS datamap_store_test2")
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 270c676..5fa5209 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -43,9 +44,9 @@ class C2DataMapFactory() extends CoarseGrainDataMapFactory {
 
   var identifier: AbsoluteTableIdentifier = _
 
-  override def init(identifier: AbsoluteTableIdentifier,
+  override def init(carbonTable: CarbonTable,
       dataMapSchema: DataMapSchema): Unit = {
-    this.identifier = identifier
+    this.identifier = carbonTable.getAbsoluteTableIdentifier
   }
 
   override def fireEvent(event: Event): Unit = ???
@@ -72,6 +73,12 @@ class C2DataMapFactory() extends CoarseGrainDataMapFactory {
     ???
   }
 
+  /**
+   * delete datamap data if any
+   */
+  override def deleteDatamapData(): Unit = {
+    ???
+  }
 }
 
 class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
@@ -203,7 +210,7 @@ object DataMapWriterSuite {
      *
      * @param blockId file name of the carbondata file
      */
-    override def onBlockStart(blockId: String) = {
+    override def onBlockStart(blockId: String, taskId: Long) = {
       callbackSeq :+= s"block start $blockId"
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 060d06a..551f9e1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.expression.Expression
@@ -57,8 +57,8 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
   /**
    * Initialization of Datamap factory with the identifier and datamap name
    */
-  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
-    this.identifier = identifier
+  override def init(carbonTable: CarbonTable, dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = carbonTable.getAbsoluteTableIdentifier
     this.dataMapSchema = dataMapSchema
   }
 
@@ -141,6 +141,13 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
     new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
       List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
   }
+
+  /**
+   * delete datamap data if any
+   */
+  override def deleteDatamapData(): Unit = {
+    ???
+  }
 }
 
 class FGDataMap extends FineGrainDataMap {
@@ -271,7 +278,7 @@ class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
    *
    * @param blockId file name of the carbondata file
    */
-  override def onBlockStart(blockId: String): Unit = {
+  override def onBlockStart(blockId: String, taskId: Long): Unit = {
     currentBlockId = blockId
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
index be9cc51..280c20d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -204,7 +205,7 @@ class TestDataMap() extends CoarseGrainDataMapFactory {
 
       override def onBlockletStart(blockletId: Int): Unit = { }
 
-      override def onBlockStart(blockId: String): Unit = {
+      override def onBlockStart(blockId: String, taskId: Long): Unit = {
         // trigger the second SQL to execute
       }
 
@@ -218,8 +219,14 @@ class TestDataMap() extends CoarseGrainDataMapFactory {
 
   override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = ???
 
-  override def init(identifier: AbsoluteTableIdentifier,
-      dataMapSchema: DataMapSchema): Unit = {
-    this.identifier = identifier
+  override def init(carbonTable: CarbonTable, dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = carbonTable.getAbsoluteTableIdentifier
+  }
+
+  /**
+   * delete datamap data if any
+   */
+  override def deleteDatamapData(): Unit = {
+
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 86f0f10..5c9709c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, RelationIdentifier}
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, RelationIdentifier}
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -306,7 +307,7 @@ class WaitingDataMap() extends CoarseGrainDataMapFactory {
 
       override def onBlockletStart(blockletId: Int): Unit = { }
 
-      override def onBlockStart(blockId: String): Unit = {
+      override def onBlockStart(blockId: String, taskId: Long): Unit = {
         // trigger the second SQL to execute
         Global.overwriteRunning = true
 
@@ -324,8 +325,15 @@ class WaitingDataMap() extends CoarseGrainDataMapFactory {
 
   override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = ???
 
-  override def init(identifier: AbsoluteTableIdentifier,
+  override def init(carbonTable: CarbonTable,
       dataMapSchema: DataMapSchema): Unit = {
-    this.identifier = identifier
+    this.identifier = carbonTable.getAbsoluteTableIdentifier
+  }
+
+  /**
+   * delete datamap data if any
+   */
+  override def deleteDatamapData(): Unit = {
+
   }
 }
\ No newline at end of file