You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/06 08:50:19 UTC

[4/4] carbondata git commit: [CARBONDATA-2415] Support Refresh DataMap command for all Index datamap

[CARBONDATA-2415] Support Refresh DataMap command for all Index datamap

Refactor the DataMapWriter interface to accept a row instead of a column page when adding data. This refactoring requires modifying many test case files
Add REFRESH DATAMAP support for all index datamaps, including Lucene and Bloom
Make IndexDataMapRefreshRDD generic for all index datamaps
For all index datamaps, add a unified DMPROPERTIES entry called INDEX_COLUMNS to specify the list of column names to be indexed

This closes #2254


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9db662a2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9db662a2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9db662a2

Branch: refs/heads/master
Commit: 9db662a2d352e379b42495636f27f7c10bf49f6c
Parents: f2fb068
Author: Jacky Li <ja...@qq.com>
Authored: Tue May 1 14:51:08 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Sun May 6 14:20:01 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   6 +
 .../carbondata/core/datamap/DataMapChooser.java |   8 +-
 .../carbondata/core/datamap/DataMapMeta.java    |  21 +-
 .../core/datamap/DataMapProvider.java           |  53 ++--
 .../core/datamap/DataMapRegistry.java           |  44 ++-
 .../core/datamap/DataMapStoreManager.java       |  27 +-
 .../core/datamap/IndexDataMapProvider.java      | 138 --------
 .../apache/carbondata/core/datamap/Segment.java |   4 +
 .../carbondata/core/datamap/TableDataMap.java   |   4 +-
 .../core/datamap/dev/DataMapFactory.java        |  76 ++++-
 .../core/datamap/dev/DataMapRefresher.java      |  36 +++
 .../core/datamap/dev/DataMapWriter.java         |  54 ++--
 .../cgdatamap/CoarseGrainDataMapFactory.java    |  11 +-
 .../dev/expr/DataMapExprWrapperImpl.java        |   2 +-
 .../dev/fgdatamap/FineGrainBlocklet.java        |   5 +-
 .../dev/fgdatamap/FineGrainDataMapFactory.java  |  12 +-
 .../datamap/status/DataMapStatusManager.java    |   2 +-
 .../carbondata/core/indexstore/Blocklet.java    |  18 +-
 .../core/indexstore/ExtendedBlocklet.java       |   6 +-
 .../blockletindex/BlockletDataMapFactory.java   |  14 +-
 .../schema/datamap/DataMapClassProvider.java    |  17 +-
 .../core/metadata/schema/table/CarbonTable.java |  30 +-
 .../metadata/schema/table/DataMapSchema.java    |  21 ++
 .../scanner/impl/BlockletFilterScanner.java     |   3 +-
 .../core/util/path/CarbonTablePath.java         |  34 +-
 .../datamap/bloom/BloomCoarseGrainDataMap.java  |   8 +-
 .../bloom/BloomCoarseGrainDataMapFactory.java   |  82 ++---
 .../carbondata/datamap/bloom/BloomDMModel.java  |   3 +
 .../datamap/bloom/BloomDataMapRefresher.java    |  89 ++++++
 .../datamap/bloom/BloomDataMapWriter.java       | 199 ++++++------
 .../datamap/examples/MinMaxDataWriter.java      |  85 ++---
 .../examples/MinMaxIndexDataMapFactory.java     |  28 +-
 .../lucene/LuceneCoarseGrainDataMapFactory.java |  13 +-
 .../lucene/LuceneDataMapFactoryBase.java        | 110 +++----
 .../datamap/lucene/LuceneDataMapRefresher.java  | 224 +++++++++++++
 .../datamap/lucene/LuceneDataMapWriter.java     | 204 +++++-------
 .../datamap/lucene/LuceneFineGrainDataMap.java  |   6 +-
 .../lucene/LuceneFineGrainDataMapFactory.java   |  13 +-
 .../lucene/LuceneIndexRefreshBuilder.java       | 220 -------------
 .../hadoop/api/CarbonInputFormat.java           |   4 +-
 .../cluster/sdv/generated/LuceneTestCase.scala  |  21 +-
 .../lucene/LuceneCoarseGrainDataMapSuite.scala  |   2 +-
 .../lucene/LuceneFineGrainDataMapSuite.scala    |  84 ++---
 .../testsuite/datamap/CGDataMapTestCase.scala   | 100 +++---
 .../testsuite/datamap/DataMapWriterSuite.scala  |  78 +++--
 .../testsuite/datamap/FGDataMapTestCase.scala   |  66 ++--
 .../testsuite/datamap/TestDataMapStatus.scala   |  70 ++--
 .../TestInsertAndOtherCommandConcurrent.scala   |  45 +--
 .../carbondata/datamap/DataMapManager.java      |  13 +-
 .../datamap/IndexDataMapProvider.java           | 147 +++++++++
 .../datamap/PreAggregateDataMapProvider.java    |  30 +-
 .../datamap/TimeseriesDataMapProvider.java      |  10 +-
 .../datamap/IndexDataMapRefreshRDD.scala        | 317 +++++++++++++++++++
 .../lucene/LuceneDataMapRefreshRDD.scala        | 299 -----------------
 .../datamap/CarbonCreateDataMapCommand.scala    |  77 +++--
 .../datamap/CarbonDataMapRefreshCommand.scala   |  17 +-
 .../datamap/CarbonDropDataMapCommand.scala      |  14 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |   7 +-
 .../bloom/BloomCoarseGrainDataMapSuite.scala    | 122 +++++--
 .../datamap/DataMapWriterException.java         |  24 ++
 .../datamap/DataMapWriterListener.java          |  42 ++-
 .../CarbonRowDataWriterProcessorStepImpl.java   |   9 +-
 .../store/CarbonFactDataHandlerModel.java       |  24 +-
 .../store/writer/AbstractFactDataWriter.java    |   3 +-
 64 files changed, 1967 insertions(+), 1588 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 648f08e..f9bf220 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1720,6 +1720,12 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_LUCENE_COMPRESSION_MODE_DEFAULT = "speed";
 
+  /**
+   * DMPROPERTY for Index DataMap, like lucene, bloomfilter DataMap,
+   * to indicate a list of column name to be indexed
+   */
+  public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
index 57069b8..478254d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -279,6 +279,10 @@ public class DataMapChooser {
     return null;
   }
 
+  /**
+   * Return true if the input datamap contains the column that needed in
+   * specified expression
+   */
   private boolean contains(DataMapMeta mapMeta, List<ColumnExpression> columnExpressions,
       Set<ExpressionType> expressionTypes) {
     if (mapMeta.getOptimizedOperation().contains(ExpressionType.TEXT_MATCH) &&
@@ -291,8 +295,8 @@ public class DataMapChooser {
     }
     boolean contains = true;
     for (ColumnExpression expression : columnExpressions) {
-      if (!mapMeta.getIndexedColumns().contains(expression.getColumnName()) || !mapMeta
-          .getOptimizedOperation().containsAll(expressionTypes)) {
+      if (!mapMeta.getIndexedColumnNames().contains(expression.getColumnName()) ||
+          !mapMeta.getOptimizedOperation().containsAll(expressionTypes)) {
         contains = false;
         break;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
index cf51b11..adf85d8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
@@ -21,8 +21,12 @@ import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.Transformer;
+
 /**
  * Metadata of the datamap, set by DataMap developer
  */
@@ -31,16 +35,17 @@ import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 public class DataMapMeta {
   private String dataMapName;
 
-  private List<String> indexedColumns;
+  private List<CarbonColumn> indexedColumns;
 
   private List<ExpressionType> optimizedOperation;
 
-  public DataMapMeta(List<String> indexedColumns, List<ExpressionType> optimizedOperation) {
+  public DataMapMeta(List<CarbonColumn> indexedColumns,
+      List<ExpressionType> optimizedOperation) {
     this.indexedColumns = indexedColumns;
     this.optimizedOperation = optimizedOperation;
   }
 
-  public DataMapMeta(String dataMapName, List<String> indexedColumns,
+  public DataMapMeta(String dataMapName, List<CarbonColumn> indexedColumns,
       List<ExpressionType> optimizedOperation) {
     this(indexedColumns, optimizedOperation);
     this.dataMapName = dataMapName;
@@ -50,10 +55,18 @@ public class DataMapMeta {
     return dataMapName;
   }
 
-  public List<String> getIndexedColumns() {
+  public List<CarbonColumn> getIndexedColumns() {
     return indexedColumns;
   }
 
+  public List<String> getIndexedColumnNames() {
+    return (List<String>) CollectionUtils.collect(indexedColumns, new Transformer() {
+      @Override public Object transform(Object input) {
+        return ((CarbonColumn) input).getColName();
+      }
+    });
+  }
+
   public List<ExpressionType> getOptimizedOperation() {
     return optimizedOperation;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
index 61dcfd1..775b912 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
@@ -42,47 +44,61 @@ import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
  *
  * <br>Currently CarbonData supports following provider:
  * <ol>
- *   <li> preaggregate: one type of MVDataMap that do pre-aggregate of single table </li>
- *   <li> timeseries: one type of MVDataMap that do pre-aggregate based on time dimension
- *     of the table </li>
- *   <li> class name of {@link org.apache.carbondata.core.datamap.dev.DataMapFactory}
- * implementation: Developer can implement new type of DataMap by extending
- * {@link org.apache.carbondata.core.datamap.dev.DataMapFactory} </li>
+ *   <li> preaggregate: pre-aggregate table of single table </li>
+ *   <li> timeseries: pre-aggregate table based on time dimension of the table </li>
+ *   <li> lucene: index backed by Apache Lucene </li>
+ *   <li> bloomfilter: index backed by Bloom Filter </li>
  * </ol>
  *
  * @since 1.4.0
  */
 @InterfaceAudience.Internal
-public interface DataMapProvider {
+public abstract class DataMapProvider {
+
+  private CarbonTable mainTable;
+  private DataMapSchema dataMapSchema;
+
+  public DataMapProvider(CarbonTable mainTable, DataMapSchema dataMapSchema) {
+    this.mainTable = mainTable;
+    this.dataMapSchema = dataMapSchema;
+  }
+
+  protected final CarbonTable getMainTable() {
+    return mainTable;
+  }
+
+  protected final DataMapSchema getDataMapSchema() {
+    return dataMapSchema;
+  }
 
   /**
    * Initialize a datamap's metadata.
    * This is called when user creates datamap, for example "CREATE DATAMAP dm ON TABLE mainTable"
    * Implementation should initialize metadata for datamap, like creating table
    */
-  void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement)
-      throws MalformedDataMapCommandException, IOException;
+  public abstract void initMeta(String ctasSqlStatement) throws MalformedDataMapCommandException,
+      IOException;
 
   /**
    * Initialize a datamap's data.
    * This is called when user creates datamap, for example "CREATE DATAMAP dm ON TABLE mainTable"
    * Implementation should initialize data for datamap, like creating data folders
    */
-  void initData(CarbonTable mainTable);
+  public abstract void initData();
 
   /**
-   * Opposite operation of {@link #initMeta(CarbonTable, DataMapSchema, String)}.
+   * Opposite operation of {@link #initMeta(String)}.
    * This is called when user drops datamap, for example "DROP DATAMAP dm ON TABLE mainTable"
    * Implementation should clean all meta for the datamap
    */
-  void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema) throws IOException;
+  public abstract void cleanMeta() throws IOException;
 
   /**
-   * Opposite operation of {@link #initData(CarbonTable)}.
+   * Opposite operation of {@link #initData()}.
    * This is called when user drops datamap, for example "DROP DATAMAP dm ON TABLE mainTable"
    * Implementation should clean all data for the datamap
    */
-  void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema);
+  public abstract void cleanData();
 
   /**
    * Rebuild the datamap by loading all existing data from mainTable
@@ -90,19 +106,20 @@ public interface DataMapProvider {
    * 1. after datamap creation and if `autoRefreshDataMap` is set to true
    * 2. user manually trigger refresh datamap command
    */
-  void rebuild(CarbonTable mainTable, DataMapSchema dataMapSchema) throws IOException;
+  public abstract void rebuild() throws IOException, NoSuchDataMapException;
 
   /**
    * Build the datamap incrementally by loading specified segment data
    * This is called when user manually trigger refresh datamap
    */
-  void incrementalBuild(CarbonTable mainTable, DataMapSchema dataMapSchema, String[] segmentIds)
-    throws IOException;
+  public abstract void incrementalBuild(String[] segmentIds) throws IOException;
 
   /**
    * Provide the datamap catalog instance or null if this datamap not required to rewrite
    * the query.
    */
-  DataMapCatalog createDataMapCatalog();
+  public abstract DataMapCatalog createDataMapCatalog();
+
+  public abstract DataMapFactory getDataMapFactory();
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/DataMapRegistry.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapRegistry.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapRegistry.java
index 1b6782a..8c3640a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapRegistry.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapRegistry.java
@@ -17,12 +17,20 @@
 
 package org.apache.carbondata.core.datamap;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.exceptions.MetadataProcessException;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
 /**
  * Developer can register a datamap implementation with a short name.
@@ -33,7 +41,7 @@ import org.apache.carbondata.common.annotations.InterfaceStability;
  *  USING 'short-name-of-the-datamap'
  * }
  * otherwise, user should use the class name of the datamap implementation to create the datamap
- * (subclass of {@link org.apache.carbondata.core.datamap.dev.DataMapFactory})
+ * (subclass of {@link DataMapFactory})
  * <p>
  * {@code
  *  CREATE DATAMAP dm ON TABLE table
@@ -45,14 +53,44 @@ import org.apache.carbondata.common.annotations.InterfaceStability;
 public class DataMapRegistry {
   private static Map<String, String> shortNameToClassName = new ConcurrentHashMap<>();
 
-  public static void registerDataMap(String datamapClassName, String shortName) {
+  private static void registerDataMap(String datamapClassName, String shortName) {
     Objects.requireNonNull(datamapClassName);
     Objects.requireNonNull(shortName);
     shortNameToClassName.put(shortName, datamapClassName);
   }
 
-  public static String getDataMapClassName(String shortName) {
+  private static String getDataMapClassName(String shortName) {
     Objects.requireNonNull(shortName);
     return shortNameToClassName.get(shortName);
   }
+
+  public static DataMapFactory<? extends DataMap> getDataMapFactoryByShortName(
+      CarbonTable table, DataMapSchema dataMapSchema) throws MalformedDataMapCommandException {
+    String providerName = dataMapSchema.getProviderName();
+    try {
+      registerDataMap(
+          DataMapClassProvider.getDataMapProviderOnName(providerName).getClassName(),
+          DataMapClassProvider.getDataMapProviderOnName(providerName).getShortName());
+    } catch (UnsupportedOperationException ex) {
+      throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found", ex);
+    }
+    DataMapFactory<? extends DataMap> dataMapFactory;
+    String className = getDataMapClassName(providerName.toLowerCase());
+    if (className != null) {
+      try {
+        dataMapFactory = (DataMapFactory<? extends DataMap>)
+            Class.forName(className).getConstructors()[0].newInstance(table, dataMapSchema);
+      } catch (ClassNotFoundException ex) {
+        throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found", ex);
+      } catch (InvocationTargetException ex) {
+        throw new MalformedDataMapCommandException(ex.getTargetException().getMessage());
+      } catch (InstantiationException | IllegalAccessException | IllegalArgumentException ex) {
+        throw new MetadataProcessException(
+            "failed to create DataMap '" + providerName + "': " + ex.getMessage(), ex);
+      }
+    } else {
+      throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found");
+    }
+    return dataMapFactory;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index a871d57..29a1106 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -87,7 +87,7 @@ public final class DataMapStoreManager {
     List<TableDataMap> tableIndices = getAllVisibleDataMap(carbonTable);
     if (tableIndices != null) {
       for (TableDataMap dataMap : tableIndices) {
-        if (mapType == dataMap.getDataMapFactory().getDataMapType()) {
+        if (mapType == dataMap.getDataMapFactory().getDataMapLevel()) {
           dataMaps.add(dataMap);
         }
       }
@@ -262,22 +262,19 @@ public final class DataMapStoreManager {
   }
 
   /**
-   * Return a new datamap instance for the given
-   * @param dataMapSchema
-   * @return
-   * @throws MalformedDataMapCommandException
+   * Return a new datamap instance and registered in the store manager.
+   * The datamap is created using datamap name, datamap factory class and table identifier.
    */
-  public DataMapFactory getDataMapFactoryClass(DataMapSchema dataMapSchema)
+  public DataMapFactory getDataMapFactoryClass(CarbonTable table, DataMapSchema dataMapSchema)
       throws MalformedDataMapCommandException {
-    DataMapFactory dataMapFactory;
     try {
       // try to create datamap by reflection to test whether it is a valid DataMapFactory class
-      Class<? extends DataMapFactory> factoryClass =
-          (Class<? extends DataMapFactory>) Class.forName(dataMapSchema.getProviderName());
-      return factoryClass.newInstance();
+      return (DataMapFactory)
+          Class.forName(dataMapSchema.getProviderName()).getConstructors()[0]
+              .newInstance(table, dataMapSchema);
     } catch (ClassNotFoundException e) {
       // try to create DataMapClassProvider instance by taking providerName as short name
-      return IndexDataMapProvider.getDataMapFactoryByShortName(dataMapSchema.getProviderName());
+      return DataMapRegistry.getDataMapFactoryByShortName(table, dataMapSchema);
     } catch (Throwable e) {
       throw new MetadataProcessException(
           "failed to get DataMap factory for'" + dataMapSchema.getProviderName() + "'", e);
@@ -290,14 +287,13 @@ public final class DataMapStoreManager {
    */
   // TODO: make it private
   public TableDataMap createAndRegisterDataMap(CarbonTable table,
-      DataMapSchema dataMapSchema) throws MalformedDataMapCommandException, IOException {
-    DataMapFactory dataMapFactory  = getDataMapFactoryClass(dataMapSchema);
+      DataMapSchema dataMapSchema) throws MalformedDataMapCommandException {
+    DataMapFactory dataMapFactory  = getDataMapFactoryClass(table, dataMapSchema);
     return registerDataMap(table, dataMapSchema, dataMapFactory);
   }
 
   public TableDataMap registerDataMap(CarbonTable table,
-      DataMapSchema dataMapSchema,  DataMapFactory dataMapFactory)
-      throws IOException, MalformedDataMapCommandException {
+      DataMapSchema dataMapSchema,  DataMapFactory dataMapFactory) {
     String tableUniqueName = table.getCarbonTableIdentifier().getTableUniqueName();
     // Just update the segmentRefreshMap with the table if not added.
     getTableSegmentRefresher(table);
@@ -306,7 +302,6 @@ public final class DataMapStoreManager {
       tableIndices = new ArrayList<>();
     }
 
-    dataMapFactory.init(table, dataMapSchema);
     BlockletDetailsFetcher blockletDetailsFetcher;
     SegmentPropertiesFetcher segmentPropertiesFetcher = null;
     if (dataMapFactory instanceof BlockletDetailsFetcher) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
deleted file mode 100644
index a22bd0b..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datamap;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.exceptions.MetadataProcessException;
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
-import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
-
-@InterfaceAudience.Internal
-public class IndexDataMapProvider implements DataMapProvider {
-
-  public IndexDataMapProvider() {
-  }
-
-  @Override
-  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement)
-      throws MalformedDataMapCommandException, IOException {
-    if (mainTable == null) {
-      throw new MalformedDataMapCommandException(
-          "Parent table is required to create index datamap");
-    }
-    ArrayList<RelationIdentifier> relationIdentifiers = new ArrayList<>();
-    RelationIdentifier relationIdentifier =
-        new RelationIdentifier(mainTable.getDatabaseName(), mainTable.getTableName(),
-            mainTable.getTableInfo().getFactTable().getTableId());
-    relationIdentifiers.add(relationIdentifier);
-    dataMapSchema.setRelationIdentifier(relationIdentifier);
-    dataMapSchema.setParentTables(relationIdentifiers);
-    DataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema);
-    DataMapStoreManager.getInstance().registerDataMap(mainTable, dataMapSchema, dataMapFactory);
-    DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema);
-  }
-
-  @Override
-  public void initData(CarbonTable mainTable) {
-    // Nothing is needed to do by default
-  }
-
-  @Override
-  public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema) throws IOException {
-    if (mainTable == null) {
-      throw new UnsupportedOperationException("Table need to be specified in index datamaps");
-    }
-    DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName());
-  }
-
-  @Override
-  public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema) {
-    if (mainTable == null) {
-      throw new UnsupportedOperationException("Table need to be specified in index datamaps");
-    }
-    DataMapStoreManager.getInstance().clearDataMap(
-        mainTable.getAbsoluteTableIdentifier(), dataMapSchema.getDataMapName());
-  }
-
-  @Override
-  public void rebuild(CarbonTable mainTable, DataMapSchema dataMapSchema) {
-    // Nothing is needed to do by default
-  }
-
-  @Override public void incrementalBuild(CarbonTable mainTable, DataMapSchema dataMapSchema,
-      String[] segmentIds) {
-    throw new UnsupportedOperationException();
-  }
-
-  private DataMapFactory createIndexDataMapFactory(DataMapSchema dataMapSchema)
-      throws MalformedDataMapCommandException {
-    DataMapFactory dataMapFactory;
-    try {
-      // try to create DataMapClassProvider instance by taking providerName as class name
-      Class<? extends DataMapFactory> providerClass =
-          (Class<? extends DataMapFactory>) Class.forName(dataMapSchema.getProviderName());
-      dataMapFactory = providerClass.newInstance();
-    } catch (ClassNotFoundException e) {
-      // try to create DataMapClassProvider instance by taking providerName as short name
-      dataMapFactory = getDataMapFactoryByShortName(dataMapSchema.getProviderName());
-    } catch (Throwable e) {
-      throw new MetadataProcessException(
-          "failed to create DataMapClassProvider '" + dataMapSchema.getProviderName() + "'", e);
-    }
-    return dataMapFactory;
-  }
-
-  public static DataMapFactory getDataMapFactoryByShortName(String providerName)
-      throws MalformedDataMapCommandException {
-    try {
-      DataMapRegistry.registerDataMap(
-          DataMapClassProvider.getDataMapProviderOnName(providerName).getClassName(),
-          DataMapClassProvider.getDataMapProviderOnName(providerName).getShortName());
-    } catch (UnsupportedOperationException ex) {
-      throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found", ex);
-    }
-    DataMapFactory dataMapFactory;
-    String className = DataMapRegistry.getDataMapClassName(providerName.toLowerCase());
-    if (className != null) {
-      try {
-        Class<? extends DataMapFactory> datamapClass =
-            (Class<? extends DataMapFactory>) Class.forName(className);
-        dataMapFactory = datamapClass.newInstance();
-      } catch (ClassNotFoundException ex) {
-        throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found", ex);
-      } catch (Throwable ex) {
-        throw new MetadataProcessException("failed to create DataMap '" + providerName + "'", ex);
-      }
-    } else {
-      throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found");
-    }
-    return dataMapFactory;
-  }
-
-  @Override public DataMapCatalog createDataMapCatalog() {
-    // TODO create abstract class and move the default implementation there.
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 9179bbc..476f9da 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -58,6 +58,10 @@ public class Segment implements Serializable {
    */
   private LoadMetadataDetails loadMetadataDetails;
 
+  public Segment(String segmentNo) {
+    this.segmentNo = segmentNo;
+  }
+
   /**
    * ReadCommittedScope will be null. So getCommittedIndexFile will not work and will throw
    * a NullPointerException. In case getCommittedIndexFile is need to be accessed then

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 2dc6317..314b515 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -155,13 +155,13 @@ public final class TableDataMap extends OperationEventListener {
     String writePath =
         identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + dataMapSchema
             .getDataMapName();
-    if (dataMapFactory.getDataMapType() == DataMapLevel.FG) {
+    if (dataMapFactory.getDataMapLevel() == DataMapLevel.FG) {
       FileFactory.mkdirs(writePath, FileFactory.getFileType(writePath));
     }
     for (Blocklet blocklet : blocklets) {
       ExtendedBlocklet detailedBlocklet = blockletDetailsFetcher
           .getExtendedBlocklet(blocklet, distributable.getSegment());
-      if (dataMapFactory.getDataMapType() == DataMapLevel.FG) {
+      if (dataMapFactory.getDataMapLevel() == DataMapLevel.FG) {
         String blockletwritePath =
             writePath + CarbonCommonConstants.FILE_SEPARATOR + System.nanoTime();
         detailedBlocklet.setDataMapWriterPath(blockletwritePath);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 20bdfb7..ae34be7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -17,7 +17,9 @@
 package org.apache.carbondata.core.datamap.dev;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
@@ -27,74 +29,114 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.events.Event;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.INDEX_COLUMNS;
 
 /**
- * Interface for datamap factory, it is responsible for creating the datamap.
+ * Abstract factory for datamap of index type, it is responsible for creating the datamap.
  */
-public interface DataMapFactory<T extends DataMap> {
+public abstract class DataMapFactory<T extends DataMap> {
+
+  private CarbonTable carbonTable;
+  private DataMapSchema dataMapSchema;
+
+  public DataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
+    this.carbonTable = carbonTable;
+    this.dataMapSchema = dataMapSchema;
+  }
+
+  public CarbonTable getCarbonTable() {
+    return carbonTable;
+  }
+
+  public DataMapSchema getDataMapSchema() {
+    return dataMapSchema;
+  }
 
   /**
-   * Initialization of Datamap factory with the carbonTable and datamap name
+   * Create a new writer for this datamap, to write new data into the specified segment and shard
    */
-  void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
-      throws IOException, MalformedDataMapCommandException;
+  public abstract DataMapWriter createWriter(Segment segment, String shardName)
+      throws IOException;
 
   /**
-   * Return a new write for this datamap
+   * Create a new Refresher for this datamap, to rebuild the specified
+   * segment and shard data in the main table.
    */
-  DataMapWriter createWriter(Segment segment, String writeDirectoryPath);
+  public abstract DataMapRefresher createRefresher(Segment segment, String shardName)
+      throws IOException;
 
   /**
    * Get the datamap for segmentid
    */
-  List<T> getDataMaps(Segment segment) throws IOException;
+  public abstract List<T> getDataMaps(Segment segment) throws IOException;
 
   /**
    * Get datamaps for distributable object.
    */
-  List<T> getDataMaps(DataMapDistributable distributable)
+  public abstract List<T> getDataMaps(DataMapDistributable distributable)
       throws IOException;
 
   /**
    * Get all distributable objects of a segmentid
    * @return
    */
-  List<DataMapDistributable> toDistributable(Segment segment);
+  public abstract List<DataMapDistributable> toDistributable(Segment segment);
 
   /**
    *
    * @param event
    */
-  void fireEvent(Event event);
+  public abstract void fireEvent(Event event);
 
   /**
    * Clears datamap of the segment
    */
-  void clear(Segment segment);
+  public abstract void clear(Segment segment);
 
   /**
    * Clear all datamaps from memory
    */
-  void clear();
+  public abstract void clear();
 
   /**
    * Return metadata of this datamap
    */
-  DataMapMeta getMeta();
+  public abstract DataMapMeta getMeta();
 
   /**
    *  Type of datamap whether it is FG or CG
    */
-  DataMapLevel getDataMapType();
+  public abstract DataMapLevel getDataMapLevel();
 
   /**
    * delete datamap data if any
    */
-  void deleteDatamapData();
+  public abstract void deleteDatamapData();
 
   /**
    * This function should return true is the input operation enum will make the datamap become stale
    */
-  boolean willBecomeStale(TableOperation operation);
+  public abstract boolean willBecomeStale(TableOperation operation);
+
+  /**
+   * Validate the INDEX_COLUMNS property of this datamap; throws if it is malformed.
+   * The following will be validated:
+   * 1. the INDEX_COLUMNS property is required
+   * 2. INDEX_COLUMNS can't contain an illegal argument (empty, blank)
+   * 3. INDEX_COLUMNS can't contain duplicate columns
+   * 4. every column in INDEX_COLUMNS should exist in the table columns
+   */
+  public void validate() throws MalformedDataMapCommandException {
+    List<CarbonColumn> indexColumns = carbonTable.getIndexedColumns(dataMapSchema);
+    Set<String> unique = new HashSet<>();
+    for (CarbonColumn indexColumn : indexColumns) {
+      unique.add(indexColumn.getColName());
+    }
+    if (unique.size() != indexColumns.size()) {
+      throw new MalformedDataMapCommandException(INDEX_COLUMNS + " has duplicate column");
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java
new file mode 100644
index 0000000..770ceca
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * Interface to rebuild the datamap for main table with existing data
+ */
+@InterfaceAudience.Developer("DataMap")
+public interface DataMapRefresher {
+  void initialize() throws IOException;
+
+  void addRow(int blockletId, int pageId, int rowId, Object[] values) throws IOException;
+
+  void finish() throws IOException;
+
+  void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
index 1933f70..03a369a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -16,14 +16,16 @@
  */
 package org.apache.carbondata.core.datamap.dev;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -36,17 +38,25 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 @InterfaceStability.Evolving
 public abstract class DataMapWriter {
 
-  protected AbsoluteTableIdentifier identifier;
+  protected String tablePath;
 
   protected String segmentId;
 
-  protected String writeDirectoryPath;
+  protected String dataMapPath;
 
-  public DataMapWriter(AbsoluteTableIdentifier identifier, Segment segment,
-      String writeDirectoryPath) {
-    this.identifier = identifier;
+  private List<CarbonColumn> indexColumns;
+
+  public DataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
+      Segment segment, String shardName) {
+    this.tablePath = tablePath;
     this.segmentId = segment.getSegmentNo();
-    this.writeDirectoryPath = writeDirectoryPath;
+    this.dataMapPath = CarbonTablePath.getDataMapStorePathOnShardName(
+        tablePath, segmentId, dataMapName, shardName);
+    this.indexColumns = indexColumns;
+  }
+
+  protected final List<CarbonColumn> getIndexColumns() {
+    return indexColumns;
   }
 
   /**
@@ -54,7 +64,7 @@ public abstract class DataMapWriter {
    *
    * @param blockId file name of the carbondata file
    */
-  public abstract void onBlockStart(String blockId, String indexShardName) throws IOException;
+  public abstract void onBlockStart(String blockId) throws IOException;
 
   /**
    * End of block notification
@@ -66,22 +76,22 @@ public abstract class DataMapWriter {
    *
    * @param blockletId sequence number of blocklet in the block
    */
-  public abstract void onBlockletStart(int blockletId);
+  public abstract void onBlockletStart(int blockletId) throws IOException;
 
   /**
    * End of blocklet notification
    *
    * @param blockletId sequence number of blocklet in the block
    */
-  public abstract void onBlockletEnd(int blockletId);
+  public abstract void onBlockletEnd(int blockletId) throws IOException;
 
   /**
-   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
+   * Add columnar page data to the datamap, order of field is same as `indexColumns` in
    * DataMapMeta returned in DataMapFactory.
-   * Implementation should copy the content of `pages` as needed, because `pages` memory
-   * may be freed after this method returns, if using unsafe column page.
+   * Implementation should copy the content of `pages` as needed, because their memory
+   * may be freed after this method returns, in case of unsafe column pages
    */
-  public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
+  public abstract void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages)
       throws IOException;
 
   /**
@@ -97,15 +107,15 @@ public abstract class DataMapWriter {
    * @throws IOException if IO fails
    */
   protected void commitFile(String dataMapFile) throws IOException {
-    if (!dataMapFile.startsWith(writeDirectoryPath)) {
+    if (!dataMapFile.startsWith(dataMapPath)) {
       throw new UnsupportedOperationException(
           "Datamap file " + dataMapFile + " is not written in provided directory path "
-              + writeDirectoryPath);
+              + dataMapPath);
     }
     String dataMapFileName =
-        dataMapFile.substring(writeDirectoryPath.length(), dataMapFile.length());
+        dataMapFile.substring(dataMapPath.length(), dataMapFile.length());
     String carbonFilePath = dataMapFileName.substring(0, dataMapFileName.lastIndexOf("/"));
-    String segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
+    String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
     if (carbonFilePath.length() > 0) {
       carbonFilePath = segmentPath + carbonFilePath;
       FileFactory.mkdirs(carbonFilePath, FileFactory.getFileType(carbonFilePath));
@@ -115,4 +125,12 @@ public abstract class DataMapWriter {
     CarbonUtil.copyCarbonDataFileToCarbonStorePath(dataMapFile, carbonFilePath, 0);
   }
 
+
+  /**
+   * Return store path for datamap
+   */
+  public static String getDefaultDataMapPath(
+      String tablePath, String segmentId, String dataMapName) {
+    return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
index f9fdafb..3de923f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
@@ -21,6 +21,8 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
 /**
  *  Factory for {@link CoarseGrainDataMap}
@@ -31,9 +33,14 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory;
  */
 @InterfaceAudience.Developer("DataMap")
 @InterfaceStability.Evolving
-public abstract class CoarseGrainDataMapFactory implements DataMapFactory<CoarseGrainDataMap> {
+public abstract class CoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDataMap> {
 
-  @Override public DataMapLevel getDataMapType() {
+  public CoarseGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
+    super(carbonTable, dataMapSchema);
+  }
+
+  @Override
+  public DataMapLevel getDataMapLevel() {
     return DataMapLevel.CG;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
index eff6b4d..0a3896c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
@@ -85,7 +85,7 @@ public class DataMapExprWrapperImpl implements DataMapExprWrapper {
   }
 
   @Override public DataMapLevel getDataMapLevel() {
-    return dataMap.getDataMapFactory().getDataMapType();
+    return dataMap.getDataMapFactory().getDataMapLevel();
   }
 
   @Override public DataMapSchema getDataMapSchema() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java
index 9c78cc8..9d8f594 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java
@@ -41,8 +41,9 @@ public class FineGrainBlocklet extends Blocklet implements Serializable {
 
   private List<Page> pages;
 
-  public FineGrainBlocklet(String taskName, String blockletId, List<Page> pages) {
-    super(taskName, blockletId);
+  /** For FG, pass the shardName instead of file name */
+  public FineGrainBlocklet(String shardName, String blockletId, List<Page> pages) {
+    super(shardName, blockletId);
     this.pages = pages;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMapFactory.java
index 0c9aaed..5e4ecb4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMapFactory.java
@@ -20,6 +20,8 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
 /**
  *  Factory for {@link FineGrainDataMap}
@@ -35,10 +37,14 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory;
  */
 @InterfaceAudience.Developer("DataMap")
 @InterfaceStability.Evolving
-public abstract class FineGrainDataMapFactory
-    implements DataMapFactory<FineGrainDataMap> {
+public abstract class FineGrainDataMapFactory extends DataMapFactory<FineGrainDataMap> {
 
-  @Override public DataMapLevel getDataMapType() {
+  public FineGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
+    super(carbonTable, dataMapSchema);
+  }
+
+  @Override
+  public DataMapLevel getDataMapLevel() {
     return DataMapLevel.FG;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
index 31ab4e4..dcad80b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
@@ -51,7 +51,7 @@ public class DataMapStatusManager {
     return storageProvider.getDataMapStatusDetails();
   }
 
-  public static void disableDataMap(String dataMapName) throws Exception {
+  public static void disableDataMap(String dataMapName) throws IOException, NoSuchDataMapException {
     DataMapSchema dataMapSchema = getDataMapSchema(dataMapName);
     if (dataMapSchema != null) {
       List<DataMapSchema> list = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index 9b40be4..777a980 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -29,13 +29,13 @@ import org.apache.carbondata.core.metadata.schema.table.Writable;
 public class Blocklet implements Writable,Serializable {
 
   /** file path of this blocklet */
-  private String taskName;
+  private String filePath;
 
   /** id to identify the blocklet inside the block (it is a sequential number) */
   private String blockletId;
 
-  public Blocklet(String taskName, String blockletId) {
-    this.taskName = taskName;
+  public Blocklet(String filePath, String blockletId) {
+    this.filePath = filePath;
     this.blockletId = blockletId;
   }
 
@@ -47,17 +47,17 @@ public class Blocklet implements Writable,Serializable {
     return blockletId;
   }
 
-  public String getTaskName() {
-    return taskName;
+  public String getFilePath() {
+    return filePath;
   }
 
   @Override public void write(DataOutput out) throws IOException {
-    out.writeUTF(taskName);
+    out.writeUTF(filePath);
     out.writeUTF(blockletId);
   }
 
   @Override public void readFields(DataInput in) throws IOException {
-    taskName = in.readUTF();
+    filePath = in.readUTF();
     blockletId = in.readUTF();
   }
 
@@ -67,7 +67,7 @@ public class Blocklet implements Writable,Serializable {
 
     Blocklet blocklet = (Blocklet) o;
 
-    if (taskName != null ? !taskName.equals(blocklet.taskName) : blocklet.taskName != null) {
+    if (filePath != null ? !filePath.equals(blocklet.filePath) : blocklet.filePath != null) {
       return false;
     }
     return blockletId != null ?
@@ -76,7 +76,7 @@ public class Blocklet implements Writable,Serializable {
   }
 
   @Override public int hashCode() {
-    int result = taskName != null ? taskName.hashCode() : 0;
+    int result = filePath != null ? filePath.hashCode() : 0;
     result = 31 * result + (blockletId != null ? blockletId.hashCode() : 0);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index ea2752c..077b942 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -33,8 +33,8 @@ public class ExtendedBlocklet extends Blocklet {
 
   private String dataMapUniqueId;
 
-  public ExtendedBlocklet(String path, String blockletId) {
-    super(path, blockletId);
+  public ExtendedBlocklet(String filePath, String blockletId) {
+    super(filePath, blockletId);
   }
 
   public BlockletDetailInfo getDetailInfo() {
@@ -66,7 +66,7 @@ public class ExtendedBlocklet extends Blocklet {
   }
 
   public String getPath() {
-    return getTaskName();
+    return getFilePath();
   }
 
   public String getDataMapWriterPath() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 0d7539c..c0bc2a6 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
@@ -72,15 +73,20 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
 
   private Cache<TableBlockIndexUniqueIdentifier, CoarseGrainDataMap> cache;
 
-  @Override
-  public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
+  public BlockletDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
+    super(carbonTable, dataMapSchema);
     this.identifier = carbonTable.getAbsoluteTableIdentifier();
     cache = CacheProvider.getInstance()
         .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
   }
 
   @Override
-  public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
+  public DataMapWriter createWriter(Segment segment, String shardName) {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public DataMapRefresher createRefresher(Segment segment, String shardName) {
     throw new UnsupportedOperationException("not implemented");
   }
 
@@ -147,7 +153,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
       Blocklet blocklet) throws IOException {
     for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
-      if (identifier.getIndexFileName().startsWith(blocklet.getTaskName())) {
+      if (identifier.getIndexFileName().startsWith(blocklet.getFilePath())) {
         DataMap dataMap = cache.get(identifier);
         return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
index 4ab400d..d5a99e8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
@@ -29,7 +29,8 @@ package org.apache.carbondata.core.metadata.schema.datamap;
 public enum DataMapClassProvider {
   PREAGGREGATE("org.apache.carbondata.core.datamap.AggregateDataMap", "preaggregate"),
   TIMESERIES("org.apache.carbondata.core.datamap.TimeSeriesDataMap", "timeseries"),
-  LUCENE("org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory","lucene");
+  LUCENE("org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory","lucene"),
+  BLOOMFILTER("org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapFactory", "bloomfilter");
 
   /**
    * Fully qualified class name of datamap
@@ -60,18 +61,6 @@ public enum DataMapClassProvider {
         dataMapClass.equalsIgnoreCase(shortName)));
   }
 
-  public static DataMapClassProvider getDataMapProvider(String dataMapClass) {
-    if (TIMESERIES.isEqual(dataMapClass)) {
-      return TIMESERIES;
-    } else if (PREAGGREGATE.isEqual(dataMapClass)) {
-      return PREAGGREGATE;
-    } else if (LUCENE.isEqual(dataMapClass)) {
-      return LUCENE;
-    } else {
-      throw new UnsupportedOperationException("Unknown datamap provider/class " + dataMapClass);
-    }
-  }
-
   public static DataMapClassProvider getDataMapProviderOnName(String dataMapShortname) {
     if (TIMESERIES.isEqual(dataMapShortname)) {
       return TIMESERIES;
@@ -79,6 +68,8 @@ public enum DataMapClassProvider {
       return PREAGGREGATE;
     } else if (LUCENE.isEqual(dataMapShortname)) {
       return LUCENE;
+    } else if (BLOOMFILTER.isEqual(dataMapShortname)) {
+      return BLOOMFILTER;
     } else {
       throw new UnsupportedOperationException("Unknown datamap provider" + dataMapShortname);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 1875237..4178d8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -980,8 +981,11 @@ public class CarbonTable implements Serializable {
       if (!datamaps.isEmpty()) {
         for (TableDataMap dataMap : datamaps) {
           DataMapFactory factoryClass =
-              DataMapStoreManager.getInstance().getDataMapFactoryClass(dataMap.getDataMapSchema());
-          return !factoryClass.willBecomeStale(operation);
+              DataMapStoreManager.getInstance().getDataMapFactoryClass(
+                  carbonTable, dataMap.getDataMapSchema());
+          if (factoryClass.willBecomeStale(operation)) {
+            return false;
+          }
         }
       }
     } catch (Exception e) {
@@ -993,4 +997,26 @@ public class CarbonTable implements Serializable {
     return true;
   }
 
+  /**
+   * Get all index columns specified by dataMapSchema
+   */
+  public List<CarbonColumn> getIndexedColumns(DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
+    String[] columns = dataMapSchema.getIndexColumns();
+    List<CarbonColumn> indexColumn = new ArrayList<>(columns.length);
+    for (String column : columns) {
+      CarbonColumn carbonColumn = getColumnByName(getTableName(), column.trim().toLowerCase());
+      if (carbonColumn == null) {
+        throw new MalformedDataMapCommandException(String.format(
+            "column '%s' does not exist in table. Please check create DataMap statement.",
+            column));
+      }
+      if (carbonColumn.getColName().isEmpty()) {
+        throw new MalformedDataMapCommandException(
+            CarbonCommonConstants.INDEX_COLUMNS + " contains invalid column name");
+      }
+      indexColumn.add(carbonColumn);
+    }
+    return indexColumn;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index 235c312..b22a3d4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -26,9 +26,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.INDEX_COLUMNS;
 
 import com.google.gson.Gson;
+import org.apache.commons.lang.StringUtils;
 
 /**
  * It is the new schama of datamap and it has less fields compare to {{@link DataMapSchema}}
@@ -201,6 +204,24 @@ public class DataMapSchema implements Serializable, Writable {
     }
   }
 
+  /**
+   * Return the raw (untrimmed) column names listed in the INDEX_COLUMNS DMPROPERTY, looked up
+   * first by the key as declared and then by its lower-cased form
+   */
+  public String[] getIndexColumns() throws MalformedDataMapCommandException {
+    String indexColumns = getProperties().get(INDEX_COLUMNS);
+    if (indexColumns == null) {
+      indexColumns = getProperties().get(INDEX_COLUMNS.toLowerCase());
+    }
+    if (indexColumns == null) {
+      throw new MalformedDataMapCommandException(INDEX_COLUMNS + " DMPROPERTY is required");
+    }
+    if (StringUtils.isBlank(indexColumns)) {
+      throw new MalformedDataMapCommandException(INDEX_COLUMNS + " DMPROPERTY is blank");
+    }
+    return indexColumns.split(",", -1);
+  }
+
   @Override public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
index 37df0e5..57d8177 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
@@ -164,7 +164,8 @@ public class BlockletFilterScanner extends BlockletFullScanner {
     totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
         totalBlockletStatistic.getCount() + 1);
     // set the indexed data if it has any during fgdatamap pruning.
-    rawBlockletColumnChunks.setBitSetGroup(rawBlockletColumnChunks.getDataBlock().getIndexedData());
+    BitSetGroup fgBitSetGroup = rawBlockletColumnChunks.getDataBlock().getIndexedData();
+    rawBlockletColumnChunks.setBitSetGroup(fgBitSetGroup);
     // apply filter on actual data, for each page
     BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(rawBlockletColumnChunks,
         useBitSetPipeLine);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 0538e7f..b9f4838 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -330,6 +330,11 @@ public class CarbonTablePath {
         + bucketNumber + "-" + factUpdateTimeStamp + CARBON_DATA_EXT;
   }
 
+  public static String getShardName(Long taskNo, int bucketNumber, int batchNo,
+      String factUpdateTimeStamp) {
+    return taskNo + BATCH_PREFIX + batchNo + "-" + bucketNumber + "-" + factUpdateTimeStamp;
+  }
+
   /**
    * Below method will be used to get the carbon index filename
    *
@@ -394,6 +399,24 @@ public class CarbonTablePath {
   }
 
   /**
+   * Return the store path of one shard of a datamap. It is laid out as
+   * {segment path}/{dataMapName}/{shardName}, so every shard name gets its own folder under
+   * the datamap folder of the segment (e.g. three tasks launched during loading give three
+   * folders) and the index files of that shard are written into it
+   *
+   * @param tablePath path of the table, passed on to getSegmentPath
+   * @param segmentId id of the segment, passed on to getSegmentPath
+   * @param dataMapName name of the datamap, the folder below the segment path
+   * @param shardName name of the shard, the innermost folder
+   * @return store path based on index shard name
+   */
+  public static String getDataMapStorePathOnShardName(String tablePath, String segmentId,
+      String dataMapName, String shardName) {
+    String dataMapPath = getSegmentPath(tablePath, segmentId) + File.separator + dataMapName;
+    return dataMapPath + File.separator + shardName;
+  }
+
+  /**
    * To manage data file name and composition
    */
   public static class DataFileUtil {
@@ -487,6 +510,13 @@ public class CarbonTablePath {
     }
 
     /**
+     * Return the task id of a carbon data file: the part of its taskNo before BATCH_PREFIX
+     */
+    public static long getTaskId(String carbonDataFileName) {
+      return Long.parseLong(getTaskNo(carbonDataFileName).split(BATCH_PREFIX)[0]);
+    }
+
+    /**
      * Return the taskId part from taskNo(include taskId + batchNo)
      */
     public static long getTaskIdFromTaskNo(String taskNo) {
@@ -643,7 +673,7 @@ public class CarbonTablePath {
   }
 
   public static String getCarbonIndexFileName(String actualBlockName) {
-    return getUniqueTaskName(actualBlockName) + INDEX_FILE_EXT;
+    return getShardName(actualBlockName) + INDEX_FILE_EXT;
   }
 
   /**
@@ -651,7 +681,7 @@ public class CarbonTablePath {
    * @param actualBlockName
    * @return
    */
-  public static String getUniqueTaskName(String actualBlockName) {
+  public static String getShardName(String actualBlockName) {
     return DataFileUtil.getTaskNo(actualBlockName) + "-" + DataFileUtil.getBucketNo(actualBlockName)
         + "-" + DataFileUtil.getTimeStampFromFileName(actualBlockName);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index 81cccf2..725d5cd 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.fs.PathFilter;
 public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName());
-  private String[] indexFilePath;
   private Set<String> indexedColumn;
   private List<BloomDMModel> bloomIndexList;
   private Multimap<String, List<BloomDMModel>> indexCol2BloomDMList;
@@ -88,18 +87,17 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
         return path.getName().endsWith(BLOOM_INDEX_SUFFIX);
       }
     });
-    indexFilePath = new String[indexFileStatus.length];
     indexedColumn = new HashSet<String>();
     bloomIndexList = new ArrayList<BloomDMModel>();
     indexCol2BloomDMList = ArrayListMultimap.create();
     for (int i = 0; i < indexFileStatus.length; i++) {
-      indexFilePath[i] = indexFileStatus[i].getPath().toString();
       String indexfilename = indexFileStatus[i].getPath().getName();
       String indexCol =
           indexfilename.substring(0, indexfilename.length() - BLOOM_INDEX_SUFFIX.length());
       indexedColumn.add(indexCol);
-      bloomIndexList.addAll(readBloomIndex(indexFilePath[i]));
-      indexCol2BloomDMList.put(indexCol, readBloomIndex(indexFilePath[i]));
+      List<BloomDMModel> models = readBloomIndex(indexFileStatus[i].getPath().toString());
+      bloomIndexList.addAll(models);
+      indexCol2BloomDMList.put(indexCol, models);
     }
     LOGGER.info("find bloom index datamap for column: "
         + StringUtils.join(indexedColumn, ", "));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index b76390f..2d43c40 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -19,10 +19,8 @@ package org.apache.carbondata.datamap.bloom;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
@@ -34,6 +32,7 @@ import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -50,15 +49,14 @@ import org.apache.carbondata.events.Event;
 
 import org.apache.commons.lang3.StringUtils;
 
+/**
+ * Factory of the coarse grain Bloom Filter datamap, which works at blocklet level
+ */
 @InterfaceAudience.Internal
-public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrainDataMap> {
+public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDataMap> {
   private static final LogService LOGGER = LogServiceFactory.getLogService(
       BloomCoarseGrainDataMapFactory.class.getName());
   /**
-   * property for indexed column
-   */
-  private static final String BLOOM_COLUMNS = "bloom_columns";
-  /**
    * property for size of bloom filter
    */
   private static final String BLOOM_SIZE = "bloom_size";
@@ -67,21 +65,19 @@ public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrai
    * and all the indexed value is distinct.
    */
   private static final int DEFAULT_BLOOM_FILTER_SIZE = 32000 * 20;
-  private CarbonTable carbonTable;
   private DataMapMeta dataMapMeta;
   private String dataMapName;
   private int bloomFilterSize;
 
-  @Override
-  public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
-      throws IOException, MalformedDataMapCommandException {
+  public BloomCoarseGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
+    super(carbonTable, dataMapSchema);
     Objects.requireNonNull(carbonTable);
     Objects.requireNonNull(dataMapSchema);
 
-    this.carbonTable = carbonTable;
     this.dataMapName = dataMapSchema.getDataMapName();
 
-    List<String> indexedColumns = validateAndGetIndexedColumns(dataMapSchema, carbonTable);
+    List<CarbonColumn> indexedColumns = carbonTable.getIndexedColumns(dataMapSchema);
     this.bloomFilterSize = validateAndGetBloomFilterSize(dataMapSchema);
     List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>();
     // todo: support more optimize operations
@@ -92,40 +88,6 @@ public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrai
   }
 
   /**
-   * validate Lucene DataMap BLOOM_COLUMNS
-   * 1. require BLOOM_COLUMNS property
-   * 2. BLOOM_COLUMNS can't contains illegal argument(empty, blank)
-   * 3. BLOOM_COLUMNS can't contains duplicate same columns
-   * 4. BLOOM_COLUMNS should be exists in table columns
-   */
-  private List<String> validateAndGetIndexedColumns(DataMapSchema dmSchema,
-      CarbonTable carbonTable) throws MalformedDataMapCommandException {
-    String bloomColumnsStr = dmSchema.getProperties().get(BLOOM_COLUMNS);
-    if (StringUtils.isBlank(bloomColumnsStr)) {
-      throw new MalformedDataMapCommandException(
-          String.format("Bloom coarse datamap require proper %s property", BLOOM_COLUMNS));
-    }
-    String[] bloomColumns = StringUtils.split(bloomColumnsStr, ",", -1);
-    List<String> bloomColumnList = new ArrayList<String>(bloomColumns.length);
-    Set<String> bloomColumnSet = new HashSet<String>(bloomColumns.length);
-    for (String bloomCol : bloomColumns) {
-      CarbonColumn column = carbonTable.getColumnByName(carbonTable.getTableName(),
-          bloomCol.trim().toLowerCase());
-      if (null == column) {
-        throw new MalformedDataMapCommandException(
-            String.format("%s: %s does not exist in table. Please check create datamap statement",
-                BLOOM_COLUMNS, bloomCol));
-      }
-      if (!bloomColumnSet.add(column.getColName())) {
-        throw new MalformedDataMapCommandException(String.format("%s has duplicate column: %s",
-            BLOOM_COLUMNS, bloomCol));
-      }
-      bloomColumnList.add(column.getColName());
-    }
-    return bloomColumnList;
-  }
-
-  /**
    * validate Lucene DataMap BLOOM_SIZE
    * 1. BLOOM_SIZE property is optional, 32000 * 20 will be the default size.
    * 2. BLOOM_SIZE should be an integer that greater than 0
@@ -157,21 +119,26 @@ public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrai
   }
 
   @Override
-  public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
+  public DataMapWriter createWriter(Segment segment, String shardName) throws IOException {
     LOGGER.info(
         String.format("Data of BloomCoarseGranDataMap %s for table %s will be written to %s",
-            this.dataMapName, this.carbonTable.getTableName() , writeDirectoryPath));
-    return new BloomDataMapWriter(this.carbonTable.getAbsoluteTableIdentifier(),
-        this.dataMapMeta, this.bloomFilterSize, segment, writeDirectoryPath);
+            this.dataMapName, getCarbonTable().getTableName() , shardName));
+    return new BloomDataMapWriter(getCarbonTable().getTablePath(), this.dataMapName,
+        this.dataMapMeta.getIndexedColumns(), segment, shardName, this.bloomFilterSize);
+  }
+
+  @Override
+  public DataMapRefresher createRefresher(Segment segment, String shardName) throws IOException {
+    return new BloomDataMapRefresher(getCarbonTable().getTablePath(), this.dataMapName,
+        this.dataMapMeta.getIndexedColumns(), segment, shardName, this.bloomFilterSize);
   }
 
   @Override
   public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
     List<CoarseGrainDataMap> dataMaps = new ArrayList<CoarseGrainDataMap>(1);
     try {
-      String dataMapStorePath = BloomDataMapWriter.genDataMapStorePath(
-          CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segment.getSegmentNo()),
-          dataMapName);
+      String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
+          getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
       CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
       for (CarbonFile carbonFile : carbonFiles) {
         BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
@@ -212,13 +179,14 @@ public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrai
 
   @Override
   public void deleteDatamapData() {
-    SegmentStatusManager ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
+    SegmentStatusManager ssm =
+        new SegmentStatusManager(getCarbonTable().getAbsoluteTableIdentifier());
     try {
       List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
       for (Segment segment : validSegments) {
         String segmentId = segment.getSegmentNo();
         String datamapPath = CarbonTablePath.getSegmentPath(
-            carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId)
+            getCarbonTable().getAbsoluteTableIdentifier().getTablePath(), segmentId)
             + File.separator + dataMapName;
         if (FileFactory.isFileExist(datamapPath)) {
           CarbonFile file = FileFactory.getCarbonFile(datamapPath,
@@ -241,7 +209,7 @@ public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrai
   }
 
   @Override
-  public DataMapLevel getDataMapType() {
+  public DataMapLevel getDataMapLevel() {
     return DataMapLevel.CG;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
index 6351199..3cf2f3b 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
@@ -22,6 +22,9 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 
 import com.google.common.hash.BloomFilter;
 
+/**
+ * This class holds a bloom filter for one blocklet
+ */
 @InterfaceAudience.Internal
 public class BloomDMModel implements Serializable {
   private static final long serialVersionUID = 7281578747306832771L;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java
new file mode 100644
index 0000000..cb86c39
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Implementation of DataMapRefresher for BloomFilter DataMap, rebuilding it from existing data
+ */
+@InterfaceAudience.Internal
+public class BloomDataMapRefresher extends BloomDataMapWriter implements DataMapRefresher {
+
+  BloomDataMapRefresher(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
+      Segment segment, String shardName, int bloomFilterSize) throws IOException {
+    super(tablePath, dataMapName, indexColumns, segment, shardName, bloomFilterSize);
+  }
+
+  @Override
+  public void initialize() throws IOException {
+    super.resetBloomFilters();
+  }
+
+  @Override
+  public void addRow(int blockletId, int pageId, int rowId, Object[] values) {
+    if (blockletId != currentBlockletId) {
+      // new blocklet started, flush the bloom filters to the datamap file
+      super.writeBloomDataMapFile();
+      currentBlockletId = blockletId;
+    }
+    List<CarbonColumn> columns = getIndexColumns();
+    for (int ordinal = 0; ordinal < columns.size(); ordinal++) {
+      byte[] bytes = toBloomFilterBytes(values[ordinal], columns.get(ordinal).getDataType());
+      indexBloomFilters.get(ordinal).put(bytes);
+    }
+  }
+
+  private byte[] toBloomFilterBytes(Object data, DataType dataType) {
+    if (DataTypes.STRING == dataType) {
+      return getStringData(data);
+    } else if (DataTypes.BYTE_ARRAY != dataType) {
+      return CarbonUtil.getValueAsBytes(dataType, data);
+    }
+    // String and byte array is LV encoded, L is short type: keep only the bytes after L
+    byte[] lvEncoded = (byte[]) data;
+    byte[] valueOnly = new byte[lvEncoded.length - 2];
+    System.arraycopy(lvEncoded, 2, valueOnly, 0, valueOnly.length);
+    return valueOnly;
+  }
+
+  @Override
+  public void finish() throws IOException {
+    super.finish();
+  }
+
+  @Override
+  public void close() throws IOException {
+    releaseResouce();
+  }
+
+  @Override
+  protected byte[] getStringData(Object data) {
+    return ((String) data).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+  }
+}