Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/18 12:19:59 UTC

carbondata git commit: [CARBONDATA-2353] Added cache for datamap schema provider and added tests

Repository: carbondata
Updated Branches:
  refs/heads/master 860e144d4 -> 5f2a748f6


[CARBONDATA-2353] Added cache for datamap schema provider and added tests

Problem:
Currently there is no cache in the datamap schema provider, so the schema is read from disk on every access.
Solution:
Add a cache to DiskBasedDMSchemaStorageProvider and refresh it based on the modified time of the datamap mdt file.
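A minimal sketch of the refresh idea, using plain java.io rather than CarbonData's
FileFactory; the class and member names here (SchemaCache, loadAllSchemasFromDisk)
are illustrative only, not the actual implementation in the diff below:

    import java.io.File;
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    class SchemaCache {
      private final File mdtFile;        // marker file; its mtime versions the schema store
      private long lastModifiedTime;     // mtime seen when the cache was last loaded
      private Set<String> schemas = new HashSet<>();

      SchemaCache(String storePath) {
        this.mdtFile = new File(storePath, "datamap.mdtfile");
      }

      // Reload the cached schemas only when the mdt file's mtime has changed.
      synchronized Set<String> getSchemas() throws IOException {
        long current = mdtFile.exists() ? mdtFile.lastModified() : -1L;
        if (current != lastModifiedTime) {
          schemas = loadAllSchemasFromDisk();   // full re-read of the *.dmschema files
          lastModifiedTime = current;
        }
        return schemas;
      }

      // Writers touch the mdt file after save/drop so other JVMs invalidate their cache.
      synchronized void touch() throws IOException {
        if (!mdtFile.exists()) {
          mdtFile.getParentFile().mkdirs();
          mdtFile.createNewFile();
        }
        mdtFile.setLastModified(System.currentTimeMillis());
      }

      private Set<String> loadAllSchemasFromDisk() {
        return new HashSet<>();               // placeholder for parsing the JSON schema files
      }
    }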

This closes #2176


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5f2a748f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5f2a748f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5f2a748f

Branch: refs/heads/master
Commit: 5f2a748f6a7b1d02957de5cc147594d698116109
Parents: 860e144
Author: ravipesala <ra...@gmail.com>
Authored: Mon Apr 16 18:22:23 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Apr 18 20:19:30 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/DataMapChooser.java |   4 +-
 .../core/datamap/DataMapStoreManager.java       |  77 ++++----
 .../core/datamap/IndexDataMapProvider.java      |  17 +-
 .../datamap/status/DataMapStatusManager.java    |  32 ++--
 .../schema/table/AggregationDataMapSchema.java  |  12 ++
 .../metadata/schema/table/DataMapSchema.java    |  11 ++
 .../table/DataMapSchemaStorageProvider.java     |  14 +-
 .../table/DiskBasedDMSchemaStorageProvider.java | 147 ++++++++++-----
 .../DiskBasedDMSchemaStoraheProviderSuite.java  | 187 +++++++++++++++++++
 .../lucene/LuceneFineGrainDataMapSuite.scala    |  81 +++++++-
 .../preaggregate/TestPreAggregateDrop.scala     |   2 +-
 .../timeseries/TestTimeSeriesDropSuite.scala    |   6 +-
 .../testsuite/datamap/CGDataMapTestCase.scala   |   6 +-
 .../carbondata/datamap/DataMapManager.java      |  10 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |   7 +-
 .../datamap/CarbonDataMapShowCommand.scala      |   2 +-
 .../datamap/CarbonDropDataMapCommand.scala      |  36 ++--
 .../command/table/CarbonDropTableCommand.scala  |   3 +-
 .../datamap/DataMapWriterListener.java          |   8 +-
 19 files changed, 494 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
index a40644a..cdba6c1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.core.datamap;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -68,7 +69,8 @@ public class DataMapChooser {
   /**
    * Return a chosen datamap based on input filter. See {@link DataMapChooser}
    */
-  public DataMapExprWrapper choose(CarbonTable carbonTable, FilterResolverIntf resolverIntf) {
+  public DataMapExprWrapper choose(CarbonTable carbonTable, FilterResolverIntf resolverIntf)
+      throws IOException {
     if (resolverIntf != null) {
       Expression expression = resolverIntf.getFilterExpression();
       // First check for FG datamaps if any exist

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 169cbde..f1c0321 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -64,6 +64,9 @@ public final class DataMapStoreManager {
 
   private Map<String, TableSegmentRefresher> segmentRefreshMap = new ConcurrentHashMap<>();
 
+  private DataMapSchemaStorageProvider provider = new DiskBasedDMSchemaStorageProvider(
+      CarbonProperties.getInstance().getSystemFolderLocation());
+
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
 
@@ -75,7 +78,8 @@ public final class DataMapStoreManager {
    * It gives all datamaps of type @mapType except the default datamap.
    *
    */
-  public List<TableDataMap> getAllDataMap(CarbonTable carbonTable, DataMapLevel mapType) {
+  public List<TableDataMap> getAllDataMap(CarbonTable carbonTable, DataMapLevel mapType)
+      throws IOException {
     List<TableDataMap> dataMaps = new ArrayList<>();
     List<TableDataMap> tableIndices = getAllDataMap(carbonTable);
     if (tableIndices != null) {
@@ -93,9 +97,8 @@ public final class DataMapStoreManager {
    *
    * @return
    */
-  public List<TableDataMap> getAllDataMap(CarbonTable carbonTable) {
-    // TODO cache all schemas and update only when datamap status file updates
-    List<DataMapSchema> dataMapSchemas = getAllDataMapSchemas();
+  public List<TableDataMap> getAllDataMap(CarbonTable carbonTable) throws IOException {
+    List<DataMapSchema> dataMapSchemas = getDataMapSchemasOfTable(carbonTable);
     List<TableDataMap> dataMaps = new ArrayList<>();
     if (dataMapSchemas != null) {
       for (DataMapSchema dataMapSchema : dataMapSchemas) {
@@ -111,47 +114,40 @@ public final class DataMapStoreManager {
   }
 
   /**
-   * It gives all datamap schemas.
+   * It gives all datamap schemas of a given table.
    *
-   * @return
    */
-  public List<DataMapSchema> getAllDataMapSchemas(CarbonTable carbonTable) {
-    // TODO cache all schemas and update only when datamap status file updates
-    List<DataMapSchema> dataMapSchemas = getAllDataMapSchemas();
-    List<DataMapSchema> dataMaps = new ArrayList<>();
-    if (dataMapSchemas != null) {
-      for (DataMapSchema dataMapSchema : dataMapSchemas) {
-        RelationIdentifier identifier = dataMapSchema.getParentTables().get(0);
-        if (dataMapSchema.isIndexDataMap() && identifier.getTableName()
-            .equals(carbonTable.getTableName()) && identifier.getDatabaseName()
-            .equals(carbonTable.getDatabaseName())) {
-          dataMaps.add(dataMapSchema);
-        }
-      }
-    }
-    return dataMaps;
+  public List<DataMapSchema> getDataMapSchemasOfTable(CarbonTable carbonTable) throws IOException {
+    return provider.retrieveSchemas(carbonTable);
   }
 
-  public List<DataMapSchema> getAllDataMapSchemas() {
-    DataMapSchemaStorageProvider provider = new DiskBasedDMSchemaStorageProvider(
-        CarbonProperties.getInstance().getSystemFolderLocation());
-    List<DataMapSchema> dataMapSchemas;
-    try {
-      dataMapSchemas = provider.retrieveAllSchemas();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return dataMapSchemas;
+  /**
+   * It gives all datamap schemas from store.
+   */
+  public List<DataMapSchema> getAllDataMapSchemas() throws IOException {
+    return provider.retrieveAllSchemas();
   }
 
-  public DataMapSchema getDataMapSchema(String dataMapName) throws NoSuchDataMapException {
-    List<DataMapSchema> allDataMapSchemas = getAllDataMapSchemas();
-    for (DataMapSchema dataMapSchema : allDataMapSchemas) {
-      if (dataMapSchema.getDataMapName().equalsIgnoreCase(dataMapName)) {
-        return dataMapSchema;
-      }
-    }
-    throw new NoSuchDataMapException(dataMapName);
+
+  public DataMapSchema getDataMapSchema(String dataMapName)
+      throws NoSuchDataMapException, IOException {
+    return provider.retrieveSchema(dataMapName);
+  }
+
+  /**
+   * Saves the datamap schema to storage
+   * @param dataMapSchema
+   */
+  public void saveDataMapSchema(DataMapSchema dataMapSchema) throws IOException {
+    provider.saveSchema(dataMapSchema);
+  }
+
+  /**
+   * Drops the datamap schema from storage
+   * @param dataMapName
+   */
+  public void dropDataMapSchema(String dataMapName) throws IOException {
+    provider.dropSchema(dataMapName);
   }
 
   /**
@@ -306,7 +302,8 @@ public final class DataMapStoreManager {
    * @param carbonTable
    * @param segments
    */
-  public void clearInvalidSegments(CarbonTable carbonTable, List<Segment> segments) {
+  public void clearInvalidSegments(CarbonTable carbonTable, List<Segment> segments)
+      throws IOException {
     getDefaultDataMap(carbonTable).clear(segments);
     List<TableDataMap> allDataMap = getAllDataMap(carbonTable);
     for (TableDataMap dataMap: allDataMap) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
index 85a4341..a22bd0b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
@@ -27,16 +27,12 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
 import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 
 @InterfaceAudience.Internal
 public class IndexDataMapProvider implements DataMapProvider {
 
-  private DataMapSchemaStorageProvider storageProvider;
-
-  public IndexDataMapProvider(DataMapSchemaStorageProvider storageProvider) {
-    this.storageProvider = storageProvider;
+  public IndexDataMapProvider() {
   }
 
   @Override
@@ -55,7 +51,7 @@ public class IndexDataMapProvider implements DataMapProvider {
     dataMapSchema.setParentTables(relationIdentifiers);
     DataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema);
     DataMapStoreManager.getInstance().registerDataMap(mainTable, dataMapSchema, dataMapFactory);
-    storageProvider.saveSchema(dataMapSchema);
+    DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema);
   }
 
   @Override
@@ -65,12 +61,17 @@ public class IndexDataMapProvider implements DataMapProvider {
 
   @Override
   public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema) throws IOException {
-    storageProvider.dropSchema(dataMapSchema.getDataMapName(),
-        dataMapSchema.getParentTables().get(0).getTableName(), dataMapSchema.getProviderName());
+    if (mainTable == null) {
+      throw new UnsupportedOperationException("Table need to be specified in index datamaps");
+    }
+    DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName());
   }
 
   @Override
   public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema) {
+    if (mainTable == null) {
+      throw new UnsupportedOperationException("Table need to be specified in index datamaps");
+    }
     DataMapStoreManager.getInstance().clearDataMap(
         mainTable.getAbsoluteTableIdentifier(), dataMapSchema.getDataMapName());
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
index cdb7d01..10ed80c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
@@ -51,7 +52,7 @@ public class DataMapStatusManager {
   }
 
   public static void disableDataMap(String dataMapName) throws Exception {
-    DataMapSchema dataMapSchema = validateDataMap(dataMapName, false);
+    DataMapSchema dataMapSchema = getDataMapSchema(dataMapName);
     List<DataMapSchema> list = new ArrayList<>();
     if (dataMapSchema != null) {
       list.add(dataMapSchema);
@@ -61,12 +62,12 @@ public class DataMapStatusManager {
 
   public static void disableDataMapsOfTable(CarbonTable table) throws IOException {
     List<DataMapSchema> allDataMapSchemas =
-        DataMapStoreManager.getInstance().getAllDataMapSchemas(table);
+        DataMapStoreManager.getInstance().getDataMapSchemasOfTable(table);
     storageProvider.updateDataMapStatus(allDataMapSchemas, DataMapStatus.DISABLED);
   }
 
-  public static void enableDataMap(String dataMapName) throws IOException {
-    DataMapSchema dataMapSchema = validateDataMap(dataMapName, false);
+  public static void enableDataMap(String dataMapName) throws IOException, NoSuchDataMapException {
+    DataMapSchema dataMapSchema = getDataMapSchema(dataMapName);
     List<DataMapSchema> list = new ArrayList<>();
     if (dataMapSchema != null) {
       list.add(dataMapSchema);
@@ -76,12 +77,12 @@ public class DataMapStatusManager {
 
   public static void enableDataMapsOfTable(CarbonTable table) throws IOException {
     List<DataMapSchema> allDataMapSchemas =
-        DataMapStoreManager.getInstance().getAllDataMapSchemas(table);
+        DataMapStoreManager.getInstance().getDataMapSchemasOfTable(table);
     storageProvider.updateDataMapStatus(allDataMapSchemas, DataMapStatus.ENABLED);
   }
 
-  public static void dropDataMap(String dataMapName) throws IOException {
-    DataMapSchema dataMapSchema = validateDataMap(dataMapName, false);
+  public static void dropDataMap(String dataMapName) throws IOException, NoSuchDataMapException {
+    DataMapSchema dataMapSchema = getDataMapSchema(dataMapName);
     List<DataMapSchema> list = new ArrayList<>();
     if (dataMapSchema != null) {
       list.add(dataMapSchema);
@@ -89,20 +90,9 @@ public class DataMapStatusManager {
     storageProvider.updateDataMapStatus(list, DataMapStatus.DROPPED);
   }
 
-  private static DataMapSchema validateDataMap(String dataMapName, boolean valdate) {
-    List<DataMapSchema> allDataMapSchemas =
-        DataMapStoreManager.getInstance().getAllDataMapSchemas();
-    DataMapSchema dataMapSchema = null;
-    for (DataMapSchema schema : allDataMapSchemas) {
-      if (schema.getDataMapName().equalsIgnoreCase(dataMapName)) {
-        dataMapSchema = schema;
-      }
-    }
-    if (dataMapSchema == null && valdate) {
-      throw new UnsupportedOperationException("Cannot be disabled non exist datamap");
-    } else {
-      return dataMapSchema;
-    }
+  private static DataMapSchema getDataMapSchema(String dataMapName)
+      throws IOException, NoSuchDataMapException {
+    return DataMapStoreManager.getInstance().getDataMapSchema(dataMapName);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
index a43926c..673a7ab 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
@@ -359,4 +359,16 @@ public class AggregationDataMapSchema extends DataMapSchema {
       this.aggExpToColumnMapping = aggExpToColumnMapping;
     }
   }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+    AggregationDataMapSchema that = (AggregationDataMapSchema) o;
+    return that == this;
+  }
+
+  @Override public int hashCode() {
+    return super.hashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index d9f83b3..235c312 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
 
@@ -200,4 +201,14 @@ public class DataMapSchema implements Serializable, Writable {
     }
   }
 
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    DataMapSchema that = (DataMapSchema) o;
+    return Objects.equals(dataMapName, that.dataMapName);
+  }
+
+  @Override public int hashCode() {
+    return Objects.hash(dataMapName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
index ed13201..622e87b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
 /**
@@ -42,18 +43,18 @@ public interface DataMapSchemaStorageProvider {
    * Retrieve the schema by using dataMapName.
    * @param dataMapName
    */
-  DataMapSchema retrieveSchema(String dataMapName) throws IOException;
+  DataMapSchema retrieveSchema(String dataMapName) throws IOException, NoSuchDataMapException;
 
   /**
-   * Retrieve schemas by using the list of datamap names
-   * @param dataMapNames
+   * Retrieve schemas of the given table.
+   * @param table
    * @return
    * @throws IOException
    */
-  List<DataMapSchema> retrieveSchemas(List<String> dataMapNames) throws IOException;
+  List<DataMapSchema> retrieveSchemas(CarbonTable table) throws IOException;
 
   /**
-   * Retrieve all schemas
+   * Retrieve all datamap schemas from store.
    * @return
    * @throws IOException
    */
@@ -63,7 +64,6 @@ public interface DataMapSchemaStorageProvider {
    * Drop the schema from the storage by using dataMapName.
    * @param dataMapName
    */
-  void dropSchema(String dataMapName, String tableName, String dataMapProviderName)
-      throws IOException;
+  void dropSchema(String dataMapName) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
index 9e34131..9168f55 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
@@ -26,8 +26,11 @@ import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -35,6 +38,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 import com.google.gson.Gson;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 
 /**
  * Stores datamap schema in disk as json format
@@ -43,16 +48,22 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro
 
   private String storePath;
 
+  private String mdtFilePath;
+
+  private long lastModifiedTime;
+
+  private Set<DataMapSchema> dataMapSchemas = new HashSet<>();
+
   public DiskBasedDMSchemaStorageProvider(String storePath) {
     this.storePath = storePath;
+    this.mdtFilePath = storePath + CarbonCommonConstants.FILE_SEPARATOR + "datamap.mdtfile";
   }
 
   @Override public void saveSchema(DataMapSchema dataMapSchema) throws IOException {
     BufferedWriter brWriter = null;
     DataOutputStream dataOutputStream = null;
     Gson gsonObjectToWrite = new Gson();
-    String schemaPath = getSchemaPath(storePath, dataMapSchema.getDataMapName(),
-        dataMapSchema.relationIdentifier.getTableName(), dataMapSchema.getProviderName());
+    String schemaPath = getSchemaPath(storePath, dataMapSchema.getDataMapName());
     FileFactory.FileType fileType = FileFactory.getFileType(schemaPath);
     if (FileFactory.isFileExist(schemaPath, fileType)) {
       throw new IOException(
@@ -73,48 +84,47 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro
       if (null != brWriter) {
         brWriter.flush();
       }
+      checkAndReloadDataMapSchemas();
+      dataMapSchemas.add(dataMapSchema);
+      touchMDTFile();
       CarbonUtil.closeStreams(dataOutputStream, brWriter);
     }
   }
 
-  @Override public DataMapSchema retrieveSchema(String dataMapName) throws IOException {
-    if (!dataMapName.endsWith(".dmschema")) {
-      dataMapName = dataMapName + ".dmschema";
-    }
-    String schemaPath =
-        storePath + CarbonCommonConstants.FILE_SEPARATOR + dataMapName;
-    if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
-      throw new IOException("DataMap with name " + dataMapName + " does not exists in storage");
-    }
-
-    Gson gsonObjectToRead = new Gson();
-    DataInputStream dataInputStream = null;
-    BufferedReader buffReader = null;
-    InputStreamReader inStream = null;
-    try {
-      dataInputStream =
-          FileFactory.getDataInputStream(schemaPath, FileFactory.getFileType(schemaPath));
-      inStream = new InputStreamReader(dataInputStream,
-          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
-      buffReader = new BufferedReader(inStream);
-      return gsonObjectToRead.fromJson(buffReader, DataMapSchema.class);
-    } finally {
-      CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+  @Override public DataMapSchema retrieveSchema(String dataMapName)
+      throws IOException, NoSuchDataMapException {
+    checkAndReloadDataMapSchemas();
+    for (DataMapSchema dataMapSchema : dataMapSchemas) {
+      if (dataMapSchema.getDataMapName().equalsIgnoreCase(dataMapName)) {
+        return dataMapSchema;
+      }
     }
-
+    throw new NoSuchDataMapException(dataMapName);
   }
 
-  @Override public List<DataMapSchema> retrieveSchemas(List<String> dataMapNames)
-      throws IOException {
-    List<DataMapSchema> dataMapSchemas = new ArrayList<>(dataMapNames.size());
-    for (String dataMapName : dataMapNames) {
-      dataMapSchemas.add(retrieveSchema(dataMapName));
+  @Override public List<DataMapSchema> retrieveSchemas(CarbonTable carbonTable) throws IOException {
+    checkAndReloadDataMapSchemas();
+    List<DataMapSchema> dataMapSchemas = new ArrayList<>();
+    for (DataMapSchema dataMapSchema : this.dataMapSchemas) {
+      List<RelationIdentifier> parentTables = dataMapSchema.getParentTables();
+      for (RelationIdentifier identifier : parentTables) {
+        if (identifier.getTableName().equals(carbonTable.getTableName()) &&
+            identifier.getDatabaseName().equals(carbonTable.getDatabaseName())) {
+          dataMapSchemas.add(dataMapSchema);
+          break;
+        }
+      }
     }
     return dataMapSchemas;
   }
 
   @Override public List<DataMapSchema> retrieveAllSchemas() throws IOException {
-    List<DataMapSchema> dataMapSchemas = new ArrayList<>();
+    checkAndReloadDataMapSchemas();
+    return new ArrayList<>(dataMapSchemas);
+  }
+
+  private Set<DataMapSchema> retrieveAllSchemasInternal() throws IOException {
+    Set<DataMapSchema> dataMapSchemas = new HashSet<>();
     CarbonFile carbonFile = FileFactory.getCarbonFile(storePath);
     CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
@@ -123,36 +133,87 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro
     });
 
     for (CarbonFile file :carbonFiles) {
-      dataMapSchemas.add(retrieveSchema(file.getName()));
+      Gson gsonObjectToRead = new Gson();
+      DataInputStream dataInputStream = null;
+      BufferedReader buffReader = null;
+      InputStreamReader inStream = null;
+      try {
+        String absolutePath = file.getAbsolutePath();
+        dataInputStream =
+            FileFactory.getDataInputStream(
+                absolutePath, FileFactory.getFileType(absolutePath));
+        inStream = new InputStreamReader(dataInputStream,
+            Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+        buffReader = new BufferedReader(inStream);
+        dataMapSchemas.add(gsonObjectToRead.fromJson(buffReader, DataMapSchema.class));
+      } finally {
+        CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+      }
     }
     return dataMapSchemas;
   }
 
-  @Override public void dropSchema(String dataMapName, String tableName, String dataMapProviderName)
+  @Override public void dropSchema(String dataMapName)
       throws IOException {
-    String schemaPath = getSchemaPath(storePath, dataMapName, tableName, dataMapProviderName);
+    String schemaPath = getSchemaPath(storePath, dataMapName);
     if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
       throw new IOException("DataMap with name " + dataMapName + " does not exists in storage");
     }
-
+    DataMapSchema dataMapSchemaToRemove = null;
+    for (DataMapSchema dataMapSchema : dataMapSchemas) {
+      if (dataMapSchema.getDataMapName().equalsIgnoreCase(dataMapName)) {
+        dataMapSchemaToRemove =  dataMapSchema;
+      }
+    }
+    if (dataMapSchemaToRemove != null) {
+      dataMapSchemas.remove(dataMapSchemaToRemove);
+    }
     if (!FileFactory.deleteFile(schemaPath, FileFactory.getFileType(schemaPath))) {
       throw new IOException("DataMap with name " + dataMapName + " cannot be deleted");
+    } else {
+      touchMDTFile();
+    }
+  }
+
+  private void checkAndReloadDataMapSchemas() throws IOException {
+    if (FileFactory.isFileExist(mdtFilePath)) {
+      long lastModifiedTime = FileFactory.getCarbonFile(mdtFilePath).getLastModifiedTime();
+      if (this.lastModifiedTime != lastModifiedTime) {
+        dataMapSchemas = retrieveAllSchemasInternal();
+        this.lastModifiedTime = lastModifiedTime;
+      }
+    } else {
+      touchMDTFile();
+      dataMapSchemas = retrieveAllSchemasInternal();
+    }
+  }
+
+  private void touchMDTFile() throws IOException {
+    if (!FileFactory.isFileExist(storePath)) {
+      FileFactory.createDirectoryAndSetPermission(
+          storePath,
+          new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    }
+    if (!FileFactory.isFileExist(mdtFilePath)) {
+      FileFactory.createNewFile(
+          mdtFilePath,
+          FileFactory.getFileType(mdtFilePath),
+          true,
+          new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
     }
+    long lastModifiedTime = System.currentTimeMillis();
+    FileFactory.getCarbonFile(mdtFilePath).setLastModifiedTime(lastModifiedTime);
   }
 
   /**
    * it returns the schema path for the datamap
    * @param storePath
    * @param dataMapName
-   * @param tableName
-   * @param dataMapProviderName
    * @return
    */
-  public static String getSchemaPath(String storePath, String dataMapName, String tableName,
-      String dataMapProviderName) {
-    String schemaPath = storePath + CarbonCommonConstants.FILE_SEPARATOR + tableName
-        + CarbonCommonConstants.UNDERSCORE + dataMapName + CarbonCommonConstants.UNDERSCORE
-        + dataMapProviderName + ".dmschema";
+  public static String getSchemaPath(String storePath, String dataMapName) {
+    String schemaPath =  storePath + CarbonCommonConstants.FILE_SEPARATOR + dataMapName
+        + ".dmschema";;
     return schemaPath;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStoraheProviderSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStoraheProviderSuite.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStoraheProviderSuite.java
new file mode 100644
index 0000000..709215a
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStoraheProviderSuite.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class DiskBasedDMSchemaStoraheProviderSuite {
+
+  @BeforeClass public static void setUp() throws IOException {
+    String path =
+        new File(DiskBasedDMSchemaStorageProvider.class.getResource("/").getPath() + "../")
+            .getCanonicalPath().replaceAll("\\\\", "/");
+
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, path);
+    FileFactory.deleteAllCarbonFilesOfDir(
+        FileFactory.getCarbonFile(CarbonProperties.getInstance().getSystemFolderLocation()));
+  }
+
+  @AfterClass public static void tearDown() {
+    FileFactory.deleteAllCarbonFilesOfDir(
+        FileFactory.getCarbonFile(CarbonProperties.getInstance().getSystemFolderLocation()));
+  }
+
+  private DiskBasedDMSchemaStorageProvider provider = new DiskBasedDMSchemaStorageProvider(
+      CarbonProperties.getInstance().getSystemFolderLocation());
+
+  @Test public void testSaveSchema() throws IOException, NoSuchDataMapException {
+    DataMapSchema dataMapSchema = createDataMapSchema("dm1", "table1");
+    provider.saveSchema(dataMapSchema);
+    CarbonFile[] schemaFilesFromLocation = getSchemaFilesFromLocation();
+    assert (existsSchema(dataMapSchema, schemaFilesFromLocation));
+    DataMapSchema dataMapSchema1 = provider.retrieveSchema("dm1");
+    assert (dataMapSchema.getDataMapName().equals(dataMapSchema1.getDataMapName()));
+  }
+
+  @Test public void testDropSchema() throws IOException {
+    DataMapSchema dataMapSchema = createDataMapSchema("dm2", "table1");
+    provider.saveSchema(dataMapSchema);
+    provider.dropSchema("dm2");
+    CarbonFile[] schemaFilesFromLocation = getSchemaFilesFromLocation();
+    for (CarbonFile file : schemaFilesFromLocation) {
+      assert (!file.getName().contains("dm2"));
+    }
+    try {
+      provider.retrieveSchema("dm2");
+      assert (false);
+    } catch (NoSuchDataMapException e) {
+      // Ignore
+    }
+  }
+
+  @Test public void testRetriveAllSchemas() throws IOException {
+    DataMapSchema dataMapSchema1 = createDataMapSchema("dm3", "table1");
+    DataMapSchema dataMapSchema2 = createDataMapSchema("dm4", "table1");
+    DataMapSchema dataMapSchema3 = createDataMapSchema("dm5", "table1");
+    provider.saveSchema(dataMapSchema1);
+    provider.saveSchema(dataMapSchema2);
+    provider.saveSchema(dataMapSchema3);
+
+    List<DataMapSchema> dataMapSchemas = provider.retrieveAllSchemas();
+    assert (existsSchema(dataMapSchema1, dataMapSchemas));
+    assert (existsSchema(dataMapSchema2, dataMapSchemas));
+    assert (existsSchema(dataMapSchema3, dataMapSchemas));
+  }
+
+  @Test public void testWithOtherProvider() throws IOException, InterruptedException {
+    DataMapSchema dataMapSchema1 = createDataMapSchema("dm6", "table1");
+    DataMapSchema dataMapSchema2 = createDataMapSchema("dm7", "table1");
+    DataMapSchema dataMapSchema3 = createDataMapSchema("dm8", "table1");
+    provider.saveSchema(dataMapSchema1);
+    Thread.sleep(400);
+    provider.saveSchema(dataMapSchema2);
+    Thread.sleep(400);
+    DiskBasedDMSchemaStorageProvider provider1 = new DiskBasedDMSchemaStorageProvider(
+        CarbonProperties.getInstance().getSystemFolderLocation());
+    provider1.saveSchema(dataMapSchema3);
+    Thread.sleep(400);
+
+    List<DataMapSchema> dataMapSchemas = provider1.retrieveAllSchemas();
+    assert (existsSchema(dataMapSchema1, dataMapSchemas));
+    assert (existsSchema(dataMapSchema2, dataMapSchemas));
+    assert (existsSchema(dataMapSchema3, dataMapSchemas));
+
+    List<DataMapSchema> dataMapSchemas1 = provider.retrieveAllSchemas();
+    assert (existsSchema(dataMapSchema1, dataMapSchemas1));
+    assert (existsSchema(dataMapSchema2, dataMapSchemas1));
+    assert (existsSchema(dataMapSchema3, dataMapSchemas1));
+  }
+
+  @Test public void testDropWithOtherProvider() throws IOException, InterruptedException {
+    DataMapSchema dataMapSchema1 = createDataMapSchema("dm9", "table1");
+    DataMapSchema dataMapSchema2 = createDataMapSchema("dm10", "table1");
+    DataMapSchema dataMapSchema3 = createDataMapSchema("dm11", "table1");
+    provider.saveSchema(dataMapSchema1);
+    Thread.sleep(400);
+    provider.saveSchema(dataMapSchema2);
+    Thread.sleep(400);
+    provider.saveSchema(dataMapSchema3);
+    Thread.sleep(400);
+
+    DiskBasedDMSchemaStorageProvider provider1 = new DiskBasedDMSchemaStorageProvider(
+        CarbonProperties.getInstance().getSystemFolderLocation());
+    provider1.dropSchema(dataMapSchema3.getDataMapName());
+    Thread.sleep(400);
+
+    List<DataMapSchema> dataMapSchemas = provider1.retrieveAllSchemas();
+    assert (existsSchema(dataMapSchema1, dataMapSchemas));
+    assert (existsSchema(dataMapSchema2, dataMapSchemas));
+    assert (!existsSchema(dataMapSchema3, dataMapSchemas));
+
+    List<DataMapSchema> dataMapSchemas1 = provider.retrieveAllSchemas();
+    assert (existsSchema(dataMapSchema1, dataMapSchemas1));
+    assert (existsSchema(dataMapSchema2, dataMapSchemas1));
+    assert (!existsSchema(dataMapSchema3, dataMapSchemas1));
+  }
+
+  private boolean existsSchema(DataMapSchema schema, List<DataMapSchema> dataMapSchemas) {
+    for (DataMapSchema dataMapSchema : dataMapSchemas) {
+      if (dataMapSchema.getDataMapName().equals(schema.getDataMapName())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean existsSchema(DataMapSchema schema, CarbonFile[] carbonFiles) {
+    for (CarbonFile dataMapSchema : carbonFiles) {
+      if (dataMapSchema.getName().contains(schema.getDataMapName())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private DataMapSchema createDataMapSchema(String name, String table) {
+    DataMapSchema mapSchema = new DataMapSchema(name, "index");
+    RelationIdentifier identifier = new RelationIdentifier("default", table, "");
+
+    ArrayList<RelationIdentifier> parentTables = new ArrayList<>();
+    parentTables.add(identifier);
+    mapSchema.setParentTables(parentTables);
+    mapSchema.setRelationIdentifier(identifier);
+    return mapSchema;
+  }
+
+  private CarbonFile[] getSchemaFilesFromLocation() {
+    CarbonFile carbonFile =
+        FileFactory.getCarbonFile(CarbonProperties.getInstance().getSystemFolderLocation());
+    CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return file.getName().endsWith(".dmschema");
+      }
+    });
+    return carbonFiles;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index c5ea2c7..994472d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 
 class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
 
@@ -121,25 +122,93 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
 
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
 
-    //    sql("select * from normal_test where name='n34000'").show
     checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test where name='n10'"))
-//    checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10*')"), sql(s"SELECT * FROM datamap_test WHERE name like 'n10%'"))
     checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('city:c020')"), sql(s"SELECT * FROM datamap_test WHERE city='c020'"))
 
-    //    checkAnswer(
-    //      sql("select * from datamap_test where match('name:n34000')"),
-    //      sql("select * from normal_test where name='n34000'"))
+    sql("drop datamap dm on table datamap_test")
+  }
+
+  test("test lucene fine grain data map drop") {
+    sql("DROP TABLE IF EXISTS datamap_test1")
+    sql(
+      """
+        | CREATE TABLE datamap_test1(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm12 ON TABLE datamap_test1
+         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
+         | DMProperties('TEXT_COLUMNS'='Name , cIty')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test1 OPTIONS('header'='false')")
+
+    checkAnswer(sql("SELECT * FROM datamap_test1 WHERE TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test1 where name='n10'"))
+
+    intercept[Exception] {
+      sql("drop datamap dm12")
+    }
+    val schema = DataMapStoreManager.getInstance().getDataMapSchema("dm12")
+    sql("drop datamap dm12 on table datamap_test1")
+    intercept[Exception] {
+      val schema = DataMapStoreManager.getInstance().getDataMapSchema("dm12")
+    }
+    sql("DROP TABLE IF EXISTS datamap_test1")
+  }
+
+  test("test lucene fine grain data map show") {
+    sql("DROP TABLE IF EXISTS datamap_test2")
+    sql("DROP TABLE IF EXISTS datamap_test3")
+    sql(
+      """
+        | CREATE TABLE datamap_test2(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm122 ON TABLE datamap_test2
+         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
+         | DMProperties('TEXT_COLUMNS'='Name , cIty')
+      """.stripMargin)
+
+    sql(
+      """
+        | CREATE TABLE datamap_test3(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm123 ON TABLE datamap_test3
+         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
+         | DMProperties('TEXT_COLUMNS'='Name , cIty')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test2 OPTIONS('header'='false')")
+
+    checkAnswer(sql("SELECT * FROM datamap_test2 WHERE TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test2 where name='n10'"))
+
+    assert(sql("show datamap on table datamap_test2").count() == 1)
+    assert(sql("show datamap").count() == 2)
+    sql("DROP TABLE IF EXISTS datamap_test2")
+    sql("DROP TABLE IF EXISTS datamap_test3")
   }
 
   override protected def afterAll(): Unit = {
     LuceneFineGrainDataMapSuite.deleteFile(file2)
     sql("DROP TABLE IF EXISTS normal_test")
     sql("DROP TABLE IF EXISTS datamap_test")
+    sql("DROP TABLE IF EXISTS datamap_test1")
+    sql("DROP TABLE IF EXISTS datamap_test2")
+    sql("DROP TABLE IF EXISTS datamap_test3")
     sql("use default")
     sql("drop database if exists lucene cascade")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
-        CarbonProperties.getStorePath)
+          CarbonProperties.getStorePath)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
index 8757c76..f73a587 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
@@ -110,7 +110,7 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
       sql("DROP DATAMAP not_exists_datamap ON TABLE maintable")
     }
     assert(e.getMessage.equals(
-      "Datamap with name not_exists_datamap does not exist under table maintable"))
+      "Datamap with name not_exists_datamap does not exist"))
   }
 
   test("drop datamap without 'if exists' when main table not exists") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala
index 2c984ea..33ac413 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala
@@ -61,7 +61,7 @@ class TestTimeSeriesDropSuite extends QueryTest with BeforeAndAfterAll with Befo
       sql(s"DROP DATAMAP agg1_month ON TABLE mainTable")
     }
     assert(e.getMessage.equals(
-      "Datamap with name agg1_month does not exist under table mainTable"))
+      "Datamap with name agg1_month does not exist"))
   }
 
   test("test timeseries drop datamap 2: should support drop datamap IF EXISTS, maintable hasn't datamap") {
@@ -93,7 +93,7 @@ class TestTimeSeriesDropSuite extends QueryTest with BeforeAndAfterAll with Befo
       sql(s"DROP DATAMAP agg1_month ON TABLE mainTable")
     }
     assert(e.getMessage.equals(
-      "Datamap with name agg1_month does not exist under table mainTable"))
+      "Datamap with name agg1_month does not exist"))
   }
 
   test("test timeseries drop datamap 4: should support drop datamap with IF EXISTS, maintable has datamap") {
@@ -117,7 +117,7 @@ class TestTimeSeriesDropSuite extends QueryTest with BeforeAndAfterAll with Befo
       sql(s"DROP DATAMAP agg1_month ON TABLE mainTable")
     }
     assert(e.getMessage.equals(
-      "Datamap with name agg1_month does not exist under table mainTable"))
+      "Datamap with name agg1_month does not exist"))
   }
 
   test("test timeseries drop datamap 5: drop datamap without IF EXISTS when table not exists, catch MalformedCarbonCommandException") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 20e01ff..6087727 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -395,7 +395,7 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
     val dataMapProvider = classOf[CGDataMapFactory].getName
     sql(s"create datamap test_cg_datamap on table datamap_store_test using '$dataMapProvider' as select  id, name from datamap_store_test")
 
-    val loc = DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation, "test_cg_datamap", "datamap_store_test", dataMapProvider)
+    val loc = DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation, "test_cg_datamap")
 
     assert(FileFactory.isFileExist(loc))
   }
@@ -412,7 +412,7 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
     val dataMapProvider = classOf[CGDataMapFactory].getName
     sql(s"create datamap test_cg_datamap1 on table datamap_store_test1 using '$dataMapProvider' as select  id, name from datamap_store_test")
 
-    val loc = DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation, "test_cg_datamap1", "datamap_store_test1", dataMapProvider)
+    val loc = DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation, "test_cg_datamap1")
 
     assert(FileFactory.isFileExist(loc))
 
@@ -433,7 +433,7 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
     val dataMapProvider = classOf[CGDataMapFactory].getName
     sql(s"create datamap test_cg_datamap2 on table datamap_store_test2 using '$dataMapProvider' as select  id, name from datamap_store_test")
 
-    val loc = DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation,"test_cg_datamap2", "datamap_store_test2", dataMapProvider)
+    val loc = DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation,"test_cg_datamap2")
 
     assert(FileFactory.isFileExist(loc))
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
index 1a27abf..886200b 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
@@ -20,9 +20,6 @@ package org.apache.carbondata.datamap;
 import org.apache.carbondata.core.datamap.DataMapProvider;
 import org.apache.carbondata.core.datamap.IndexDataMapProvider;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
-import org.apache.carbondata.core.metadata.schema.table.DiskBasedDMSchemaStorageProvider;
-import org.apache.carbondata.core.util.CarbonProperties;
 
 import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.PREAGGREGATE;
 import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES;
@@ -53,14 +50,9 @@ public class DataMapManager {
     } else if (dataMapSchema.getProviderName().equalsIgnoreCase(TIMESERIES.toString())) {
       provider = new TimeseriesDataMapProvider(sparkSession);
     } else {
-      provider = new IndexDataMapProvider(getDataMapSchemaStorageProvider());
+      provider = new IndexDataMapProvider();
     }
     return provider;
   }
 
-  private DataMapSchemaStorageProvider getDataMapSchemaStorageProvider() {
-    return new DiskBasedDMSchemaStorageProvider(
-        CarbonProperties.getInstance().getSystemFolderLocation());
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index f64299c..60366c4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.command._
 
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager, IndexDataMapProvider}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
@@ -105,7 +105,10 @@ case class CarbonCreateDataMapCommand(
       dmProperties.map(x => (x._1.trim, x._2.trim)).asJava))
     dataMapProvider = DataMapManager.get().getDataMapProvider(dataMapSchema, sparkSession)
     dataMapProvider.initMeta(mainTable, dataMapSchema, queryString.orNull)
-    DataMapStatusManager.disableDataMap(dataMapName)
+    // TODO Currently this feature is only available for index datamaps
+    if (dataMapProvider.isInstanceOf[IndexDataMapProvider]) {
+      DataMapStatusManager.disableDataMap(dataMapName)
+    }
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     LOGGER.audit(s"DataMap $dataMapName successfully added")
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index 613c8b2..844d4c0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -54,7 +54,7 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
         if (carbonTable.hasDataMapSchema) {
           dataMapSchemaList.addAll(carbonTable.getTableInfo.getDataMapSchemaList)
         }
-        val indexSchemas = DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable)
+        val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
         if (!indexSchemas.isEmpty) {
           dataMapSchemaList.addAll(indexSchemas)
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 98a13a6..0235666 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -78,7 +78,7 @@ case class CarbonDropDataMapCommand(
         }
       }
       if (forceDrop && mainTable != null && dataMapSchema != null) {
-        dropDataMapFromSystemFolder(sparkSession, tableName)
+        dropDataMapFromSystemFolder(sparkSession)
         return Seq.empty
       }
       val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
@@ -128,7 +128,7 @@ case class CarbonDropDataMapCommand(
           }
         } else if (mainTable != null &&
                    mainTable.getTableInfo.getDataMapSchemaList.size() == 0) {
-          dropDataMapFromSystemFolder(sparkSession, tableName)
+          dropDataMapFromSystemFolder(sparkSession)
         }
       } catch {
         case e: NoSuchDataMapException =>
@@ -153,27 +153,21 @@ case class CarbonDropDataMapCommand(
       Seq.empty
   }
 
-  private def dropDataMapFromSystemFolder(sparkSession: SparkSession, tableName: String = null) = {
-    if (dataMapSchema == null) {
-      val schema = DataMapStoreManager.getInstance().getAllDataMapSchemas.asScala.find { dm =>
-        dm.getDataMapName.equalsIgnoreCase(dataMapName)
+  private def dropDataMapFromSystemFolder(sparkSession: SparkSession) = {
+    try {
+      if (dataMapSchema == null) {
+        dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
       }
-      dataMapSchema = schema match {
-        case Some(dmSchema) => dmSchema
-        case _ => null
-      }
-    }
-    if (dataMapSchema != null) {
-      // TODO do a check for existance before dropping
-      dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema, sparkSession)
-      DataMapStatusManager.dropDataMap(dataMapSchema.getDataMapName)
-      dataMapProvider.freeMeta(mainTable, dataMapSchema)
-    } else if (!ifExistsSet) {
-      if (tableName != null) {
-        throw new NoSuchDataMapException(dataMapName, tableName)
-      } else {
-        throw new NoSuchDataMapException(dataMapName)
+      if (dataMapSchema != null) {
+        dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema, sparkSession)
+        DataMapStatusManager.dropDataMap(dataMapSchema.getDataMapName)
+        dataMapProvider.freeMeta(mainTable, dataMapSchema)
       }
+    } catch {
+      case e: Exception =>
+        if (!ifExistsSet) {
+          throw e
+        }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 07cdf7c..53e1ed4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -111,7 +111,8 @@ case class CarbonDropTableCommand(
           }
         childDropCommands.foreach(_.processMetadata(sparkSession))
       }
-      val indexDatamapSchemas = DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable)
+      val indexDatamapSchemas =
+        DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
       if (!indexDatamapSchemas.isEmpty) {
         childDropDataMapCommands = indexDatamapSchemas.asScala.map { schema =>
           val command = CarbonDropDataMapCommand(schema.getDataMapName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f2a748f/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 14950eb..30a620c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -52,7 +52,13 @@ public class DataMapWriterListener {
    */
   public void registerAllWriter(CarbonTable carbonTable, String segmentId,
       String dataWritePath) {
-    List<TableDataMap> tableIndices = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
+    List<TableDataMap> tableIndices;
+    try {
+      tableIndices = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
+    } catch (IOException e) {
+      LOG.error(e, "Error while retrieving datamaps");
+      throw new RuntimeException(e);
+    }
     if (tableIndices != null) {
       for (TableDataMap tableDataMap : tableIndices) {
         DataMapFactory factory = tableDataMap.getDataMapFactory();