Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/28 11:14:14 UTC

[2/2] carbondata git commit: [CARBONDATA-2278] Save the datamaps to system folder of warehouse.

[CARBONDATA-2278] Save the datamaps to system folder of warehouse.

Make the datamap schema independent of the main table schema and store it under the _system folder location. This location is configurable through the carbon property carbon.system.folder.location; by default it is created under the store location.
The datamap schema is created in JSON format for better readability. Interfaces are provided to store it in a database, but no implementation for that is given in this PR.
Made ON TABLE <tablename> optional in the datamap DDL, so users can now create, drop, or show datamaps without the ON TABLE clause.
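
A rough usage sketch of the new behaviour (illustrative only; the spark variable is an assumed SparkSession and the datamap name is made up):

    // Optionally point the _system folder to a custom location; by default it is
    // created under the store location.
    CarbonProperties.getInstance().addProperty(
        CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, "/tmp/carbon_system");

    // ON TABLE is now optional in the datamap DDL:
    spark.sql("SHOW DATAMAP");
    spark.sql("DROP DATAMAP IF EXISTS dm");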

This closes #2100


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/05086e53
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/05086e53
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/05086e53

Branch: refs/heads/master
Commit: 05086e5367b3ba9888825b5b1388059ef987fc26
Parents: c723947
Author: ravipesala <ra...@gmail.com>
Authored: Sun Mar 25 07:51:21 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 28 19:13:18 2018 +0800

----------------------------------------------------------------------
 .../exceptions/sql/NoSuchDataMapException.java  |   4 +
 .../core/constants/CarbonCommonConstants.java   |   5 +
 .../carbondata/core/datamap/DataMapCatalog.java |  51 +++++
 .../core/datamap/DataMapProvider.java           | 108 ++++++++++
 .../core/datamap/DataMapStoreManager.java       |  96 ++++++++-
 .../core/datamap/IndexDataMapProvider.java      | 126 ++++++++++++
 .../schema/datamap/DataMapClassProvider.java    |  71 +++++++
 .../schema/datamap/DataMapProvider.java         |  71 -------
 .../metadata/schema/table/DataMapSchema.java    |  91 ++++++---
 .../schema/table/DataMapSchemaFactory.java      |   6 +-
 .../table/DataMapSchemaStorageProvider.java     |  68 +++++++
 .../table/DiskBasedDMSchemaStorageProvider.java | 143 +++++++++++++
 .../carbondata/core/util/CarbonProperties.java  |  13 ++
 .../TimeSeriesPreAggregateTestCase.scala        |   2 +-
 .../preaggregate/TestPreAggCreateCommand.scala  |   2 +-
 .../TestPreAggregateTableSelection.scala        |   2 +-
 .../timeseries/TestTimeSeriesCreateTable.scala  |   2 +-
 .../timeseries/TestTimeseriesCompaction.scala   |   2 +-
 .../timeseries/TestTimeseriesDataLoad.scala     |   2 +-
 .../TestTimeseriesTableSelection.scala          |   2 +-
 .../testsuite/datamap/CGDataMapTestCase.scala   |  64 +++++-
 .../testsuite/datamap/DataMapWriterSuite.scala  |   4 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |  14 +-
 .../carbondata/datamap/DataMapManager.java      |  27 ++-
 .../carbondata/datamap/DataMapProvider.java     | 105 ----------
 .../datamap/IndexDataMapProvider.java           | 118 -----------
 .../datamap/PreAggregateDataMapProvider.java    |  29 ++-
 .../datamap/TimeseriesDataMapProvider.java      |   8 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |  29 +--
 .../datamap/CarbonDataMapShowCommand.scala      |  33 ++-
 .../datamap/CarbonDropDataMapCommand.scala      | 200 +++++++++++--------
 .../command/table/CarbonDropTableCommand.scala  |  16 ++
 .../command/timeseries/TimeSeriesUtil.scala     |   2 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |  27 ++-
 34 files changed, 1076 insertions(+), 467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java
index 7ab9048..2e6e2e3 100644
--- a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java
@@ -36,4 +36,8 @@ public class NoSuchDataMapException extends MalformedCarbonCommandException {
   public NoSuchDataMapException(String dataMapName, String tableName) {
     super("Datamap with name " + dataMapName + " does not exist under table " + tableName);
   }
+
+  public NoSuchDataMapException(String dataMapName) {
+    super("Datamap with name " + dataMapName + " does not exist");
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 45d15fe..ff7a4bc 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1618,6 +1618,11 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT_DEFAULT = "200";
 
+  /**
+   * System folder location to store system-level data like datamap schema and status files.
+   */
+  public static final String CARBON_SYSTEM_FOLDER_LOCATION = "carbon.system.folder.location";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
new file mode 100644
index 0000000..89f2838
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+
+/**
+ * This is the interface for the in-memory catalog registry for datamaps.
+ * @since 1.4.0
+ */
+public interface DataMapCatalog<T> {
+
+  /**
+   * Register schema to the catalog.
+   * @param dataMapSchema
+   */
+  void registerSchema(DataMapSchema dataMapSchema);
+
+  /**
+   * Unregister schema from catalog.
+   * @param dataMapName
+   */
+  void unregisterSchema(String dataMapName);
+
+  /**
+   * List all registered schema catalogs
+   * @return
+   */
+  T[] listAllSchema();
+
+  /**
+   * It reloads/removes all registered schema catalogs
+   */
+  void refresh();
+
+}
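
For illustration, a minimal in-memory implementation of this contract could look like the sketch below (not part of this commit; it simply keys schemas by datamap name):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.carbondata.core.datamap.DataMapCatalog;
    import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;

    public class InMemoryDataMapCatalog implements DataMapCatalog<DataMapSchema> {

      // registered schemas, keyed by datamap name
      private final Map<String, DataMapSchema> schemas = new ConcurrentHashMap<>();

      @Override public void registerSchema(DataMapSchema dataMapSchema) {
        schemas.put(dataMapSchema.getDataMapName(), dataMapSchema);
      }

      @Override public void unregisterSchema(String dataMapName) {
        schemas.remove(dataMapName);
      }

      @Override public DataMapSchema[] listAllSchema() {
        return schemas.values().toArray(new DataMapSchema[0]);
      }

      @Override public void refresh() {
        // a real catalog would reload from the schema storage provider here
        schemas.clear();
      }
    }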

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
new file mode 100644
index 0000000..2236503
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+
+/**
+ * DataMap is an accelerator for certain types of queries. Developers can add new DataMap
+ * implementations to improve query performance.
+ *
+ * Currently two types of DataMap are supported
+ * <ol>
+ *   <li> MVDataMap: materialized view type of DataMap to accelerate OLAP-style queries,
+ * like SPJG queries (select, predicate, join, group-by) </li>
+ *   <li> DataMap: index type of DataMap to accelerate filter queries </li>
+ * </ol>
+ *
+ * <p>
+ * In following command <br>
+ * {@code CREATE DATAMAP dm ON TABLE main USING 'provider'}, <br>
+ * the <b>provider</b> string can be a short name or class name of the DataMap implementation.
+ *
+ * <br>Currently CarbonData supports the following providers:
+ * <ol>
+ *   <li> preaggregate: one type of MVDataMap that does pre-aggregation on a single table </li>
+ *   <li> timeseries: one type of MVDataMap that does pre-aggregation based on the time dimension
+ *     of the table </li>
+ *   <li> class name of {@link org.apache.carbondata.core.datamap.dev.DataMapFactory}
+ * implementation: Developer can implement new type of DataMap by extending
+ * {@link org.apache.carbondata.core.datamap.dev.DataMapFactory} </li>
+ * </ol>
+ *
+ * @since 1.4.0
+ */
+@InterfaceAudience.Internal
+public interface DataMapProvider {
+
+  /**
+   * Initialize a datamap's metadata.
+   * This is called when the user creates a datamap, for example "CREATE DATAMAP dm ON TABLE mainTable".
+   * Implementations should initialize metadata for the datamap, like creating a table.
+   */
+  void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement)
+      throws MalformedDataMapCommandException, IOException;
+
+  /**
+   * Initialize a datamap's data.
+   * This is called when the user creates a datamap, for example "CREATE DATAMAP dm ON TABLE mainTable".
+   * Implementations should initialize data for the datamap, like creating data folders.
+   */
+  void initData(CarbonTable mainTable);
+
+  /**
+   * Opposite operation of {@link #initMeta(CarbonTable, DataMapSchema, String)}.
+   * This is called when the user drops a datamap, for example "DROP DATAMAP dm ON TABLE mainTable".
+   * Implementations should clean all metadata for the datamap.
+   */
+  void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema) throws IOException;
+
+  /**
+   * Opposite operation of {@link #initData(CarbonTable)}.
+   * This is called when the user drops a datamap, for example "DROP DATAMAP dm ON TABLE mainTable".
+   * Implementations should clean all data for the datamap.
+   */
+  void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema);
+
+  /**
+   * Rebuild the datamap by loading all existing data from mainTable.
+   * This is called to refresh the datamap:
+   * 1. after datamap creation, if `autoRefreshDataMap` is set to true
+   * 2. when the user manually triggers the refresh datamap command
+   */
+  void rebuild(CarbonTable mainTable) throws IOException;
+
+  /**
+   * Build the datamap incrementally by loading the specified segments' data.
+   * This is called when the user manually triggers a refresh of the datamap.
+   */
+  void incrementalBuild(CarbonTable mainTable, String[] segmentIds)
+    throws IOException;
+
+  /**
+   * Provide the datamap catalog instance, or null if this datamap is not required to rewrite
+   * the query.
+   */
+  DataMapCatalog createDataMapCatalog();
+
+}
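
The create and drop commands drive these hooks roughly in the order sketched below (illustrative only; the real wiring lives in the CREATE DATAMAP / DROP DATAMAP commands and DataMapManager):

    // assumes provider, mainTable, schema and ctasSql are already resolved by the command
    void create(DataMapProvider provider, CarbonTable mainTable,
        DataMapSchema schema, String ctasSql) throws Exception {
      provider.initMeta(mainTable, schema, ctasSql);   // persist and register the datamap schema
      provider.initData(mainTable);                    // prepare data folders etc. if needed
    }

    void drop(DataMapProvider provider, CarbonTable mainTable,
        DataMapSchema schema) throws Exception {
      provider.freeData(mainTable, schema);            // clear the datamap data
      provider.freeMeta(mainTable, schema);            // remove the persisted schema
    }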

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index a8d467f..d01df4f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -35,9 +35,13 @@ import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactor
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
+import org.apache.carbondata.core.metadata.schema.table.DiskBasedDMSchemaStorageProvider;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
  * It maintains all the DataMaps in it.
@@ -52,6 +56,11 @@ public final class DataMapStoreManager {
    */
   private Map<String, List<TableDataMap>> allDataMaps = new ConcurrentHashMap<>();
 
+  /**
+   * Contains the datamap catalog for each datamap provider.
+   */
+  private Map<String, DataMapCatalog> dataMapCatalogs = new ConcurrentHashMap<>();
+
   private Map<String, TableSegmentRefresher> segmentRefreshMap = new ConcurrentHashMap<>();
 
   private static final LogService LOGGER =
@@ -84,11 +93,15 @@ public final class DataMapStoreManager {
    * @return
    */
   public List<TableDataMap> getAllDataMap(CarbonTable carbonTable) {
-    List<DataMapSchema> dataMapSchemaList = carbonTable.getTableInfo().getDataMapSchemaList();
+    // TODO cache all schemas and update only when datamap status file updates
+    List<DataMapSchema> dataMapSchemas = getAllDataMapSchemas();
     List<TableDataMap> dataMaps = new ArrayList<>();
-    if (dataMapSchemaList != null) {
-      for (DataMapSchema dataMapSchema : dataMapSchemaList) {
-        if (dataMapSchema.isIndexDataMap()) {
+    if (dataMapSchemas != null) {
+      for (DataMapSchema dataMapSchema : dataMapSchemas) {
+        RelationIdentifier identifier = dataMapSchema.getParentTables().get(0);
+        if (dataMapSchema.isIndexDataMap() && identifier.getTableName()
+            .equals(carbonTable.getTableName()) && identifier.getDatabaseName()
+            .equals(carbonTable.getDatabaseName())) {
           dataMaps.add(getDataMap(carbonTable.getAbsoluteTableIdentifier(), dataMapSchema));
         }
       }
@@ -97,6 +110,81 @@ public final class DataMapStoreManager {
   }
 
   /**
+   * It gives all datamap schemas of the given table.
+   *
+   * @return
+   */
+  public List<DataMapSchema> getAllDataMapSchemas(CarbonTable carbonTable) {
+    // TODO cache all schemas and update only when datamap status file updates
+    List<DataMapSchema> dataMapSchemas = getAllDataMapSchemas();
+    List<DataMapSchema> dataMaps = new ArrayList<>();
+    if (dataMapSchemas != null) {
+      for (DataMapSchema dataMapSchema : dataMapSchemas) {
+        RelationIdentifier identifier = dataMapSchema.getParentTables().get(0);
+        if (dataMapSchema.isIndexDataMap() && identifier.getTableName()
+            .equals(carbonTable.getTableName()) && identifier.getDatabaseName()
+            .equals(carbonTable.getDatabaseName())) {
+          dataMaps.add(dataMapSchema);
+        }
+      }
+    }
+    return dataMaps;
+  }
+
+  public List<DataMapSchema> getAllDataMapSchemas() {
+    DataMapSchemaStorageProvider provider = new DiskBasedDMSchemaStorageProvider(
+        CarbonProperties.getInstance().getSystemFolderLocation());
+    List<DataMapSchema> dataMapSchemas;
+    try {
+      dataMapSchemas = provider.retrieveAllSchemas();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return dataMapSchemas;
+  }
+
+  /**
+   * Register datamap catalog for the datamap provider
+   * @param dataMapProvider
+   * @param dataMapSchema
+   */
+  public synchronized void registerDataMapCatalog(DataMapProvider dataMapProvider,
+      DataMapSchema dataMapSchema) {
+    String name = dataMapSchema.getProviderName();
+    DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name);
+    if (dataMapCatalog == null) {
+      dataMapCatalog = dataMapProvider.createDataMapCatalog();
+      if (dataMapCatalog != null) {
+        dataMapCatalogs.put(name, dataMapCatalog);
+        dataMapCatalog.registerSchema(dataMapSchema);
+      }
+    } else {
+      dataMapCatalog.registerSchema(dataMapSchema);
+    }
+  }
+
+  /**
+   * Unregister datamap catalog.
+   * @param dataMapSchema
+   */
+  public synchronized void unRegisterDataMapCatalog(DataMapSchema dataMapSchema) {
+    String name = dataMapSchema.getProviderName();
+    DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name);
+    if (dataMapCatalog != null) {
+      dataMapCatalog.unregisterSchema(dataMapSchema.getDataMapName());
+    }
+  }
+
+  /**
+   * Get the datamap catalog for provider.
+   * @param providerName
+   * @return
+   */
+  public DataMapCatalog getDataMapCatalog(String providerName) {
+    return dataMapCatalogs.get(providerName);
+  }
+
+  /**
    * It gives the default datamap of the table. Default datamap of any table is BlockletDataMap
    *
    * @param identifier

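Once a datamap is created, its catalog can be registered and looked up through DataMapStoreManager; a hedged sketch (the dataMapProvider and dataMapSchema variables are assumed to exist):

    DataMapStoreManager manager = DataMapStoreManager.getInstance();
    // no-op registration if the provider's createDataMapCatalog() returns null
    manager.registerDataMapCatalog(dataMapProvider, dataMapSchema);
    // later, e.g. during query rewrite, fetch the catalog by provider name
    DataMapCatalog catalog = manager.getDataMapCatalog(dataMapSchema.getProviderName());
    if (catalog != null) {
      Object[] registered = catalog.listAllSchema();
    }
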
http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
new file mode 100644
index 0000000..e188bf1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.MetadataProcessException;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
+
+@InterfaceAudience.Internal
+public class IndexDataMapProvider implements DataMapProvider {
+
+  private DataMapSchemaStorageProvider storageProvider;
+
+  public IndexDataMapProvider(DataMapSchemaStorageProvider storageProvider) {
+    this.storageProvider = storageProvider;
+  }
+
+  @Override
+  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement)
+      throws MalformedDataMapCommandException, IOException {
+    ArrayList<RelationIdentifier> relationIdentifiers = new ArrayList<>();
+    dataMapSchema.setParentTables(relationIdentifiers);
+    relationIdentifiers.add(
+        new RelationIdentifier(mainTable.getDatabaseName(), mainTable.getTableName(),
+            mainTable.getTableInfo().getFactTable().getTableId()));
+    DataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema);
+    DataMapStoreManager.getInstance().registerDataMap(
+        mainTable.getAbsoluteTableIdentifier(), dataMapSchema, dataMapFactory);
+    storageProvider.saveSchema(dataMapSchema);
+  }
+
+  @Override
+  public void initData(CarbonTable mainTable) {
+    // Nothing is needed to do by default
+  }
+
+  @Override
+  public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema) throws IOException {
+    storageProvider.dropSchema(dataMapSchema.getDataMapName());
+  }
+
+  @Override
+  public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema) {
+    DataMapStoreManager.getInstance().clearDataMap(
+        mainTable.getAbsoluteTableIdentifier(), dataMapSchema.getDataMapName());
+  }
+
+  @Override
+  public void rebuild(CarbonTable mainTable) {
+    // Nothing is needed to do by default
+  }
+
+  @Override
+  public void incrementalBuild(CarbonTable mainTable, String[] segmentIds) {
+    throw new UnsupportedOperationException();
+  }
+
+  private DataMapFactory createIndexDataMapFactory(DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
+    DataMapFactory dataMapFactory;
+    try {
+      // try to create DataMapClassProvider instance by taking providerName as class name
+      Class<? extends DataMapFactory> providerClass =
+          (Class<? extends DataMapFactory>) Class.forName(dataMapSchema.getProviderName());
+      dataMapFactory = providerClass.newInstance();
+    } catch (ClassNotFoundException e) {
+      // try to create DataMapClassProvider instance by taking providerName as short name
+      dataMapFactory = getDataMapFactoryByShortName(dataMapSchema.getProviderName());
+    } catch (Throwable e) {
+      throw new MetadataProcessException(
+          "failed to create DataMapClassProvider '" + dataMapSchema.getProviderName() + "'", e);
+    }
+    return dataMapFactory;
+  }
+
+  private DataMapFactory getDataMapFactoryByShortName(String providerName)
+      throws MalformedDataMapCommandException {
+    DataMapFactory dataMapFactory;
+    String className = DataMapRegistry.getDataMapClassName(providerName);
+    if (className != null) {
+      try {
+        Class<? extends DataMapFactory> datamapClass =
+            (Class<? extends DataMapFactory>) Class.forName(className);
+        dataMapFactory = datamapClass.newInstance();
+      } catch (ClassNotFoundException ex) {
+        throw new MalformedDataMapCommandException(
+            "DataMap '" + providerName + "' not found", ex);
+      } catch (Throwable ex) {
+        throw new MetadataProcessException(
+            "failed to create DataMap '" + providerName + "'", ex);
+      }
+    } else {
+      throw new MalformedDataMapCommandException(
+          "DataMap '" + providerName + "' not found");
+    }
+    return dataMapFactory;
+  }
+
+  @Override public DataMapCatalog createDataMapCatalog() {
+    // TODO create abstract class and move the default implementation there.
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
new file mode 100644
index 0000000..3934444
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.datamap;
+
+/**
+ * type for create datamap
+ * The syntax of datamap creation is as follows.
+ * CREATE DATAMAP IF NOT EXISTS dataMapName ON TABLE tableName USING 'DataMapClassProvider'
+ * DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName
+ *
+ * Please refer {{org.apache.spark.sql.parser.CarbonSpark2SqlParser}}
+ */
+
+public enum DataMapClassProvider {
+  PREAGGREGATE("org.apache.carbondata.core.datamap.AggregateDataMap", "preaggregate"),
+  TIMESERIES("org.apache.carbondata.core.datamap.TimeSeriesDataMap", "timeseries");
+
+  /**
+   * Fully qualified class name of datamap
+   */
+  private String className;
+
+  /**
+   * Short name representation of datamap
+   */
+  private String shortName;
+
+  DataMapClassProvider(String className, String shortName) {
+    this.className = className;
+    this.shortName = shortName;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  public String getShortName() {
+    return shortName;
+  }
+
+  private boolean isEqual(String dataMapClass) {
+    return (dataMapClass != null &&
+        (dataMapClass.equals(className) ||
+        dataMapClass.equalsIgnoreCase(shortName)));
+  }
+
+  public static DataMapClassProvider getDataMapProvider(String dataMapClass) {
+    if (TIMESERIES.isEqual(dataMapClass)) {
+      return TIMESERIES;
+    } else if (PREAGGREGATE.isEqual(dataMapClass)) {
+      return PREAGGREGATE;
+    } else {
+      throw new UnsupportedOperationException("Unknown datamap provider/class " + dataMapClass);
+    }
+  }
+}
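
Resolution from the DDL provider string accepts either the short name (case-insensitive) or the fully qualified class name; a small illustrative call:

    DataMapClassProvider byShortName =
        DataMapClassProvider.getDataMapProvider("preaggregate");
    DataMapClassProvider byClassName =
        DataMapClassProvider.getDataMapProvider("org.apache.carbondata.core.datamap.AggregateDataMap");
    // both resolve to PREAGGREGATE; any other string throws UnsupportedOperationException
    assert byShortName == DataMapClassProvider.PREAGGREGATE;
    assert byClassName == DataMapClassProvider.PREAGGREGATE;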

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java
deleted file mode 100644
index 39304d8..0000000
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.metadata.schema.datamap;
-
-/**
- * type for create datamap
- * The syntax of datamap creation is as follows.
- * CREATE DATAMAP IF NOT EXISTS dataMapName ON TABLE tableName USING 'DataMapProvider'
- * DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName
- *
- * Please refer {{org.apache.spark.sql.parser.CarbonSpark2SqlParser}}
- */
-
-public enum DataMapProvider {
-  PREAGGREGATE("org.apache.carbondata.core.datamap.AggregateDataMap", "preaggregate"),
-  TIMESERIES("org.apache.carbondata.core.datamap.TimeSeriesDataMap", "timeseries");
-
-  /**
-   * Fully qualified class name of datamap
-   */
-  private String className;
-
-  /**
-   * Short name representation of datamap
-   */
-  private String shortName;
-
-  DataMapProvider(String className, String shortName) {
-    this.className = className;
-    this.shortName = shortName;
-  }
-
-  public String getClassName() {
-    return className;
-  }
-
-  public String getShortName() {
-    return shortName;
-  }
-
-  private boolean isEqual(String dataMapClass) {
-    return (dataMapClass != null &&
-        (dataMapClass.equals(className) ||
-        dataMapClass.equalsIgnoreCase(shortName)));
-  }
-
-  public static DataMapProvider getDataMapProvider(String dataMapClass) {
-    if (TIMESERIES.isEqual(dataMapClass)) {
-      return TIMESERIES;
-    } else if (PREAGGREGATE.isEqual(dataMapClass)) {
-      return PREAGGREGATE;
-    } else {
-      throw new UnsupportedOperationException("Unknown datamap provider/class " + dataMapClass);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index 877fab7..6b592fb 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.carbondata.core.metadata.schema.table;
 
 import java.io.DataInput;
@@ -21,35 +22,38 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider;
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
+
+import com.google.gson.Gson;
 
 /**
- * Child schema class to maintain the child table details inside parent table
+ * It is the new schema of datamap and it has fewer fields compared to {{@link DataMapSchema}}
  */
 public class DataMapSchema implements Serializable, Writable {
 
-  private static final long serialVersionUID = 6577149126264181553L;
+  private static final long serialVersionUID = -8394577999061329687L;
 
   protected String dataMapName;
 
   /**
    * There are two kind of DataMaps:
    * 1. Index DataMap: provider name is class name of implementation class of DataMapFactory
-   * 2. OLAP DataMap: provider name is one of the {@link DataMapProvider#shortName}
+   * 2. OLAP DataMap: provider name is one of the {@link DataMapClassProvider#shortName}
    */
-  private String providerName;
+  protected String providerName;
 
   /**
-   * identifier of the parent table
+   * identifier of the mapped table
    */
-  private RelationIdentifier relationIdentifier;
+  protected RelationIdentifier relationIdentifier;
 
   /**
-   * child table schema
+   * Query which is used to create a datamap. This is optional in the case of an index datamap.
    */
-  protected TableSchema childSchema;
+  protected String ctasQuery;
 
   /**
    * relation properties
@@ -57,55 +61,93 @@ public class DataMapSchema implements Serializable, Writable {
   protected Map<String, String> properties;
 
   /**
-   * WARN: This constructor should be used by deserialization only
+   * Identifiers of parent tables
    */
-  public DataMapSchema() {
-  }
+  protected List<RelationIdentifier> parentTables;
+
+  /**
+   * child table schema
+   */
+  protected TableSchema childSchema;
+
 
   public DataMapSchema(String dataMapName, String providerName) {
     this.dataMapName = dataMapName;
     this.providerName = providerName;
   }
 
+  public DataMapSchema() {
+  }
+
+  public String getDataMapName() {
+    return dataMapName;
+  }
+
+  public void setDataMapName(String dataMapName) {
+    this.dataMapName = dataMapName;
+  }
+
   public String getProviderName() {
     return providerName;
   }
 
-  public TableSchema getChildSchema() {
-    return childSchema;
+  public void setProviderName(String providerName) {
+    this.providerName = providerName;
   }
 
   public RelationIdentifier getRelationIdentifier() {
     return relationIdentifier;
   }
 
-  public Map<String, String> getProperties() {
-    return properties;
+  public void setRelationIdentifier(RelationIdentifier relationIdentifier) {
+    this.relationIdentifier = relationIdentifier;
   }
 
-  public String getDataMapName() {
-    return dataMapName;
+  public String getCtasQuery() {
+    return ctasQuery;
   }
 
-  public void setRelationIdentifier(RelationIdentifier relationIdentifier) {
-    this.relationIdentifier = relationIdentifier;
+  public void setCtasQuery(String ctasQuery) {
+    this.ctasQuery = ctasQuery;
   }
 
-  public void setChildSchema(TableSchema childSchema) {
-    this.childSchema = childSchema;
+  public Map<String, String> getProperties() {
+    return properties;
   }
 
   public void setProperties(Map<String, String> properties) {
     this.properties = properties;
   }
 
+  public void setPropertiesJson(Gson gson, String propertiesJson) {
+    if (propertiesJson != null) {
+      this.properties = gson.fromJson(propertiesJson, Map.class);
+    }
+  }
+
+  public void setParentTables(List<RelationIdentifier> parentTables) {
+    this.parentTables = parentTables;
+  }
+
+  public List<RelationIdentifier> getParentTables() {
+    return parentTables;
+  }
+
+  public TableSchema getChildSchema() {
+    return childSchema;
+  }
+
+  public void setChildSchema(TableSchema childSchema) {
+    this.childSchema = childSchema;
+  }
+
   /**
    * Return true if this datamap is an Index DataMap
    * @return
    */
   public boolean isIndexDataMap() {
-    if (providerName.equalsIgnoreCase(DataMapProvider.PREAGGREGATE.getShortName()) ||
-        providerName.equalsIgnoreCase(DataMapProvider.TIMESERIES.getShortName())) {
+    if (providerName.equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.getShortName()) ||
+        providerName.equalsIgnoreCase(DataMapClassProvider.TIMESERIES.getShortName())) {
       return false;
     } else {
       return true;
@@ -158,4 +200,5 @@ public class DataMapSchema implements Serializable, Writable {
       this.properties.put(key, value);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
index 1c6ebad..e8022cc 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.core.metadata.schema.table;
 
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider;
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
 
 public class DataMapSchemaFactory {
   public static final DataMapSchemaFactory INSTANCE = new DataMapSchemaFactory();
@@ -28,9 +28,9 @@ public class DataMapSchemaFactory {
    * @return data map schema
    */
   public DataMapSchema getDataMapSchema(String dataMapName, String providerName) {
-    if (providerName.equalsIgnoreCase(DataMapProvider.PREAGGREGATE.toString())) {
+    if (providerName.equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.toString())) {
       return new AggregationDataMapSchema(dataMapName, providerName);
-    } else if (providerName.equalsIgnoreCase(DataMapProvider.TIMESERIES.toString())) {
+    } else if (providerName.equalsIgnoreCase(DataMapClassProvider.TIMESERIES.toString())) {
       return new AggregationDataMapSchema(dataMapName, providerName);
     } else {
       return new DataMapSchema(dataMapName, providerName);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
new file mode 100644
index 0000000..6b9bca5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+
+/**
+ * It is used to save/retrieve/drop datamap schemas from a storage medium like disk or DB.
+ * Here dataMapName must be unique across the whole store.
+ *
+ * @since 1.4.0
+ */
+@InterfaceAudience.Internal
+public interface DataMapSchemaStorageProvider {
+
+  /**
+   * Save the schema to storage medium.
+   * @param dataMapSchema
+   */
+  void saveSchema(DataMapSchema dataMapSchema) throws IOException;
+
+  /**
+   * Retrieve the schema by using dataMapName.
+   * @param dataMapName
+   */
+  DataMapSchema retrieveSchema(String dataMapName) throws IOException;
+
+  /**
+   * Retrieve schemas by using the list of datamap names
+   * @param dataMapNames
+   * @return
+   * @throws IOException
+   */
+  List<DataMapSchema> retrieveSchemas(List<String> dataMapNames) throws IOException;
+
+  /**
+   * Retrieve all schemas
+   * @return
+   * @throws IOException
+   */
+  List<DataMapSchema> retrieveAllSchemas() throws IOException;
+
+  /**
+   * Drop the schema from the storage by using dataMapName.
+   * @param dataMapName
+   */
+  void dropSchema(String dataMapName) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
new file mode 100644
index 0000000..d49a9ae
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import com.google.gson.Gson;
+
+/**
+ * Stores datamap schema on disk in JSON format
+ */
+public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStorageProvider {
+
+  private String storePath;
+
+  public DiskBasedDMSchemaStorageProvider(String storePath) {
+    this.storePath = storePath;
+  }
+
+  @Override public void saveSchema(DataMapSchema dataMapSchema) throws IOException {
+    BufferedWriter brWriter = null;
+    DataOutputStream dataOutputStream = null;
+    Gson gsonObjectToWrite = new Gson();
+    String schemaPath =
+        storePath + CarbonCommonConstants.FILE_SEPARATOR + dataMapSchema.getDataMapName()
+            + ".dmschema";
+    FileFactory.FileType fileType = FileFactory.getFileType(schemaPath);
+    if (FileFactory.isFileExist(schemaPath, fileType)) {
+      throw new IOException(
+          "DataMap with name " + dataMapSchema.getDataMapName() + " already exists in storage");
+    }
+    // write the datamap schema in JSON format.
+    try {
+      FileFactory.mkdirs(storePath, fileType);
+      FileFactory.createNewFile(schemaPath, fileType);
+      dataOutputStream =
+          FileFactory.getDataOutputStream(schemaPath, fileType);
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      String metadataInstance = gsonObjectToWrite.toJson(dataMapSchema);
+      brWriter.write(metadataInstance);
+    } finally {
+      if (null != brWriter) {
+        brWriter.flush();
+      }
+      CarbonUtil.closeStreams(dataOutputStream, brWriter);
+    }
+  }
+
+  @Override public DataMapSchema retrieveSchema(String dataMapName) throws IOException {
+    if (!dataMapName.endsWith(".dmschema")) {
+      dataMapName = dataMapName + ".dmschema";
+    }
+    String schemaPath =
+        storePath + CarbonCommonConstants.FILE_SEPARATOR + dataMapName;
+    if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
+      throw new IOException("DataMap with name " + dataMapName + " does not exist in storage");
+    }
+
+    Gson gsonObjectToRead = new Gson();
+    DataInputStream dataInputStream = null;
+    BufferedReader buffReader = null;
+    InputStreamReader inStream = null;
+    try {
+      dataInputStream =
+          FileFactory.getDataInputStream(schemaPath, FileFactory.getFileType(schemaPath));
+      inStream = new InputStreamReader(dataInputStream,
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+      buffReader = new BufferedReader(inStream);
+      return gsonObjectToRead.fromJson(buffReader, DataMapSchema.class);
+    } finally {
+      CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+    }
+
+  }
+
+  @Override public List<DataMapSchema> retrieveSchemas(List<String> dataMapNames)
+      throws IOException {
+    List<DataMapSchema> dataMapSchemas = new ArrayList<>(dataMapNames.size());
+    for (String dataMapName : dataMapNames) {
+      dataMapSchemas.add(retrieveSchema(dataMapName));
+    }
+    return dataMapSchemas;
+  }
+
+  @Override public List<DataMapSchema> retrieveAllSchemas() throws IOException {
+    List<DataMapSchema> dataMapSchemas = new ArrayList<>();
+    CarbonFile carbonFile = FileFactory.getCarbonFile(storePath);
+    CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return file.getName().endsWith(".dmschema");
+      }
+    });
+
+    for (CarbonFile file : carbonFiles) {
+      dataMapSchemas.add(retrieveSchema(file.getName()));
+    }
+    return dataMapSchemas;
+  }
+
+  @Override public void dropSchema(String dataMapName) throws IOException {
+    String schemaPath =
+        storePath + CarbonCommonConstants.FILE_SEPARATOR + dataMapName + ".dmschema";
+    if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
+      throw new IOException("DataMap with name " + dataMapName + " does not exist in storage");
+    }
+
+    if (!FileFactory.deleteFile(schemaPath, FileFactory.getFileType(schemaPath))) {
+      throw new IOException("DataMap with name " + dataMapName + " cannot be deleted");
+    }
+  }
+}
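
A hedged usage sketch of the disk-based provider (imports and IOException handling omitted; the datamap name and provider class below are made up):

    DataMapSchemaStorageProvider provider = new DiskBasedDMSchemaStorageProvider(
        CarbonProperties.getInstance().getSystemFolderLocation());

    DataMapSchema schema = new DataMapSchema("dm1", "com.example.MyDataMapFactory");
    provider.saveSchema(schema);                     // writes <systemFolder>/dm1.dmschema as JSON

    DataMapSchema readBack = provider.retrieveSchema("dm1");
    List<DataMapSchema> all = provider.retrieveAllSchemas();

    provider.dropSchema("dm1");                      // deletes the dm1.dmschema file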

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 7ed2b0f..6fa21bc 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1447,4 +1447,17 @@ public final class CarbonProperties {
     }
     return preserveCnt;
   }
+  /**
+   * Get the configured system folder location.
+   * @return
+   */
+  public String getSystemFolderLocation() {
+    String systemLocation = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION);
+    if (systemLocation == null) {
+      systemLocation = getStorePath();
+    }
+    return systemLocation + CarbonCommonConstants.FILE_SEPARATOR + "_system";
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/TimeSeriesPreAggregateTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/TimeSeriesPreAggregateTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/TimeSeriesPreAggregateTestCase.scala
index 78ea7d4..a1a033d 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/TimeSeriesPreAggregateTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/TimeSeriesPreAggregateTestCase.scala
@@ -23,7 +23,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.scalatest.Matchers._
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
 import org.apache.carbondata.core.util.CarbonProperties
 
 class TimeSeriesPreAggregateTestCase extends QueryTest with BeforeAndAfterEach {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index cd87913..8e499ba 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -30,7 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
 import org.apache.carbondata.core.util.CarbonProperties
 
 class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index 6f78285..f1a6092 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
 
 class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index efe34c6..e8e8f79 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
 
 class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala
index d66c402..2642b03 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala
@@ -22,7 +22,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.scalatest.Matchers._
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
 import org.apache.carbondata.core.util.CarbonProperties
 
 class TestTimeseriesCompaction extends QueryTest with BeforeAndAfterAll {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
index c5f07a0..1d1fd94 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
 import org.apache.carbondata.core.util.CarbonProperties
 
 class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
index 3f140df..8cb6a81 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
@@ -25,7 +25,7 @@ import org.apache.spark.util.SparkUtil4Test
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
 
 class TestTimeseriesTableSelection extends QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index b19c80d..b56bd6e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -42,7 +42,7 @@ import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.Event
 import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
@@ -375,9 +375,71 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
       sql("select * from normal_test where name='n502670' and city='c2670'"))
   }
 
+  test("test datamap storage in system folder") {
+    sql("DROP TABLE IF EXISTS datamap_store_test")
+    sql(
+      """
+        | CREATE TABLE datamap_store_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+
+    sql(s"create datamap test_cg_datamap on table datamap_store_test using '${classOf[CGDataMapFactory].getName}' as select  id, name from datamap_store_test")
+
+    val loc = CarbonProperties.getInstance().getSystemFolderLocation + "/test_cg_datamap.dmschema"
+
+    assert(FileFactory.isFileExist(loc))
+  }
+
+  test("test datamap storage and drop in system folder") {
+    sql("DROP TABLE IF EXISTS datamap_store_test1")
+    sql(
+      """
+        | CREATE TABLE datamap_store_test1(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+
+    sql(s"create datamap test_cg_datamap1 on table datamap_store_test1 using '${classOf[CGDataMapFactory].getName}' as select  id, name from datamap_store_test")
+
+    val loc = CarbonProperties.getInstance().getSystemFolderLocation + "/test_cg_datamap1.dmschema"
+
+    assert(FileFactory.isFileExist(loc))
+
+    sql(s"drop datamap test_cg_datamap1 on table datamap_store_test1")
+
+    assert(!FileFactory.isFileExist(loc))
+  }
+
+  test("test show datamap storage") {
+    sql("DROP TABLE IF EXISTS datamap_store_test2")
+    sql(
+      """
+        | CREATE TABLE datamap_store_test2(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+
+    sql(s"create datamap test_cg_datamap2 on table datamap_store_test2 using '${classOf[CGDataMapFactory].getName}' as select  id, name from datamap_store_test")
+
+    val loc = CarbonProperties.getInstance().getSystemFolderLocation + "/test_cg_datamap2.dmschema"
+
+    assert(FileFactory.isFileExist(loc))
+
+    checkExistence(sql("show datamap"), true, "test_cg_datamap2")
+
+    sql(s"drop datamap test_cg_datamap2 on table datamap_store_test2")
+
+    assert(!FileFactory.isFileExist(loc))
+  }
+
   override protected def afterAll(): Unit = {
     CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
     sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test")
     sql("DROP TABLE IF EXISTS datamap_test_cg")
+    sql("DROP TABLE IF EXISTS datamap_store_test")
+    sql("DROP TABLE IF EXISTS datamap_store_test1")
+    sql("DROP TABLE IF EXISTS datamap_store_test2")
   }
 }
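
The tests above check that each datamap now persists a <datamap-name>.dmschema file under the _system folder. A rough sketch of driving the same behaviour against a custom location through the carbon.system.folder.location property introduced by this patch; the table name, datamap name and path below are illustrative only, and sql/FileFactory are assumed to be in scope as in the test suite:

    // Sketch only: redirect the datamap schema store, create a datamap, and check
    // that its .dmschema file lands in the configured folder.
    CarbonProperties.getInstance()
      .addProperty("carbon.system.folder.location", "/tmp/carbon_system") // hypothetical path
    sql("CREATE TABLE dm_source(id INT, name STRING) STORED BY 'org.apache.carbondata.format'")
    sql(s"CREATE DATAMAP dm_example ON TABLE dm_source USING '${classOf[CGDataMapFactory].getName}'")
    val schemaFile =
      CarbonProperties.getInstance().getSystemFolderLocation + "/dm_example.dmschema"
    assert(FileFactory.isFileExist(schemaFile))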

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 73e9fd9..bb8d7f8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -93,7 +93,7 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
   test("test write datamap 2 pages") {
     sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
     // register datamap writer
-    sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'")
+    sql(s"CREATE DATAMAP test1 ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'")
     val df = buildTestData(33000)
 
     // save dataframe to carbon file
@@ -119,7 +119,7 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
 
   test("test write datamap 2 blocklet") {
     sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
-    sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2DataMapFactory].getName}'")
+    sql(s"CREATE DATAMAP test2 ON TABLE carbon2 USING '${classOf[C2DataMapFactory].getName}'")
 
     CarbonProperties.getInstance()
       .addProperty("carbon.blockletgroup.size.in.mb", "1")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 5f55ef3..394ba5f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.{DataTypeInfo, UpdateTableModel}
 import org.apache.spark.sql.types._
+import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogService
@@ -40,7 +41,7 @@ import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionary
 import org.apache.carbondata.core.metadata.ColumnIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, StructField => CarbonStructField}
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchemaStorageProvider}
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, ColumnSchema}
 import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.processing.exception.DataLoadingException
@@ -583,4 +584,15 @@ object CarbonScalaUtil {
         // ignore it
     }
   }
+
+  /**
+   * Create datamap provider using class name
+   */
+  def createDataMapProvider(className: String, sparkSession: SparkSession,
+      storageProvider: DataMapSchemaStorageProvider): Object = {
+    CarbonReflectionUtils.createObject(
+      className,
+      sparkSession,
+      storageProvider)._1.asInstanceOf[Object]
+  }
 }
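
The new createDataMapProvider helper is a thin reflection wrapper, so the named class is expected to expose a (SparkSession, DataMapSchemaStorageProvider) constructor. A hedged usage sketch, assuming a SparkSession is already in scope and using a made-up provider class name:

    // Sketch only: MyMVDataMapProvider is a hypothetical class name; the storage
    // provider points at the same _system folder used for .dmschema files.
    val storageProvider = new DiskBasedDMSchemaStorageProvider(
      CarbonProperties.getInstance().getSystemFolderLocation)
    val provider = CarbonScalaUtil.createDataMapProvider(
      "org.example.MyMVDataMapProvider", sparkSession, storageProvider)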

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
index b23d676..1a27abf 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
@@ -17,10 +17,17 @@
 
 package org.apache.carbondata.datamap;
 
+import org.apache.carbondata.core.datamap.DataMapProvider;
+import org.apache.carbondata.core.datamap.IndexDataMapProvider;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
+import org.apache.carbondata.core.metadata.schema.table.DiskBasedDMSchemaStorageProvider;
+import org.apache.carbondata.core.util.CarbonProperties;
 
-import static org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.PREAGGREGATE;
-import static org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES;
+import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.PREAGGREGATE;
+import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES;
+
+import org.apache.spark.sql.SparkSession;
 
 public class DataMapManager {
 
@@ -36,18 +43,24 @@ public class DataMapManager {
   }
 
   /**
-   * Return a DataMapProvider instance for specified dataMapSchema.
+   * Return a DataMapProvider instance for the specified dataMapSchema, chosen by its provider name.
    */
-  public DataMapProvider getDataMapProvider(DataMapSchema dataMapSchema) {
+  public DataMapProvider getDataMapProvider(DataMapSchema dataMapSchema,
+      SparkSession sparkSession) {
     DataMapProvider provider;
     if (dataMapSchema.getProviderName().equalsIgnoreCase(PREAGGREGATE.toString())) {
-      provider = new PreAggregateDataMapProvider();
+      provider = new PreAggregateDataMapProvider(sparkSession);
     } else if (dataMapSchema.getProviderName().equalsIgnoreCase(TIMESERIES.toString())) {
-      provider = new TimeseriesDataMapProvider();
+      provider = new TimeseriesDataMapProvider(sparkSession);
     } else {
-      provider = new IndexDataMapProvider();
+      provider = new IndexDataMapProvider(getDataMapSchemaStorageProvider());
     }
     return provider;
   }
 
+  private DataMapSchemaStorageProvider getDataMapSchemaStorageProvider() {
+    return new DiskBasedDMSchemaStorageProvider(
+        CarbonProperties.getInstance().getSystemFolderLocation());
+  }
+
 }
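
getDataMapProvider now dispatches on the schema's provider name: preaggregate and timeseries map to the SparkSession-backed providers, and anything else falls through to the core IndexDataMapProvider backed by the disk-based schema store. A caller-side sketch of that flow, assuming a SparkSession and the parent CarbonTable are in scope; the names and the CTAS string are illustrative:

    // Sketch only: build a schema, resolve its provider, and initialise metadata,
    // mirroring CarbonCreateDataMapCommand later in this patch.
    val schema = new DataMapSchema("agg_sales", "preaggregate") // illustrative name/provider
    schema.setProperties(new java.util.HashMap[String, String]())
    val provider = DataMapManager.get().getDataMapProvider(schema, sparkSession)
    provider.initMeta(mainTable, schema,
      "select country, sum(amount) from sales group by country") // illustrative CTAS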

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
deleted file mode 100644
index ea571d7..0000000
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.processing.exception.DataLoadingException;
-
-import org.apache.spark.sql.SparkSession;
-
-/**
- * DataMap is a accelerator for certain type of query. Developer can add new DataMap
- * implementation to improve query performance.
- *
- * Currently two types of DataMap are supported
- * <ol>
- *   <li> MVDataMap: materialized view type of DataMap to accelerate olap style query,
- * like SPJG query (select, predicate, join, groupby) </li>
- *   <li> DataMap: index type of DataMap to accelerate filter query </li>
- * </ol>
- *
- * <p>
- * In following command <br>
- * {@code CREATE DATAMAP dm ON TABLE main USING 'provider'}, <br>
- * the <b>provider</b> string can be a short name or class name of the DataMap implementation.
- *
- * <br>Currently CarbonData supports following provider:
- * <ol>
- *   <li> preaggregate: one type of MVDataMap that do pre-aggregate of single table </li>
- *   <li> timeseries: one type of MVDataMap that do pre-aggregate based on time dimension
- *     of the table </li>
- *   <li> class name of {@link org.apache.carbondata.core.datamap.dev.DataMapFactory}
- * implementation: Developer can implement new type of DataMap by extending
- * {@link org.apache.carbondata.core.datamap.dev.DataMapFactory} </li>
- * </ol>
- *
- * @since 1.4.0
- */
-@InterfaceAudience.Internal
-public interface DataMapProvider {
-
-  /**
-   * Initialize a datamap's metadata.
-   * This is called when user creates datamap, for example "CREATE DATAMAP dm ON TABLE mainTable"
-   * Implementation should initialize metadata for datamap, like creating table
-   */
-  void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
-      SparkSession sparkSession) throws MalformedDataMapCommandException, IOException;
-
-  /**
-   * Initialize a datamap's data.
-   * This is called when user creates datamap, for example "CREATE DATAMAP dm ON TABLE mainTable"
-   * Implementation should initialize data for datamap, like creating data folders
-   */
-  void initData(CarbonTable mainTable, SparkSession sparkSession);
-
-  /**
-   * Opposite operation of {@link #initMeta(CarbonTable, DataMapSchema, String, SparkSession)}.
-   * This is called when user drops datamap, for example "DROP DATAMAP dm ON TABLE mainTable"
-   * Implementation should clean all meta for the datamap
-   */
-  void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, SparkSession sparkSession);
-
-  /**
-   * Opposite operation of {@link #initData(CarbonTable, SparkSession)}.
-   * This is called when user drops datamap, for example "DROP DATAMAP dm ON TABLE mainTable"
-   * Implementation should clean all data for the datamap
-   */
-  void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema, SparkSession sparkSession);
-
-  /**
-   * Rebuild the datamap by loading all existing data from mainTable
-   * This is called when refreshing the datamap when
-   * 1. after datamap creation and if `autoRefreshDataMap` is set to true
-   * 2. user manually trigger refresh datamap command
-   */
-  void rebuild(CarbonTable mainTable, SparkSession sparkSession) throws DataLoadingException;
-
-  /**
-   * Build the datamap incrementally by loading specified segment data
-   * This is called when user manually trigger refresh datamap
-   */
-  void incrementalBuild(CarbonTable mainTable, String[] segmentIds, SparkSession sparkSession)
-    throws DataLoadingException;
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
deleted file mode 100644
index 1f075de..0000000
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.exceptions.MetadataProcessException;
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
-import org.apache.carbondata.core.datamap.DataMapRegistry;
-import org.apache.carbondata.core.datamap.DataMapStoreManager;
-import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.format.TableInfo;
-
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil;
-
-@InterfaceAudience.Internal
-public class IndexDataMapProvider implements DataMapProvider {
-
-  private TableInfo originalTableInfo;
-
-  @Override
-  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
-      SparkSession sparkSession) throws MalformedDataMapCommandException, IOException {
-    DataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema);
-    DataMapStoreManager.getInstance().registerDataMap(
-        mainTable.getAbsoluteTableIdentifier(), dataMapSchema, dataMapFactory);
-    originalTableInfo = PreAggregateUtil.updateMainTable(mainTable, dataMapSchema, sparkSession);
-  }
-
-  @Override
-  public void initData(CarbonTable mainTable, SparkSession sparkSession) {
-    // Nothing is needed to do by default
-  }
-
-  @Override
-  public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema,
-      SparkSession sparkSession) {
-    PreAggregateUtil.updateSchemaInfo(mainTable, originalTableInfo, sparkSession);
-  }
-
-  @Override
-  public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema,
-      SparkSession sparkSession) {
-    DataMapStoreManager.getInstance().clearDataMap(
-        mainTable.getAbsoluteTableIdentifier(), dataMapSchema.getDataMapName());
-  }
-
-  @Override
-  public void rebuild(CarbonTable mainTable, SparkSession sparkSession) {
-    // Nothing is needed to do by default
-  }
-
-  @Override
-  public void incrementalBuild(CarbonTable mainTable, String[] segmentIds,
-      SparkSession sparkSession) {
-    throw new UnsupportedOperationException();
-  }
-
-  private DataMapFactory createIndexDataMapFactory(DataMapSchema dataMapSchema)
-      throws MalformedDataMapCommandException {
-    DataMapFactory dataMapFactory;
-    try {
-      // try to create DataMapProvider instance by taking providerName as class name
-      Class<? extends DataMapFactory> providerClass =
-          (Class<? extends DataMapFactory>) Class.forName(dataMapSchema.getProviderName());
-      dataMapFactory = providerClass.newInstance();
-    } catch (ClassNotFoundException e) {
-      // try to create DataMapProvider instance by taking providerName as short name
-      dataMapFactory = getDataMapFactoryByShortName(dataMapSchema.getProviderName());
-    } catch (Throwable e) {
-      throw new MetadataProcessException(
-          "failed to create DataMapProvider '" + dataMapSchema.getProviderName() + "'", e);
-    }
-    return dataMapFactory;
-  }
-
-  private DataMapFactory getDataMapFactoryByShortName(String providerName)
-      throws MalformedDataMapCommandException {
-    DataMapFactory dataMapFactory;
-    String className = DataMapRegistry.getDataMapClassName(providerName);
-    if (className != null) {
-      try {
-        Class<? extends DataMapFactory> datamapClass =
-            (Class<? extends DataMapFactory>) Class.forName(providerName);
-        dataMapFactory = datamapClass.newInstance();
-      } catch (ClassNotFoundException ex) {
-        throw new MalformedDataMapCommandException(
-            "DataMap '" + providerName + "' not found", ex);
-      } catch (Throwable ex) {
-        throw new MetadataProcessException(
-            "failed to create DataMap '" + providerName + "'", ex);
-      }
-    } else {
-      throw new MalformedDataMapCommandException(
-          "DataMap '" + providerName + "' not found");
-    }
-    return dataMapFactory;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
index c33354e..7151dcd 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.datamap;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.core.datamap.DataMapCatalog;
+import org.apache.carbondata.core.datamap.DataMapProvider;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
@@ -31,10 +33,15 @@ import scala.Some;
 public class PreAggregateDataMapProvider implements DataMapProvider {
   protected PreAggregateTableHelper helper;
   protected CarbonDropTableCommand dropTableCommand;
+  protected SparkSession sparkSession;
+
+  public PreAggregateDataMapProvider(SparkSession sparkSession) {
+    this.sparkSession = sparkSession;
+  }
 
   @Override
-  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
-      SparkSession sparkSession) throws MalformedDataMapCommandException {
+  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement)
+      throws MalformedDataMapCommandException {
     validateDmProperty(dataMapSchema);
     helper = new PreAggregateTableHelper(
         mainTable, dataMapSchema.getDataMapName(), dataMapSchema.getProviderName(),
@@ -54,13 +61,12 @@ public class PreAggregateDataMapProvider implements DataMapProvider {
   }
 
   @Override
-  public void initData(CarbonTable mainTable, SparkSession sparkSession) {
+  public void initData(CarbonTable mainTable) {
     // Nothing is needed to do by default
   }
 
   @Override
-  public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema,
-      SparkSession sparkSession) {
+  public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema) {
     dropTableCommand = new CarbonDropTableCommand(
         true,
         new Some<>(dataMapSchema.getRelationIdentifier().getDatabaseName()),
@@ -70,23 +76,26 @@ public class PreAggregateDataMapProvider implements DataMapProvider {
   }
 
   @Override
-  public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema,
-      SparkSession sparkSession) {
+  public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema) {
     if (dropTableCommand != null) {
       dropTableCommand.processData(sparkSession);
     }
   }
 
   @Override
-  public void rebuild(CarbonTable mainTable, SparkSession sparkSession) {
+  public void rebuild(CarbonTable mainTable) {
     if (helper != null) {
       helper.initData(sparkSession);
     }
   }
 
   @Override
-  public void incrementalBuild(CarbonTable mainTable, String[] segmentIds,
-      SparkSession sparkSession) {
+  public void incrementalBuild(CarbonTable mainTable, String[] segmentIds) {
     throw new UnsupportedOperationException();
   }
+
+  @Override public DataMapCatalog createDataMapCatalog() {
+    // TODO manage pre-agg also with catalog.
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
index f1575cd..c2acca4 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
@@ -32,9 +32,13 @@ import scala.Tuple2;
 @InterfaceAudience.Internal
 public class TimeseriesDataMapProvider extends PreAggregateDataMapProvider {
 
+  public TimeseriesDataMapProvider(SparkSession sparkSession) {
+    super(sparkSession);
+  }
+
   @Override
-  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
-      SparkSession sparkSession) {
+  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema,
+      String ctasSqlStatement) {
     Map<String, String> dmProperties = dataMapSchema.getProperties();
     String dmProviderName = dataMapSchema.getProviderName();
     TimeSeriesUtil.validateTimeSeriesGranularity(dmProperties, dmProviderName);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 8c475d4..d7592f9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -24,8 +24,9 @@ import org.apache.spark.sql.execution.command._
 
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.DataMapProvider
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
-import org.apache.carbondata.datamap.{DataMapManager, DataMapProvider}
+import org.apache.carbondata.datamap.DataMapManager
 
 /**
  * Below command class will be used to create datamap on table
@@ -33,7 +34,7 @@ import org.apache.carbondata.datamap.{DataMapManager, DataMapProvider}
  */
 case class CarbonCreateDataMapCommand(
     dataMapName: String,
-    tableIdentifier: TableIdentifier,
+    tableIdentifier: Option[TableIdentifier],
     dmClassName: String,
     dmProperties: Map[String, String],
     queryString: Option[String],
@@ -48,12 +49,16 @@ case class CarbonCreateDataMapCommand(
     // since streaming segment does not support building index and pre-aggregate yet,
     // so streaming table does not support create datamap
     mainTable =
-      CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession)
-    if (mainTable.isStreamingTable) {
+      tableIdentifier match {
+        case Some(table) =>
+          CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession)
+        case _ => null
+      }
+    if (mainTable != null && mainTable.isStreamingTable) {
       throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
     }
 
-    if (mainTable.getDataMapSchema(dataMapName) != null) {
+    if (mainTable != null && mainTable.getDataMapSchema(dataMapName) != null) {
       if (!ifNotExistsSet) {
         throw new MalformedDataMapCommandException(s"DataMap name '$dataMapName' already exist")
       } else {
@@ -64,19 +69,19 @@ case class CarbonCreateDataMapCommand(
     dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
     dataMapSchema.setProperties(new java.util.HashMap[String, String](
       dmProperties.map(x => (x._1.trim, x._2.trim)).asJava))
-    dataMapProvider = DataMapManager.get().getDataMapProvider(dataMapSchema)
-    dataMapProvider.initMeta(mainTable, dataMapSchema, queryString.orNull, sparkSession)
+    dataMapProvider = DataMapManager.get().getDataMapProvider(dataMapSchema, sparkSession)
+    dataMapProvider.initMeta(mainTable, dataMapSchema, queryString.orNull)
 
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${tableIdentifier.table}")
+    LOGGER.audit(s"DataMap $dataMapName successfully added")
     Seq.empty
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     if (dataMapProvider != null) {
-      dataMapProvider.initData(mainTable, sparkSession)
-      if (mainTable.isAutoRefreshDataMap) {
-        dataMapProvider.rebuild(mainTable, sparkSession)
+      dataMapProvider.initData(mainTable)
+      if (mainTable != null && mainTable.isAutoRefreshDataMap) {
+        dataMapProvider.rebuild(mainTable)
       }
     }
     Seq.empty
@@ -84,7 +89,7 @@ case class CarbonCreateDataMapCommand(
 
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
     if (dataMapProvider != null) {
-      dataMapProvider.freeMeta(mainTable, dataMapSchema, sparkSession)
+      dataMapProvider.freeMeta(mainTable, dataMapSchema)
     }
     Seq.empty
   }