Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/05/09 11:01:17 UTC

[2/2] carbondata git commit: [CARBONDATA-2416] Support DEFERRED REBUILD when creating DataMap

[CARBONDATA-2416] Support DEFERRED REBUILD when creating DataMap

1. The REFRESH DATAMAP command is renamed to REBUILD DATAMAP
2. When creating a datamap, the user can choose to load it immediately, or defer loading and trigger it later by manually running the REBUILD DATAMAP command
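
For illustration, a sketch of the new syntax using the bloomfilter provider (the session variable `spark` and the DMPROPERTIES shown are assumptions for this example, not part of this commit):

    // create the datamap lazily, then build it on demand
    spark.sql(
      """
        | CREATE DATAMAP dm ON TABLE main_table
        | USING 'bloomfilter'
        | WITH DEFERRED REBUILD
        | DMPROPERTIES('index_columns'='name')
      """.stripMargin)
    spark.sql("REBUILD DATAMAP dm ON TABLE main_table") // loads all existing data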

This closes #2255


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/747be9b1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/747be9b1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/747be9b1

Branch: refs/heads/master
Commit: 747be9b111ab0a12e7550124a9facccacf8ad861
Parents: fb12897
Author: Jacky Li <ja...@qq.com>
Authored: Tue May 1 17:17:33 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Wed May 9 18:59:03 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/DataMapChooser.java |  75 +++--
 .../core/datamap/DataMapProvider.java           |   3 +-
 .../core/datamap/DataMapStoreManager.java       |  19 +-
 .../core/datamap/dev/DataMapBuilder.java        |  38 +++
 .../core/datamap/dev/DataMapFactory.java        |   5 +-
 .../core/datamap/dev/DataMapRefresher.java      |  36 ---
 .../datamap/status/DataMapStatusDetail.java     |   4 +
 .../datamap/status/DataMapStatusManager.java    |  30 +-
 .../blockletindex/BlockletDataMapFactory.java   |   4 +-
 .../schema/datamap/DataMapProperty.java         |  39 +++
 .../metadata/schema/table/DataMapSchema.java    |  16 +-
 .../bloom/BloomCoarseGrainDataMapFactory.java   |   6 +-
 .../datamap/bloom/BloomDataMapBuilder.java      |  91 ++++++
 .../datamap/bloom/BloomDataMapRefresher.java    |  91 ------
 .../examples/MinMaxIndexDataMapFactory.java     |  30 +-
 .../datamap/lucene/LuceneDataMapBuilder.java    | 224 +++++++++++++
 .../lucene/LuceneDataMapFactoryBase.java        |   9 +-
 .../datamap/lucene/LuceneDataMapRefresher.java  | 224 -------------
 .../hadoop/api/CarbonInputFormat.java           |  32 +-
 .../hadoop/api/CarbonOutputCommitter.java       |   2 +-
 .../lucene/LuceneFineGrainDataMapSuite.scala    |  68 ++--
 .../preaggregate/TestPreAggCreateCommand.scala  |  10 +
 .../testsuite/datamap/CGDataMapTestCase.scala   |   6 +-
 .../testsuite/datamap/DataMapWriterSuite.scala  |   6 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |   8 +-
 .../testsuite/datamap/TestDataMapStatus.scala   |  71 ++++-
 .../detailquery/SearchModeTestCase.scala        |   2 -
 .../TestInsertAndOtherCommandConcurrent.scala   |  59 ++--
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   3 +-
 .../carbondata/datamap/DataMapProperty.java     |  33 --
 .../datamap/IndexDataMapProvider.java           |   2 +-
 .../datamap/PreAggregateDataMapProvider.java    |  12 +-
 .../datamap/IndexDataMapRebuildRDD.scala        | 318 +++++++++++++++++++
 .../datamap/IndexDataMapRefreshRDD.scala        | 317 ------------------
 .../spark/rdd/CarbonDataRDDFactory.scala        |   4 +-
 .../org/apache/spark/sql/CarbonSession.scala    |  11 +
 .../datamap/CarbonCreateDataMapCommand.scala    |  29 +-
 .../datamap/CarbonDataMapRebuildCommand.scala   |  56 ++++
 .../datamap/CarbonDataMapRefreshCommand.scala   |  56 ----
 .../datasources/SparkCarbonFileFormat.scala     |   5 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |  20 +-
 .../bloom/BloomCoarseGrainDataMapSuite.scala    | 187 +++++++++--
 .../datamap/DataMapWriterListener.java          |   8 +-
 .../store/worker/SearchRequestHandler.java      |  56 ++--
 .../scala/org/apache/spark/rpc/Master.scala     |  18 +-
 .../org/apache/spark/search/Searcher.scala      |   8 +-
 46 files changed, 1318 insertions(+), 1033 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
index 478254d..7cdabd6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -21,7 +21,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
@@ -29,6 +29,8 @@ import org.apache.carbondata.core.datamap.dev.expr.AndDataMapExprWrapper;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapperImpl;
 import org.apache.carbondata.core.datamap.dev.expr.OrDataMapExprWrapper;
+import org.apache.carbondata.core.datamap.status.DataMapStatusDetail;
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
@@ -56,34 +58,42 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 @InterfaceAudience.Internal
 public class DataMapChooser {
 
-  private static DataMapChooser INSTANCE;
+  private CarbonTable carbonTable;
+  private List<TableDataMap> cgDataMaps;
+  private List<TableDataMap> fgDataMaps;
 
-  private DataMapChooser() { }
-
-  public static DataMapChooser get() {
-    if (INSTANCE == null) {
-      INSTANCE = new DataMapChooser();
+  public DataMapChooser(CarbonTable carbonTable) throws IOException {
+    this.carbonTable = carbonTable;
+    // read all datamaps for this table and populate CG and FG datamap lists
+    List<TableDataMap> visibleDataMaps =
+        DataMapStoreManager.getInstance().getAllVisibleDataMap(carbonTable);
+    Map<String, DataMapStatusDetail> map = DataMapStatusManager.readDataMapStatusMap();
+    cgDataMaps = new ArrayList<>(visibleDataMaps.size());
+    fgDataMaps = new ArrayList<>(visibleDataMaps.size());
+    for (TableDataMap visibleDataMap : visibleDataMaps) {
+      DataMapStatusDetail status = map.get(visibleDataMap.getDataMapSchema().getDataMapName());
+      if (status != null && status.isEnabled()) {
+        DataMapLevel level = visibleDataMap.getDataMapFactory().getDataMapLevel();
+        if (level == DataMapLevel.CG) {
+          cgDataMaps.add(visibleDataMap);
+        } else {
+          fgDataMaps.add(visibleDataMap);
+        }
+      }
     }
-    return INSTANCE;
   }
 
   /**
    * Return a chosen datamap based on input filter. See {@link DataMapChooser}
    */
-  public DataMapExprWrapper choose(CarbonTable carbonTable, FilterResolverIntf filter)
-      throws IOException {
-    Objects.requireNonNull(carbonTable);
+  public DataMapExprWrapper choose(FilterResolverIntf filter) {
     if (filter != null) {
       Expression expression = filter.getFilterExpression();
       // First check for FG datamaps if any exist
-      List<TableDataMap> allDataMapFG =
-          DataMapStoreManager.getInstance().getAllVisibleDataMap(carbonTable, DataMapLevel.FG);
-      ExpressionTuple tuple = selectDataMap(expression, allDataMapFG, filter);
+      ExpressionTuple tuple = selectDataMap(expression, fgDataMaps, filter);
       if (tuple.dataMapExprWrapper == null) {
         // Check for CG datamap
-        List<TableDataMap> allDataMapCG =
-            DataMapStoreManager.getInstance().getAllVisibleDataMap(carbonTable, DataMapLevel.CG);
-        tuple = selectDataMap(expression, allDataMapCG, filter);
+        tuple = selectDataMap(expression, cgDataMaps, filter);
       }
       if (tuple.dataMapExprWrapper != null) {
         return tuple.dataMapExprWrapper;
@@ -97,33 +107,22 @@ public class DataMapChooser {
   /**
    * Return a chosen FG datamap based on input filter. See {@link DataMapChooser}
    */
-  public DataMapExprWrapper chooseFGDataMap(CarbonTable carbonTable,
-      FilterResolverIntf resolverIntf) throws IOException {
-    if (resolverIntf != null) {
-      Expression expression = resolverIntf.getFilterExpression();
-      // First check for FG datamaps if any exist
-      List<TableDataMap> allDataMapFG =
-          DataMapStoreManager.getInstance().getAllVisibleDataMap(carbonTable, DataMapLevel.FG);
-      ExpressionTuple tuple = selectDataMap(expression, allDataMapFG, resolverIntf);
-      if (tuple.dataMapExprWrapper != null) {
-        return tuple.dataMapExprWrapper;
-      }
-    }
-    // Return the default datamap if no other datamap exists.
-    return null;
+  public DataMapExprWrapper chooseFGDataMap(FilterResolverIntf resolverIntf) {
+    return chooseDataMap(DataMapLevel.FG, resolverIntf);
   }
 
   /**
    * Return a chosen CG datamap based on input filter. See {@link DataMapChooser}
    */
-  public DataMapExprWrapper chooseCGDataMap(CarbonTable carbonTable,
-      FilterResolverIntf resolverIntf) throws IOException {
+  public DataMapExprWrapper chooseCGDataMap(FilterResolverIntf resolverIntf) {
+    return chooseDataMap(DataMapLevel.CG, resolverIntf);
+  }
+
+  private DataMapExprWrapper chooseDataMap(DataMapLevel level, FilterResolverIntf resolverIntf) {
     if (resolverIntf != null) {
       Expression expression = resolverIntf.getFilterExpression();
-      // Check for CG datamap
-      List<TableDataMap> allDataMapCG =
-          DataMapStoreManager.getInstance().getAllVisibleDataMap(carbonTable, DataMapLevel.CG);
-      ExpressionTuple tuple = selectDataMap(expression, allDataMapCG, resolverIntf);
+      List<TableDataMap> datamaps = level == DataMapLevel.CG ? cgDataMaps : fgDataMaps;
+      ExpressionTuple tuple = selectDataMap(expression, datamaps, resolverIntf);
       if (tuple.dataMapExprWrapper != null) {
         return tuple.dataMapExprWrapper;
       }
@@ -137,7 +136,7 @@ public class DataMapChooser {
    * @param resolverIntf
    * @return
    */
-  public DataMapExprWrapper getDefaultDataMap(CarbonTable carbonTable,
+  public static DataMapExprWrapper getDefaultDataMap(CarbonTable carbonTable,
       FilterResolverIntf resolverIntf) {
     // Return the default datamap if no other datamap exists.
     return new DataMapExprWrapperImpl(
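
For illustration, a hedged sketch of the call pattern implied by this refactor (`carbonTable` and `filterResolver` are assumed to be in scope):

    // construct once per table: reads the datamap status file and splits
    // enabled datamaps into CG and FG lists up front
    val chooser = new DataMapChooser(carbonTable)
    // prefer a fine-grain datamap, fall back to coarse-grain, then the default
    val wrapper = Option(chooser.chooseFGDataMap(filterResolver))
      .orElse(Option(chooser.chooseCGDataMap(filterResolver)))
      .getOrElse(DataMapChooser.getDefaultDataMap(carbonTable, filterResolver))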

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
index 775b912..05ba7cf 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
@@ -104,13 +104,12 @@ public abstract class DataMapProvider {
    * Rebuild the datamap by loading all existing data from mainTable
   * This is called to refresh the datamap in two cases:
   * 1. after datamap creation, if `autoRefreshDataMap` is set to true
-   * 2. user manually trigger refresh datamap command
+   * 2. when the user manually triggers the REBUILD DATAMAP command
    */
   public abstract void rebuild() throws IOException, NoSuchDataMapException;
 
   /**
    * Build the datamap incrementally by loading specified segment data
-   * This is called when user manually trigger refresh datamap
    */
   public abstract void incrementalBuild(String[] segmentIds) throws IOException;
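
As a hedged sketch of when each path runs (`provider`, `dataMapSchema` and `newSegmentId` are assumed names, not from this diff):

    // eager datamaps load all existing data at CREATE DATAMAP time;
    // lazy (WITH DEFERRED REBUILD) ones wait for an explicit REBUILD DATAMAP
    if (!dataMapSchema.isLazy) {
      provider.rebuild()
    }
    // loading specific segments uses the incremental path
    provider.incrementalBuild(Array(newSegmentId))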
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 29a1106..9f7af2b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -79,26 +79,9 @@ public final class DataMapStoreManager {
   }
 
   /**
-   * It gives all visible datamaps of type @mapType except the default datamap.
-   */
-  public List<TableDataMap> getAllVisibleDataMap(CarbonTable carbonTable, DataMapLevel mapType)
-      throws IOException {
-    List<TableDataMap> dataMaps = new ArrayList<>();
-    List<TableDataMap> tableIndices = getAllVisibleDataMap(carbonTable);
-    if (tableIndices != null) {
-      for (TableDataMap dataMap : tableIndices) {
-        if (mapType == dataMap.getDataMapFactory().getDataMapLevel()) {
-          dataMaps.add(dataMap);
-        }
-      }
-    }
-    return dataMaps;
-  }
-
-  /**
    * It only gives the visible datamaps
    */
-  private List<TableDataMap> getAllVisibleDataMap(CarbonTable carbonTable) throws IOException {
+  List<TableDataMap> getAllVisibleDataMap(CarbonTable carbonTable) throws IOException {
     CarbonSessionInfo sessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo();
     List<TableDataMap> allDataMaps = getAllDataMap(carbonTable);
     Iterator<TableDataMap> dataMapIterator = allDataMaps.iterator();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapBuilder.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapBuilder.java
new file mode 100644
index 0000000..570a1ce
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapBuilder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * DataMapBuilder is used to implement the REBUILD DATAMAP command: it reads all existing
+ * data in the main table and loads it into the DataMap. Any index data already present
+ * in the datamap is deleted first.
+ */
+@InterfaceAudience.Developer("DataMap")
+public interface DataMapBuilder {
+  void initialize() throws IOException;
+
+  void addRow(int blockletId, int pageId, int rowId, Object[] values) throws IOException;
+
+  void finish() throws IOException;
+
+  void close() throws IOException;
+}
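
A hedged sketch of the builder lifecycle, roughly as the rebuild RDD in this commit drives it (one builder per segment and shard; `values` holds the indexed column values of one row, and all names here are illustrative):

    val builder = dataMapFactory.createBuilder(segment, shardName)
    builder.initialize()
    try {
      // called once per row read from the main table
      builder.addRow(blockletId, pageId, rowId, values)
      builder.finish() // flush index data for this shard
    } finally {
      builder.close()  // release resources even on failure
    }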

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index ae34be7..ad709a0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -61,10 +61,11 @@ public abstract class DataMapFactory<T extends DataMap> {
       throws IOException;
 
   /**
-   * Create a new Refresher for this datamap, to rebuild the specified
+   * Create a new DataMapBuilder for this datamap, to rebuild the specified
    * segment and shard data in the main table.
+   * TODO: refactor to unify with DataMapWriter
    */
-  public abstract DataMapRefresher createRefresher(Segment segment, String shardName)
+  public abstract DataMapBuilder createBuilder(Segment segment, String shardName)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java
deleted file mode 100644
index 770ceca..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datamap.dev;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-/**
- * Interface to rebuild the datamap for main table with existing data
- */
-@InterfaceAudience.Developer("DataMap")
-public interface DataMapRefresher {
-  void initialize() throws IOException;
-
-  void addRow(int blockletId, int pageId, int rowId, Object[] values) throws IOException;
-
-  void finish() throws IOException;
-
-  void close() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusDetail.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusDetail.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusDetail.java
index 1ecb1b1..d1e5921 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusDetail.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusDetail.java
@@ -51,6 +51,10 @@ public class DataMapStatusDetail implements Serializable {
     return status;
   }
 
+  public boolean isEnabled() {
+    return status == DataMapStatus.ENABLED;
+  }
+
   public void setStatus(DataMapStatus status) {
     this.status = status;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
index dcad80b..b540146 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
@@ -18,7 +18,9 @@ package org.apache.carbondata.core.datamap.status;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
@@ -51,6 +53,15 @@ public class DataMapStatusManager {
     return storageProvider.getDataMapStatusDetails();
   }
 
+  public static Map<String, DataMapStatusDetail> readDataMapStatusMap() throws IOException {
+    DataMapStatusDetail[] details = storageProvider.getDataMapStatusDetails();
+    Map<String, DataMapStatusDetail> map = new HashMap<>(details.length);
+    for (DataMapStatusDetail detail : details) {
+      map.put(detail.getDataMapName(), detail);
+    }
+    return map;
+  }
+
   public static void disableDataMap(String dataMapName) throws IOException, NoSuchDataMapException {
     DataMapSchema dataMapSchema = getDataMapSchema(dataMapName);
     if (dataMapSchema != null) {
@@ -60,10 +71,19 @@ public class DataMapStatusManager {
     }
   }
 
-  public static void disableDataMapsOfTable(CarbonTable table) throws IOException {
+  /**
+   * This method will disable all lazy (DEFERRED REBUILD) datamaps in the given table
+   */
+  public static void disableAllLazyDataMaps(CarbonTable table) throws IOException {
     List<DataMapSchema> allDataMapSchemas =
         DataMapStoreManager.getInstance().getDataMapSchemasOfTable(table);
-    storageProvider.updateDataMapStatus(allDataMapSchemas, DataMapStatus.DISABLED);
+    List<DataMapSchema> dataMapToBeDisabled = new ArrayList<>(allDataMapSchemas.size());
+    for (DataMapSchema dataMap : allDataMapSchemas) {
+      if (dataMap.isLazy()) {
+        dataMapToBeDisabled.add(dataMap);
+      }
+    }
+    storageProvider.updateDataMapStatus(dataMapToBeDisabled, DataMapStatus.DISABLED);
   }
 
   public static void enableDataMap(String dataMapName) throws IOException, NoSuchDataMapException {
@@ -75,12 +95,6 @@ public class DataMapStatusManager {
     }
   }
 
-  public static void enableDataMapsOfTable(CarbonTable table) throws IOException {
-    List<DataMapSchema> allDataMapSchemas =
-        DataMapStoreManager.getInstance().getDataMapSchemasOfTable(table);
-    storageProvider.updateDataMapStatus(allDataMapSchemas, DataMapStatus.ENABLED);
-  }
-
   public static void dropDataMap(String dataMapName) throws IOException, NoSuchDataMapException {
     DataMapSchema dataMapSchema = getDataMapSchema(dataMapName);
     if (dataMapSchema != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index c3df721..e502251 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.CacheableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMap;
-import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
@@ -91,7 +91,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   }
 
   @Override
-  public DataMapRefresher createRefresher(Segment segment, String shardName) {
+  public DataMapBuilder createBuilder(Segment segment, String shardName) {
     throw new UnsupportedOperationException("not implemented");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProperty.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProperty.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProperty.java
new file mode 100644
index 0000000..9bd78da
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProperty.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * Property that can be specified when creating DataMap
+ */
+@InterfaceAudience.Internal
+public class DataMapProperty {
+
+  /**
+   * Used to specify the store location of the datamap
+   */
+  public static final String PARTITIONING = "partitioning";
+  public static final String PATH = "path";
+
+  /**
+   * For datamaps created with the 'WITH DEFERRED REBUILD' syntax, this
+   * property is added internally
+   */
+  public static final String DEFERRED_REBUILD = "_internal.deferred.rebuild";
+}
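
A hedged sketch of how this property flows (the status behavior is summarized from this commit; the call site is illustrative):

    // CREATE DATAMAP ... WITH DEFERRED REBUILD stores
    // '_internal.deferred.rebuild' = 'true' in the schema properties,
    // which DataMapSchema.isLazy() reads back:
    if (dataMapSchema.isLazy) {
      // stays DISABLED until the user runs REBUILD DATAMAP
    } else {
      // built and enabled immediately as part of CREATE DATAMAP
    }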

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index b22a3d4..611f298 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -28,6 +28,8 @@ import java.util.Objects;
 
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty;
+
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.INDEX_COLUMNS;
 
 import com.google.gson.Gson;
@@ -157,7 +159,16 @@ public class DataMapSchema implements Serializable, Writable {
     }
   }
 
-  @Override public void write(DataOutput out) throws IOException {
+  /**
+   * Return true if this datamap is lazy (created with DEFERRED REBUILD syntax)
+   */
+  public boolean isLazy() {
+    String deferredRebuild = getProperties().get(DataMapProperty.DEFERRED_REBUILD);
+    return deferredRebuild != null && deferredRebuild.equalsIgnoreCase("true");
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
     out.writeUTF(dataMapName);
     out.writeUTF(providerName);
     boolean isRelationIdentifierExists = null != relationIdentifier;
@@ -181,7 +192,8 @@ public class DataMapSchema implements Serializable, Writable {
     }
   }
 
-  @Override public void readFields(DataInput in) throws IOException {
+  @Override
+  public void readFields(DataInput in) throws IOException {
     this.dataMapName = in.readUTF();
     this.providerName = in.readUTF();
     boolean isRelationIdnentifierExists = in.readBoolean();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 1d6eab7..95c21fa 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -33,9 +33,9 @@ import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
-import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -173,8 +173,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
   }
 
   @Override
-  public DataMapRefresher createRefresher(Segment segment, String shardName) throws IOException {
-    return new BloomDataMapRefresher(getCarbonTable().getTablePath(), this.dataMapName,
+  public DataMapBuilder createBuilder(Segment segment, String shardName) throws IOException {
+    return new BloomDataMapBuilder(getCarbonTable().getTablePath(), this.dataMapName,
         this.dataMapMeta.getIndexedColumns(), segment, shardName,
         this.bloomFilterSize, this.bloomFilterFpp);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
new file mode 100644
index 0000000..fa1aef7
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * DataMapBuilder implementation for the BloomFilter DataMap: rebuilds the datamap from existing main table data
+ */
+@InterfaceAudience.Internal
+public class BloomDataMapBuilder extends BloomDataMapWriter implements DataMapBuilder {
+
+  BloomDataMapBuilder(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
+      Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp)
+      throws IOException {
+    super(tablePath, dataMapName, indexColumns, segment, shardName,
+        bloomFilterSize, bloomFilterFpp);
+  }
+
+  @Override
+  public void initialize() throws IOException {
+    super.resetBloomFilters();
+  }
+
+  @Override
+  public void addRow(int blockletId, int pageId, int rowId, Object[] values) {
+    if (currentBlockletId != blockletId) {
+      // new blocklet started, flush bloom filter to datamap file
+      super.writeBloomDataMapFile();
+      currentBlockletId = blockletId;
+    }
+    // for each indexed column, add the data to bloom filter
+    List<CarbonColumn> indexColumns = getIndexColumns();
+    for (int i = 0; i < indexColumns.size(); i++) {
+      Object data = values[i];
+      DataType dataType = indexColumns.get(i).getDataType();
+      byte[] indexValue;
+      if (DataTypes.STRING == dataType) {
+        indexValue = getStringData(data);
+      } else if (DataTypes.BYTE_ARRAY == dataType) {
+        byte[] originValue = (byte[]) data;
+        // String and byte array are LV encoded; L is a 2-byte (short) length prefix
+        indexValue = new byte[originValue.length - 2];
+        System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 2);
+      } else {
+        indexValue = CarbonUtil.getValueAsBytes(dataType, data);
+      }
+      indexBloomFilters.get(i).put(indexValue);
+    }
+  }
+
+  @Override
+  public void finish() throws IOException {
+    super.finish();
+  }
+
+  @Override
+  public void close() throws IOException {
+    releaseResouce();
+  }
+
+  @Override
+  protected byte[] getStringData(Object data) {
+    return ((String) data).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+  }
+}
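
A hedged, self-contained sketch of the LV (length-value) decoding done in addRow above for BYTE_ARRAY columns: the first two bytes are a short length prefix, and only the value part goes into the bloom filter.

    val originValue = Array[Byte](0, 3, 97, 98, 99) // L = 3 (big-endian short), V = "abc"
    val indexValue = java.util.Arrays.copyOfRange(originValue, 2, originValue.length)
    assert(new String(indexValue, "UTF-8") == "abc") // only the V part is indexed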

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java
deleted file mode 100644
index 8e05133..0000000
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.bloom;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-/**
- * Implementation for BloomFilter DataMap to rebuild the datamap for main table with existing data
- */
-@InterfaceAudience.Internal
-public class BloomDataMapRefresher extends BloomDataMapWriter implements DataMapRefresher {
-
-  BloomDataMapRefresher(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
-      Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp)
-      throws IOException {
-    super(tablePath, dataMapName, indexColumns, segment, shardName,
-        bloomFilterSize, bloomFilterFpp);
-  }
-
-  @Override
-  public void initialize() throws IOException {
-    super.resetBloomFilters();
-  }
-
-  @Override
-  public void addRow(int blockletId, int pageId, int rowId, Object[] values) {
-    if (currentBlockletId != blockletId) {
-      // new blocklet started, flush bloom filter to datamap fileh
-      super.writeBloomDataMapFile();
-      currentBlockletId = blockletId;
-    }
-    // for each indexed column, add the data to bloom filter
-    List<CarbonColumn> indexColumns = getIndexColumns();
-    for (int i = 0; i < indexColumns.size(); i++) {
-      Object data = values[i];
-      DataType dataType = indexColumns.get(i).getDataType();
-      byte[] indexValue;
-      if (DataTypes.STRING == dataType) {
-        indexValue = getStringData(data);
-      } else if (DataTypes.BYTE_ARRAY == dataType) {
-        byte[] originValue = (byte[]) data;
-        // String and byte array is LV encoded, L is short type
-        indexValue = new byte[originValue.length - 2];
-        System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 2);
-      } else {
-        indexValue = CarbonUtil.getValueAsBytes(dataType, data);
-      }
-      indexBloomFilters.get(i).put(indexValue);
-    }
-  }
-
-  @Override
-  public void finish() throws IOException {
-    super.finish();
-  }
-
-  @Override
-  public void close() throws IOException {
-    releaseResouce();
-  }
-
-  @Override
-  protected byte[] getStringData(Object data) {
-    return ((String) data).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index 4197b79..84b9e65 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -21,14 +21,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
-import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
@@ -42,8 +41,6 @@ import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.Transformer;
 import org.apache.commons.lang3.StringUtils;
 
 /**
@@ -52,25 +49,18 @@ import org.apache.commons.lang3.StringUtils;
 public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
   private static final LogService LOGGER = LogServiceFactory.getLogService(
       MinMaxIndexDataMapFactory.class.getName());
-  private DataMapSchema dataMapSchema;
   private DataMapMeta dataMapMeta;
   private String dataMapName;
   private AbsoluteTableIdentifier identifier;
 
-  public MinMaxIndexDataMapFactory(CarbonTable carbonTable) {
-    super(carbonTable);
-  }
+  public MinMaxIndexDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
+    super(carbonTable, dataMapSchema);
 
-  // this is an example for datamap, we can choose the columns and operations that
-  // will be supported by this datamap. Furthermore, we can add cache-support for this datamap.
-  @Override public void init(DataMapSchema dataMapSchema)
-      throws IOException, MalformedDataMapCommandException {
-    this.dataMapSchema = dataMapSchema;
-    this.identifier = carbonTable.getAbsoluteTableIdentifier();
-    this.dataMapName = dataMapSchema.getDataMapName();
+    // this is an example datamap; we can choose the columns and operations that
+    // it will support. Furthermore, we can add cache support for this datamap.
 
     // columns that will be indexed
-    List<CarbonColumn> allColumns = carbonTable.getCreateOrderColumn(identifier.getTableName());
+    List<CarbonColumn> allColumns = getCarbonTable().getCreateOrderColumn(identifier.getTableName());
 
     // operations that will be supported on the indexed columns
     List<ExpressionType> optOperations = new ArrayList<>();
@@ -91,12 +81,14 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
    * @param shardName
    * @return
    */
-  @Override public DataMapWriter createWriter(Segment segment, String shardName) {
-    return new MinMaxDataWriter(carbonTable, dataMapSchema, segment, shardName,
+  @Override
+  public DataMapWriter createWriter(Segment segment, String shardName) {
+    return new MinMaxDataWriter(getCarbonTable(), getDataMapSchema(), segment, shardName,
         dataMapMeta.getIndexedColumns());
   }
 
-  @Override public DataMapRefresher createRefresher(Segment segment, String shardName)
+  @Override
+  public DataMapBuilder createBuilder(Segment segment, String shardName)
       throws IOException {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java
new file mode 100644
index 0000000..35c07f0
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
+import org.apache.lucene.codecs.lucene62.Lucene62Codec;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.DoublePoint;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FloatPoint;
+import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.document.IntRangeField;
+import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.solr.store.hdfs.HdfsDirectory;
+
+public class LuceneDataMapBuilder implements DataMapBuilder {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LuceneDataMapWriter.class.getName());
+
+  private String dataMapPath;
+
+  private List<CarbonColumn> indexColumns;
+
+  private int columnsCount;
+
+  private IndexWriter indexWriter = null;
+
+  private IndexWriter pageIndexWriter = null;
+
+  private Analyzer analyzer = null;
+
+  LuceneDataMapBuilder(String tablePath, String dataMapName,
+      Segment segment, String shardName, List<CarbonColumn> indexColumns) {
+    this.dataMapPath = CarbonTablePath.getDataMapStorePathOnShardName(
+        tablePath, segment.getSegmentNo(), dataMapName, shardName);
+    this.indexColumns = indexColumns;
+    this.columnsCount = indexColumns.size();
+  }
+
+  @Override
+  public void initialize() throws IOException {
+    // get index path, put index data into segment's path
+    Path indexPath = FileFactory.getPath(dataMapPath);
+    FileSystem fs = FileFactory.getFileSystem(indexPath);
+
+    // if the index path already exists, delete it, because we are
+    // rebuilding the whole datamap for all segments
+    if (fs.exists(indexPath)) {
+      fs.delete(indexPath, true);
+    }
+    if (!fs.mkdirs(indexPath)) {
+      LOGGER.error("Failed to create directory " + indexPath);
+    }
+
+    if (null == analyzer) {
+      analyzer = new StandardAnalyzer();
+    }
+
+    // create an index writer
+    Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
+
+    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
+    if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
+            CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
+        .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) {
+      indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
+    } else {
+      indexWriterConfig
+          .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
+    }
+
+    indexWriter = new IndexWriter(indexDir, indexWriterConfig);
+  }
+
+  private IndexWriter createPageIndexWriter() throws IOException {
+    // save index data in RAM; write it to disk after one page is finished
+    RAMDirectory ramDir = new RAMDirectory();
+    return new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
+  }
+
+  private void addPageIndex(IndexWriter pageIndexWriter) throws IOException {
+
+    Directory directory = pageIndexWriter.getDirectory();
+
+    // close ram writer
+    pageIndexWriter.close();
+
+    // add ram index data into disk
+    indexWriter.addIndexes(directory);
+
+    // delete this ram data
+    directory.close();
+  }
+
+  @Override
+  public void addRow(int blockletId, int pageId, int rowId, Object[] values) throws IOException {
+    if (rowId == 0) {
+      if (pageIndexWriter != null) {
+        addPageIndex(pageIndexWriter);
+      }
+      pageIndexWriter = createPageIndexWriter();
+    }
+
+    // create a new document
+    Document doc = new Document();
+
+    // add blocklet Id
+    doc.add(new IntPoint(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
+    doc.add(new StoredField(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
+
+    // add page id
+    doc.add(new IntPoint(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
+    doc.add(new StoredField(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
+
+    // add row id
+    doc.add(new IntPoint(LuceneDataMapWriter.ROWID_NAME, rowId));
+    doc.add(new StoredField(LuceneDataMapWriter.ROWID_NAME, rowId));
+
+    // add other fields
+    for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
+      CarbonColumn column = indexColumns.get(colIdx);
+      addField(doc, column.getColName(), column.getDataType(), values[colIdx]);
+    }
+
+    pageIndexWriter.addDocument(doc);
+  }
+
+  private boolean addField(Document doc, String fieldName, DataType type, Object value) {
+    if (type == DataTypes.STRING) {
+      doc.add(new TextField(fieldName, (String) value, Field.Store.NO));
+    } else if (type == DataTypes.BYTE) {
+      // byte type, use int range to deal with byte, lucene has no byte type
+      IntRangeField field =
+          new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] { Byte.MAX_VALUE });
+      field.setIntValue((int) value);
+      doc.add(field);
+    } else if (type == DataTypes.SHORT) {
+      // short type, use int range to deal with short type, lucene has no short type
+      IntRangeField field = new IntRangeField(fieldName, new int[] { Short.MIN_VALUE },
+          new int[] { Short.MAX_VALUE });
+      field.setShortValue((short) value);
+      doc.add(field);
+    } else if (type == DataTypes.INT) {
+      // int type, use int point to deal with int type
+      doc.add(new IntPoint(fieldName, (int) value));
+    } else if (type == DataTypes.LONG) {
+      // long type, use long point to deal with long type
+      doc.add(new LongPoint(fieldName, (long) value));
+    } else if (type == DataTypes.FLOAT) {
+      doc.add(new FloatPoint(fieldName, (float) value));
+    } else if (type == DataTypes.DOUBLE) {
+      doc.add(new DoublePoint(fieldName, (double) value));
+    } else if (type == DataTypes.DATE) {
+      // TODO: how to get date value
+    } else if (type == DataTypes.TIMESTAMP) {
+      // TODO: how to get timestamp value
+    } else if (type == DataTypes.BOOLEAN) {
+      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
+      field.setIntValue((boolean) value ? 1 : 0);
+      doc.add(field);
+    } else {
+      LOGGER.error("unsupport data type " + type);
+      throw new RuntimeException("unsupported data type " + type);
+    }
+    return true;
+  }
+
+  @Override
+  public void finish() throws IOException {
+    if (indexWriter != null && pageIndexWriter != null) {
+      addPageIndex(pageIndexWriter);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (indexWriter != null) {
+      indexWriter.close();
+    }
+  }
+
+}
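
A hedged sketch of the paging behavior above: a rowId restarting at 0 marks a page boundary, so each page's documents are buffered in a RAMDirectory and merged into the HDFS index when the next page begins (or in finish()). The `values` argument is illustrative.

    builder.addRow(0, 0, 0, values) // rowId == 0 creates a new page writer
    builder.addRow(0, 0, 1, values)
    builder.addRow(0, 1, 0, values) // previous page flushed via addIndexes()
    builder.finish()                // flushes the last buffered page
    builder.close()                 // closes the HDFS IndexWriter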

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index d52cef9..4c6aec3 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -34,8 +34,8 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -153,15 +153,16 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
   }
 
   @Override
-  public DataMapRefresher createRefresher(Segment segment, String shardName) {
-    return new LuceneDataMapRefresher(getCarbonTable().getTablePath(), dataMapName,
+  public DataMapBuilder createBuilder(Segment segment, String shardName) {
+    return new LuceneDataMapBuilder(getCarbonTable().getTablePath(), dataMapName,
         segment, shardName, dataMapMeta.getIndexedColumns());
   }
 
   /**
    * Get all distributable objects of a segmentid
    */
-  @Override public List<DataMapDistributable> toDistributable(Segment segment) {
+  @Override
+  public List<DataMapDistributable> toDistributable(Segment segment) {
     List<DataMapDistributable> lstDataMapDistribute = new ArrayList<>();
     CarbonFile[] indexDirs =
         getAllIndexDirs(tableIdentifier.getTablePath(), segment.getSegmentNo());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java
deleted file mode 100644
index ee500ef..0000000
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.lucene;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
-import org.apache.lucene.codecs.lucene62.Lucene62Codec;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.DoublePoint;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.FloatPoint;
-import org.apache.lucene.document.IntPoint;
-import org.apache.lucene.document.IntRangeField;
-import org.apache.lucene.document.LongPoint;
-import org.apache.lucene.document.StoredField;
-import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMDirectory;
-import org.apache.solr.store.hdfs.HdfsDirectory;
-
-public class LuceneDataMapRefresher implements DataMapRefresher {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(LuceneDataMapWriter.class.getName());
-
-  private String dataMapPath;
-
-  private List<CarbonColumn> indexColumns;
-
-  private int columnsCount;
-
-  private IndexWriter indexWriter = null;
-
-  private IndexWriter pageIndexWriter = null;
-
-  private Analyzer analyzer = null;
-
-  LuceneDataMapRefresher(String tablePath, String dataMapName,
-      Segment segment, String shardName, List<CarbonColumn> indexColumns) {
-    this.dataMapPath = CarbonTablePath.getDataMapStorePathOnShardName(
-        tablePath, segment.getSegmentNo(), dataMapName, shardName);
-    this.indexColumns = indexColumns;
-    this.columnsCount = indexColumns.size();
-  }
-
-  @Override
-  public void initialize() throws IOException {
-    // get index path, put index data into segment's path
-    Path indexPath = FileFactory.getPath(dataMapPath);
-    FileSystem fs = FileFactory.getFileSystem(indexPath);
-
-    // if index path exists, should delete it because we are
-    // rebuilding the whole datamap for all segments
-    if (fs.exists(indexPath)) {
-      fs.delete(indexPath, true);
-    }
-    if (!fs.mkdirs(indexPath)) {
-      LOGGER.error("Failed to create directory " + indexPath);
-    }
-
-    if (null == analyzer) {
-      analyzer = new StandardAnalyzer();
-    }
-
-    // create a index writer
-    Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
-
-    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
-    if (CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
-            CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
-        .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) {
-      indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
-    } else {
-      indexWriterConfig
-          .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
-    }
-
-    indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
-  }
-
-  private IndexWriter createPageIndexWriter() throws IOException {
-    // save index data into ram, write into disk after one page finished
-    RAMDirectory ramDir = new RAMDirectory();
-    return new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
-  }
-
-  private void addPageIndex(IndexWriter pageIndexWriter) throws IOException {
-
-    Directory directory = pageIndexWriter.getDirectory();
-
-    // close ram writer
-    pageIndexWriter.close();
-
-    // add ram index data into disk
-    indexWriter.addIndexes(directory);
-
-    // delete this ram data
-    directory.close();
-  }
-
-  @Override
-  public void addRow(int blockletId, int pageId, int rowId, Object[] values) throws IOException {
-    if (rowId == 0) {
-      if (pageIndexWriter != null) {
-        addPageIndex(pageIndexWriter);
-      }
-      pageIndexWriter = createPageIndexWriter();
-    }
-
-    // create a new document
-    Document doc = new Document();
-
-    // add blocklet Id
-    doc.add(new IntPoint(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
-    doc.add(new StoredField(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
-
-    // add page id
-    doc.add(new IntPoint(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
-    doc.add(new StoredField(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
-
-    // add row id
-    doc.add(new IntPoint(LuceneDataMapWriter.ROWID_NAME, rowId));
-    doc.add(new StoredField(LuceneDataMapWriter.ROWID_NAME, rowId));
-
-    // add other fields
-    for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
-      CarbonColumn column = indexColumns.get(colIdx);
-      addField(doc, column.getColName(), column.getDataType(), values[colIdx]);
-    }
-
-    pageIndexWriter.addDocument(doc);
-  }
-
-  private boolean addField(Document doc, String fieldName, DataType type, Object value) {
-    if (type == DataTypes.STRING) {
-      doc.add(new TextField(fieldName, (String) value, Field.Store.NO));
-    } else if (type == DataTypes.BYTE) {
-      // byte type , use int range to deal with byte, lucene has no byte type
-      IntRangeField field =
-          new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] { Byte.MAX_VALUE });
-      field.setIntValue((int) value);
-      doc.add(field);
-    } else if (type == DataTypes.SHORT) {
-      // short type , use int range to deal with short type, lucene has no short type
-      IntRangeField field = new IntRangeField(fieldName, new int[] { Short.MIN_VALUE },
-          new int[] { Short.MAX_VALUE });
-      field.setShortValue((short) value);
-      doc.add(field);
-    } else if (type == DataTypes.INT) {
-      // int type , use int point to deal with int type
-      doc.add(new IntPoint(fieldName, (int) value));
-    } else if (type == DataTypes.LONG) {
-      // long type , use long point to deal with long type
-      doc.add(new LongPoint(fieldName, (long) value));
-    } else if (type == DataTypes.FLOAT) {
-      doc.add(new FloatPoint(fieldName, (float) value));
-    } else if (type == DataTypes.DOUBLE) {
-      doc.add(new DoublePoint(fieldName, (double) value));
-    } else if (type == DataTypes.DATE) {
-      // TODO: how to get data value
-    } else if (type == DataTypes.TIMESTAMP) {
-      // TODO: how to get
-    } else if (type == DataTypes.BOOLEAN) {
-      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
-      field.setIntValue((boolean) value ? 1 : 0);
-      doc.add(field);
-    } else {
-      LOGGER.error("unsupport data type " + type);
-      throw new RuntimeException("unsupported data type " + type);
-    }
-    return true;
-  }
-
-  @Override
-  public void finish() throws IOException {
-    if (indexWriter != null && pageIndexWriter != null) {
-      addPageIndex(pageIndexWriter);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (indexWriter != null) {
-      indexWriter.close();
-    }
-  }
-
-}

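The deleted class survives as LuceneDataMapBuilder (see the file list at the top
of this commit). Its one pattern worth calling out is page-level buffering: rows
are indexed into an in-memory RAMDirectory one page at a time, and the buffer is
merged into the shard's on-disk index when the next page starts. A standalone
sketch of that pattern against the Lucene 6.x API used here, with RAMDirectory
standing in for the HdfsDirectory target and the page size purely illustrative:

    import org.apache.lucene.analysis.standard.StandardAnalyzer
    import org.apache.lucene.document.{Document, IntPoint}
    import org.apache.lucene.index.{IndexWriter, IndexWriterConfig}
    import org.apache.lucene.store.RAMDirectory

    object PageBufferedIndexing extends App {
      val analyzer = new StandardAnalyzer()
      // stand-in for the HdfsDirectory-backed shard index
      val mainWriter = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(analyzer))

      // buffer one page of rows in a throwaway in-memory directory
      val pageDir = new RAMDirectory()
      val pageWriter = new IndexWriter(pageDir, new IndexWriterConfig(analyzer))
      for (rowId <- 0 until 32000) {
        val doc = new Document()
        doc.add(new IntPoint("rowId", rowId))
        pageWriter.addDocument(doc)
      }

      // close the page writer, merge its data into the main index, drop the buffer
      pageWriter.close()
      mainWriter.addIndexes(pageDir)
      pageDir.close()
      mainWriter.close()
    }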
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index a5d4df2..c5365d5 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -28,7 +28,6 @@ import java.util.List;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datamap.DataMapChooser;
-import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
@@ -420,7 +419,7 @@
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
     List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     // First prune using default datamap on driver side.
-    DataMapExprWrapper dataMapExprWrapper = DataMapChooser.get()
+    DataMapExprWrapper dataMapExprWrapper = DataMapChooser
         .getDefaultDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver);
     List<ExtendedBlocklet> prunedBlocklets =
         dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
@@ -428,9 +427,10 @@
     ExplainCollector.recordDefaultDataMapPruning(
         dataMapExprWrapper.getDataMapSchema(), prunedBlocklets.size());
 
+    DataMapChooser chooser = new DataMapChooser(getOrCreateCarbonTable(job.getConfiguration()));
+
     // Get the available CG datamaps and prune further.
-    DataMapExprWrapper cgDataMapExprWrapper = DataMapChooser.get()
-        .chooseCGDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver);
+    DataMapExprWrapper cgDataMapExprWrapper = chooser.chooseCGDataMap(resolver);
     if (cgDataMapExprWrapper != null) {
       // Prune segments from already pruned blocklets
       pruneSegments(segmentIds, prunedBlocklets);
@@ -447,19 +447,19 @@ m filterExpression
           cgDataMapExprWrapper.getDataMapSchema(), prunedBlocklets.size());
     }
     // Now try to prune with FG DataMap.
-    dataMapExprWrapper = DataMapChooser.get()
-        .chooseFGDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver);
-    if (dataMapExprWrapper != null && dataMapExprWrapper.getDataMapLevel() == DataMapLevel.FG
-        && isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != null) {
-      // Prune segments from already pruned blocklets
-      pruneSegments(segmentIds, prunedBlocklets);
-      prunedBlocklets =
-          executeDataMapJob(carbonTable, resolver, segmentIds, dataMapExprWrapper, dataMapJob,
-              partitionsToPrune);
+    if (isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != null) {
+      DataMapExprWrapper fgDataMapExprWrapper = chooser.chooseFGDataMap(resolver);
+      if (fgDataMapExprWrapper != null) {
+        // Prune segments from already pruned blocklets
+        pruneSegments(segmentIds, prunedBlocklets);
+        prunedBlocklets =
+            executeDataMapJob(carbonTable, resolver, segmentIds, fgDataMapExprWrapper, dataMapJob,
+                partitionsToPrune);
 
-      ExplainCollector.recordFGDataMapPruning(
-          dataMapExprWrapper.getDataMapSchema(), prunedBlocklets.size());
-    }
+        ExplainCollector.recordFGDataMapPruning(
+            fgDataMapExprWrapper.getDataMapSchema(), prunedBlocklets.size());
+      }
+    } // TODO: add an else branch to push FGDataMap pruning to reader side
     return prunedBlocklets;
   }
 

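The ExplainCollector calls above record how many blocklets survive each level, so
the cascade (default blocklet datamap, then CG, then FG) can be observed from SQL.
A quick check in the test suites' style (the exact output format is not shown in
this commit):

    sql("EXPLAIN SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')").show(false)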
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 2851de2..0bcb188 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -168,7 +168,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       } else {
         CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid);
       }
-      DataMapStatusManager.disableDataMapsOfTable(carbonTable);
+      DataMapStatusManager.disableAllLazyDataMaps(carbonTable);
       if (operationContext != null) {
         LoadEvents.LoadTablePostStatusUpdateEvent postStatusUpdateEvent =
             new LoadEvents.LoadTablePostStatusUpdateEvent(loadModel);

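Note the narrower scope of the renamed method: after a load, only lazy datamaps
(those created WITH DEFERRED REBUILD) are disabled; non-deferred datamaps are kept
up to date by the load itself. The user-facing flow, sketched in the test suites'
style with illustrative table and file names:

    sql("CREATE TABLE t(id INT, name STRING, city STRING, age INT) STORED BY 'carbondata'")
    sql(
      """
        | CREATE DATAMAP dm_lazy ON TABLE t
        | USING 'lucene'
        | WITH DEFERRED REBUILD
        | DMProperties('INDEX_COLUMNS'='name')
      """.stripMargin)
    sql(s"LOAD DATA LOCAL INPATH '$file' INTO TABLE t")  // dm_lazy stays disabled
    sql("REBUILD DATAMAP dm_lazy ON TABLE t")            // builds the index, enables dm_lazy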
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 406ee41..9981ff5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -30,7 +30,7 @@ import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandExcept
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 
 class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
 
@@ -62,14 +62,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
 
-    sql("DROP TABLE IF EXISTS datamap_test4")
-
-    sql(
-      """
-        | CREATE TABLE datamap_test4(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'carbondata'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT', 'autorefreshdatamap' = 'false')
-      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
   }
 
   test("validate INDEX_COLUMNS DataMap property") {
@@ -132,38 +125,38 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
          | DMProperties('INDEX_COLUMNS'='Name , cIty')
       """.stripMargin)
 
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
     checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test where name='n10'"))
     checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('city:c020')"), sql(s"SELECT * FROM datamap_test WHERE city='c020'"))
 
     sql("drop datamap dm on table datamap_test")
   }
 
-  test("test lucene refresh data map") {
-
+  test("test lucene rebuild data map") {
+    sql("DROP TABLE IF EXISTS datamap_test4")
+    sql(
+      """
+        | CREATE TABLE datamap_test4(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT', 'autorefreshdatamap' = 'false')
+      """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test4 OPTIONS('header'='false')")
 
-    //sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test4 OPTIONS('header'='false')")
-
     sql(
       s"""
          | CREATE DATAMAP dm4 ON TABLE datamap_test4
          | USING 'lucene'
-         | DMProperties('INDEX_COLUMNS'='Name , cIty')
+         | WITH DEFERRED REBUILD
+         | DMProperties('INDEX_COLUMNS'='name , city')
       """.stripMargin)
 
-    //sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test4 OPTIONS('header'='false')")
+    sql("REBUILD DATAMAP dm4 ON TABLE datamap_test4")
 
-    sql("refresh datamap dm4 ON TABLE datamap_test4")
-
-    checkAnswer(sql("SELECT * FROM datamap_test4 WHERE TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test4 where name='n10'"))
+    checkAnswer(sql("SELECT * FROM datamap_test4 WHERE TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test where name='n10'"))
     checkAnswer(sql("SELECT * FROM datamap_test4 WHERE TEXT_MATCH('city:c020')"), sql(s"SELECT * FROM datamap_test4 WHERE city='c020'"))
 
-    sql("drop datamap dm4 on table datamap_test4")
-
+    sql("drop table datamap_test4")
   }
 
-
   test("test lucene fine grain data map drop") {
     sql("DROP TABLE IF EXISTS datamap_test1")
     sql(
@@ -549,6 +542,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test lucene fine grain data map with text-match limit") {
+
     sql(
       s"""
          | CREATE DATAMAP dm ON TABLE datamap_test
@@ -556,7 +550,6 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
          | DMProperties('INDEX_COLUMNS'='name , city')
       """.stripMargin)
 
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
     checkAnswer(sql("select count(*) from datamap_test where TEXT_MATCH_WITH_LIMIT('name:n10*',10)"),Seq(Row(10)))
     checkAnswer(sql("select count(*) from datamap_test where TEXT_MATCH_WITH_LIMIT('name:n10*',50)"),Seq(Row(50)))
     sql("drop datamap dm on table datamap_test")
@@ -570,17 +563,16 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
          | DMProperties('INDEX_COLUMNS'='name , city')
       """.stripMargin)
 
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
-    sql("DROP TABLE IF EXISTS tabl1")
     sql(
       """
         | CREATE TABLE table1(id INT, name STRING, city STRING, age INT)
         | STORED BY 'carbondata'
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
-    sql("INSERT OVERWRITE TABLE table1 select *from datamap_test where TEXT_MATCH('name:n*')")
+    sql("INSERT OVERWRITE TABLE table1 select * from datamap_test where TEXT_MATCH('name:n*')")
     checkAnswer(sql("select count(*) from table1"),Seq(Row(10000)))
     sql("drop datamap dm on table datamap_test")
+    sql("drop table table1")
   }
 
   test("explain query with lucene datamap") {
@@ -698,7 +690,31 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
       sql(s"select * from datamap_test5 where name='n10'"))
     checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"),
       sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
+    sql("DROP TABLE IF EXISTS datamap_test5")
+  }
 
+  test("test lucene fine grain datamap rebuild") {
+    sql("DROP TABLE IF EXISTS datamap_test5")
+    sql(
+      """
+        | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm ON TABLE datamap_test5
+         | USING 'lucene'
+         | WITH DEFERRED REBUILD
+         | DMProperties('INDEX_COLUMNS'='city')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+    val map = DataMapStatusManager.readDataMapStatusMap()
+    assert(!map.get("dm").isEnabled)
+    sql("REBUILD DATAMAP dm ON TABLE datamap_test5")
+    checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"),
+      sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
+    sql("DROP TABLE IF EXISTS datamap_test5")
   }
 
   test("test text_match on normal table") {

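The status check used in the rebuild test above also works as a general pattern: a
datamap created WITH DEFERRED REBUILD reports disabled until the first REBUILD
DATAMAP command completes. A minimal sketch, reusing the suite's imports:

    import org.apache.carbondata.core.datamap.status.DataMapStatusManager

    val statusMap = DataMapStatusManager.readDataMapStatusMap()
    // before REBUILD DATAMAP runs, the deferred datamap is disabled
    assert(!statusMap.get("dm").isEnabled)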
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 629e9d9..fb25141 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -436,6 +436,16 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("test pre agg datamap with deferred rebuild") {
+    val e = intercept[MalformedDataMapCommandException] {
+      sql("create datamap failure on table PreAggMain1 " +
+          "using 'preaggregate' " +
+          "with deferred rebuild " +
+          "as select a as a1,sum(b) as sum from PreAggMain1 group by a")
+    }
+    assert(e.getMessage.contains("DEFERRED REBUILD is not supported on this DataMap"))
+  }
+
   // TODO: Need to Fix
   ignore("test creation of multiple preaggregate of same name concurrently") {
     sql("DROP TABLE IF EXISTS tbl_concurr")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index b441bb4..d8fc46f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
-import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapRefresher, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datastore.FileReader
 import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -147,8 +147,8 @@ class CGDataMapFactory(
     false
   }
 
-  override def createRefresher(segment: Segment,
-      shardName: String): DataMapRefresher = {
+  override def createBuilder(segment: Segment,
+      shardName: String): DataMapBuilder = {
     ???
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 5e0c10a..ffbcf67 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
-import org.apache.carbondata.core.datamap.dev.{DataMapRefresher, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.features.TableOperation
@@ -83,8 +83,8 @@ class C2DataMapFactory(
     false
   }
 
-  override def createRefresher(segment: Segment,
-      shardName: String): DataMapRefresher = {
+  override def createBuilder(segment: Segment,
+      shardName: String): DataMapBuilder = {
     ???
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 976e580..535a112 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
 import org.apache.carbondata.core.datamap.Segment
-import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapRefresher, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.fgdatamap.{FineGrainBlocklet, FineGrainDataMap, FineGrainDataMapFactory}
 import org.apache.carbondata.core.datastore.FileReader
 import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -141,8 +143,8 @@ class FGDataMapFactory(carbonTable: CarbonTable,
     false
   }
 
-  override def createRefresher(segment: Segment,
-      shardName: String): DataMapRefresher = {
+  override def createBuilder(segment: Segment,
+      shardName: String): DataMapBuilder = {
     ???
   }
 }