You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/09 18:22:22 UTC

[34/36] carbondata git commit: [CARBONDATA-1544][Datamap] Datamap FineGrain implementation

[CARBONDATA-1544][Datamap] Datamap FineGrain implementation

Implemented interfaces for FG datamap and integrated to filterscanner to use the pruned bitset from FG datamap.
FG Query flow as follows.
1.The user can add FG datamap to any table and implement there interfaces.
2. Any filter query which hits the table with datamap will call prune method of FGdatamap.
3. The prune method of FGDatamap return list FineGrainBlocklet , these blocklets contain the information of block, blocklet, page and rowids information as well.
4. The pruned blocklets are internally wriitten to file and returns only the block , blocklet and filepath information as part of Splits.
5. Based on the splits scanrdd schedule the tasks.
6. In filterscanner we check the datamapwriterpath from split and reNoteads the bitset if exists. And pass this bitset as input to it.

This closes #1471


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/dc43a46a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/dc43a46a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/dc43a46a

Branch: refs/heads/carbonstore-rebase
Commit: dc43a46aa85376decabf88b9089330c37dad34aa
Parents: fce7558
Author: ravipesala <ra...@gmail.com>
Authored: Wed Nov 15 19:48:40 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Feb 10 02:20:11 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/DataMapMeta.java    |   8 +-
 .../core/datamap/DataMapStoreManager.java       |  30 +-
 .../carbondata/core/datamap/DataMapType.java    |  21 +
 .../carbondata/core/datamap/TableDataMap.java   |  30 +-
 .../core/datamap/dev/AbstractDataMapWriter.java | 110 +++++
 .../core/datamap/dev/BlockletSerializer.java    |  57 +++
 .../carbondata/core/datamap/dev/DataMap.java    |   4 +-
 .../core/datamap/dev/DataMapFactory.java        |  14 +-
 .../core/datamap/dev/DataMapWriter.java         |  57 ---
 .../cgdatamap/AbstractCoarseGrainDataMap.java   |  24 +
 .../AbstractCoarseGrainDataMapFactory.java      |  34 ++
 .../dev/fgdatamap/AbstractFineGrainDataMap.java |  24 +
 .../AbstractFineGrainDataMapFactory.java        |  38 ++
 .../carbondata/core/datastore/DataRefNode.java  |   6 +
 .../core/datastore/block/TableBlockInfo.java    |  10 +
 .../impl/btree/AbstractBTreeLeafNode.java       |   5 +
 .../datastore/impl/btree/BTreeNonLeafNode.java  |   5 +
 .../carbondata/core/indexstore/Blocklet.java    |  30 +-
 .../indexstore/BlockletDataMapIndexStore.java   |   6 -
 .../core/indexstore/BlockletDetailsFetcher.java |   8 +
 .../core/indexstore/ExtendedBlocklet.java       |  17 +
 .../core/indexstore/FineGrainBlocklet.java      | 120 +++++
 .../blockletindex/BlockletDataMap.java          |  15 +-
 .../blockletindex/BlockletDataMapFactory.java   |  63 ++-
 .../blockletindex/BlockletDataRefNode.java      |  27 +-
 .../indexstore/blockletindex/IndexWrapper.java  |  18 +
 .../core/indexstore/schema/FilterType.java      |  24 -
 .../executer/ExcludeFilterExecuterImpl.java     |   3 +
 .../executer/IncludeFilterExecuterImpl.java     |   3 +
 .../scanner/impl/BlockletFilterScanner.java     |   2 +
 .../apache/carbondata/core/util/CarbonUtil.java |  98 +++++
 .../datamap/examples/MinMaxDataMap.java         |  32 +-
 .../datamap/examples/MinMaxDataMapFactory.java  |  49 ++-
 .../datamap/examples/MinMaxDataWriter.java      |  36 +-
 .../examples/MinMaxIndexBlockDetails.java       |  13 -
 .../carbondata/hadoop/CarbonInputSplit.java     |  21 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  17 +-
 .../testsuite/datamap/CGDataMapTestCase.scala   | 361 +++++++++++++++
 .../testsuite/datamap/DataMapWriterSuite.scala  |  46 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   | 440 +++++++++++++++++++
 .../TestInsertAndOtherCommandConcurrent.scala   |  21 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   7 +-
 .../TestStreamingTableOperation.scala           |   5 +-
 .../datamap/DataMapWriterListener.java          |  57 ++-
 .../store/CarbonFactDataHandlerModel.java       |  10 +-
 .../store/writer/AbstractFactDataWriter.java    | 128 +-----
 46 files changed, 1764 insertions(+), 390 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
index 7746acf..dd15ccb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
@@ -19,15 +19,15 @@ package org.apache.carbondata.core.datamap;
 
 import java.util.List;
 
-import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 
 public class DataMapMeta {
 
   private List<String> indexedColumns;
 
-  private FilterType optimizedOperation;
+  private List<ExpressionType> optimizedOperation;
 
-  public DataMapMeta(List<String> indexedColumns, FilterType optimizedOperation) {
+  public DataMapMeta(List<String> indexedColumns, List<ExpressionType> optimizedOperation) {
     this.indexedColumns = indexedColumns;
     this.optimizedOperation = optimizedOperation;
   }
@@ -36,7 +36,7 @@ public class DataMapMeta {
     return indexedColumns;
   }
 
-  public FilterType getOptimizedOperation() {
+  public List<ExpressionType> getOptimizedOperation() {
     return optimizedOperation;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 90e5fff..8d80b4d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -56,7 +56,22 @@ public final class DataMapStoreManager {
   }
 
   public List<TableDataMap> getAllDataMap(AbsoluteTableIdentifier identifier) {
-    return allDataMaps.get(identifier.uniqueName());
+    return allDataMaps.get(identifier.getCarbonTableIdentifier().getTableUniqueName());
+  }
+
+  // TODO its a temporary method till chooser is implemented
+  public TableDataMap chooseDataMap(AbsoluteTableIdentifier identifier) {
+    List<TableDataMap> tableDataMaps = getAllDataMap(identifier);
+    if (tableDataMaps != null && tableDataMaps.size() > 0) {
+      for (TableDataMap dataMap: tableDataMaps) {
+        if (!dataMap.getDataMapName().equalsIgnoreCase(BlockletDataMap.NAME)) {
+          return dataMap;
+        }
+      }
+      return tableDataMaps.get(0);
+    } else {
+      return getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
+    }
   }
 
   /**
@@ -68,7 +83,7 @@ public final class DataMapStoreManager {
    */
   public TableDataMap getDataMap(AbsoluteTableIdentifier identifier,
       String dataMapName, String factoryClass) {
-    String table = identifier.uniqueName();
+    String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
     List<TableDataMap> tableDataMaps = allDataMaps.get(table);
     TableDataMap dataMap;
     if (tableDataMaps == null) {
@@ -96,7 +111,7 @@ public final class DataMapStoreManager {
    */
   public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
       String factoryClassName, String dataMapName) {
-    String table = identifier.uniqueName();
+    String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
     // Just update the segmentRefreshMap with the table if not added.
     getTableSegmentRefresher(identifier);
     List<TableDataMap> tableDataMaps = allDataMaps.get(table);
@@ -149,7 +164,9 @@ public final class DataMapStoreManager {
    * @param identifier Table identifier
    */
   public void clearDataMaps(AbsoluteTableIdentifier identifier) {
-    List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName());
+    String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
+    List<TableDataMap> tableDataMaps =
+        allDataMaps.get(tableUniqueName);
     segmentRefreshMap.remove(identifier.uniqueName());
     if (tableDataMaps != null) {
       for (TableDataMap tableDataMap: tableDataMaps) {
@@ -158,7 +175,7 @@ public final class DataMapStoreManager {
           break;
         }
       }
-      allDataMaps.remove(identifier.uniqueName());
+      allDataMaps.remove(tableUniqueName);
     }
   }
 
@@ -167,7 +184,8 @@ public final class DataMapStoreManager {
    * @param identifier Table identifier
    */
   public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
-    List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName());
+    List<TableDataMap> tableDataMaps =
+        allDataMaps.get(identifier.getCarbonTableIdentifier().getTableUniqueName());
     if (tableDataMaps != null) {
       int i = 0;
       for (TableDataMap tableDataMap: tableDataMaps) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java
new file mode 100644
index 0000000..bf812b3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+public enum DataMapType {
+  CG,FG;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 1c80703..42fc702 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -21,12 +21,15 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.dev.BlockletSerializer;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -76,10 +79,15 @@ public final class TableDataMap extends OperationEventListener {
     SegmentProperties segmentProperties;
     for (String segmentId : segmentIds) {
       List<Blocklet> pruneBlocklets = new ArrayList<>();
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
-      segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segmentId);
-      for (DataMap dataMap : dataMaps) {
-        pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
+      // if filter is not passed then return all the blocklets
+      if (filterExp == null) {
+        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segmentId, partitions);
+      } else {
+        List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+        segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segmentId);
+        for (DataMap dataMap : dataMaps) {
+          pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
+        }
       }
       blocklets.addAll(addSegmentId(blockletDetailsFetcher
           .getExtendedBlocklets(pruneBlocklets, segmentId), segmentId));
@@ -137,9 +145,21 @@ public final class TableDataMap extends OperationEventListener {
               segmentPropertiesFetcher.getSegmentProperties(distributable.getSegmentId()),
               partitions));
     }
-    for (Blocklet blocklet: blocklets) {
+    BlockletSerializer serializer = new BlockletSerializer();
+    String writePath =
+        identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + dataMapName;
+    if (dataMapFactory.getDataMapType() == DataMapType.FG) {
+      FileFactory.mkdirs(writePath, FileFactory.getFileType(writePath));
+    }
+    for (Blocklet blocklet : blocklets) {
       ExtendedBlocklet detailedBlocklet =
           blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegmentId());
+      if (dataMapFactory.getDataMapType() == DataMapType.FG) {
+        String blockletwritePath =
+            writePath + CarbonCommonConstants.FILE_SEPARATOR + System.nanoTime();
+        detailedBlocklet.setDataMapWriterPath(blockletwritePath);
+        serializer.serializeBlocklet((FineGrainBlocklet) blocklet, blockletwritePath);
+      }
       detailedBlocklet.setSegmentId(distributable.getSegmentId());
       detailedBlocklets.add(detailedBlocklet);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
new file mode 100644
index 0000000..bcc9bad
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * Data Map writer
+ */
+public abstract class AbstractDataMapWriter {
+
+  protected AbsoluteTableIdentifier identifier;
+
+  protected String segmentId;
+
+  protected String writeDirectoryPath;
+
+  public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId,
+      String writeDirectoryPath) {
+    this.identifier = identifier;
+    this.segmentId = segmentId;
+    this.writeDirectoryPath = writeDirectoryPath;
+  }
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  public abstract void onBlockStart(String blockId);
+
+  /**
+   * End of block notification
+   */
+  public abstract void onBlockEnd(String blockId);
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  public abstract void onBlockletStart(int blockletId);
+
+  /**
+   * End of blocklet notification
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  public abstract void onBlockletEnd(int blockletId);
+
+  /**
+   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
+   * DataMapMeta returned in DataMapFactory.
+   * Implementation should copy the content of `pages` as needed, because `pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   */
+  public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] pages);
+
+  /**
+   * This is called during closing of writer.So after this call no more data will be sent to this
+   * class.
+   */
+  public abstract void finish() throws IOException;
+
+  /**
+   * It copies the file from temp folder to actual folder
+   *
+   * @param dataMapFile
+   * @throws IOException
+   */
+  protected void commitFile(String dataMapFile) throws IOException {
+    if (!dataMapFile.startsWith(writeDirectoryPath)) {
+      throw new UnsupportedOperationException(
+          "Datamap file " + dataMapFile + " is not written in provided directory path "
+              + writeDirectoryPath);
+    }
+    String dataMapFileName =
+        dataMapFile.substring(writeDirectoryPath.length(), dataMapFile.length());
+    String carbonFilePath = dataMapFileName.substring(0, dataMapFileName.lastIndexOf("/"));
+    String segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
+    if (carbonFilePath.length() > 0) {
+      carbonFilePath = segmentPath + carbonFilePath;
+      FileFactory.mkdirs(carbonFilePath, FileFactory.getFileType(carbonFilePath));
+    } else {
+      carbonFilePath = segmentPath;
+    }
+    CarbonUtil.copyCarbonDataFileToCarbonStorePath(dataMapFile, carbonFilePath, 0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java
new file mode 100644
index 0000000..3d4c717
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
+
+public class BlockletSerializer {
+
+  /**
+   * Serialize and write blocklet to the file.
+   * @param grainBlocklet
+   * @param writePath
+   * @throws IOException
+   */
+  public void serializeBlocklet(FineGrainBlocklet grainBlocklet, String writePath)
+      throws IOException {
+    DataOutputStream dataOutputStream =
+        FileFactory.getDataOutputStream(writePath, FileFactory.getFileType(writePath));
+    grainBlocklet.write(dataOutputStream);
+    dataOutputStream.close();
+  }
+
+  /**
+   * Read data from filepath and deserialize blocklet.
+   * @param writePath
+   * @return
+   * @throws IOException
+   */
+  public FineGrainBlocklet deserializeBlocklet(String writePath) throws IOException {
+    DataInputStream inputStream =
+        FileFactory.getDataInputStream(writePath, FileFactory.getFileType(writePath));
+    FineGrainBlocklet blocklet = new FineGrainBlocklet();
+    blocklet.readFields(inputStream);
+    inputStream.close();
+    return blocklet;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index dfe97e3..3fa6d75 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 /**
  * Datamap is an entity which can store and retrieve index data.
  */
-public interface DataMap {
+public interface DataMap<T extends Blocklet> {
 
   /**
    * It is called to load the data map to memory or to initialize it.
@@ -41,7 +41,7 @@ public interface DataMap {
    * @param filterExp
    * @return
    */
-  List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+  List<T> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
       List<String> partitions);
 
   // TODO Move this method to Abstract class

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index f5a7404..e900f8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -21,13 +21,14 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.DataMapType;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.events.Event;
 
 /**
  * Interface for datamap factory, it is responsible for creating the datamap.
  */
-public interface DataMapFactory {
+public interface DataMapFactory<T extends DataMap> {
 
   /**
    * Initialization of Datamap factory with the identifier and datamap name
@@ -37,17 +38,17 @@ public interface DataMapFactory {
   /**
    * Return a new write for this datamap
    */
-  DataMapWriter createWriter(String segmentId);
+  AbstractDataMapWriter createWriter(String segmentId, String writeDirectoryPath);
 
   /**
    * Get the datamap for segmentid
    */
-  List<DataMap> getDataMaps(String segmentId) throws IOException;
+  List<T> getDataMaps(String segmentId) throws IOException;
 
   /**
    * Get datamaps for distributable object.
    */
-  List<DataMap> getDataMaps(DataMapDistributable distributable) throws IOException;
+  List<T> getDataMaps(DataMapDistributable distributable) throws IOException;
 
   /**
    * Get all distributable objects of a segmentid
@@ -75,4 +76,9 @@ public interface DataMapFactory {
    * Return metadata of this datamap
    */
   DataMapMeta getMeta();
+
+  /**
+   *  Type of datamap whether it is FG or CG
+   */
+  DataMapType getDataMapType();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
deleted file mode 100644
index 413eaa5..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.datamap.dev;
-
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-
-/**
- * Data Map writer
- */
-public interface DataMapWriter {
-
-  /**
-   *  Start of new block notification.
-   *  @param blockId file name of the carbondata file
-   */
-  void onBlockStart(String blockId, String blockPath);
-
-  /**
-   * End of block notification
-   */
-  void onBlockEnd(String blockId);
-
-  /**
-   * Start of new blocklet notification.
-   * @param blockletId sequence number of blocklet in the block
-   */
-  void onBlockletStart(int blockletId);
-
-  /**
-   * End of blocklet notification
-   * @param blockletId sequence number of blocklet in the block
-   */
-  void onBlockletEnd(int blockletId);
-  /**
-   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
-   * DataMapMeta returned in DataMapFactory.
-   *
-   * Implementation should copy the content of `pages` as needed, because `pages` memory
-   * may be freed after this method returns, if using unsafe column page.
-   */
-  void onPageAdded(int blockletId, int pageId, ColumnPage[] pages);
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java
new file mode 100644
index 0000000..d79d0c6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.cgdatamap;
+
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.Blocklet;
+
+public abstract class AbstractCoarseGrainDataMap implements DataMap<Blocklet> {
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java
new file mode 100644
index 0000000..9789992
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.cgdatamap;
+
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+
+/**
+ *  1. Any filter query which hits the table with datamap will call prune method of CGdatamap.
+ *  2. The prune method of CGDatamap return list Blocklet , these blocklets contain the
+ *     information of block and blocklet.
+ *  3. Based on the splits scanrdd schedule the tasks.
+ */
+public abstract class AbstractCoarseGrainDataMapFactory
+    implements DataMapFactory<AbstractCoarseGrainDataMap> {
+
+  @Override public DataMapType getDataMapType() {
+    return DataMapType.CG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java
new file mode 100644
index 0000000..310fb3b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.fgdatamap;
+
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
+
+public abstract class AbstractFineGrainDataMap implements DataMap<FineGrainBlocklet> {
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java
new file mode 100644
index 0000000..1ca7fc3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.fgdatamap;
+
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+
+/**
+ *  1. Any filter query which hits the table with datamap will call prune method of FGdatamap.
+ *  2. The prune method of FGDatamap return list FineGrainBlocklet , these blocklets contain the
+ *     information of block, blocklet, page and rowids information as well.
+ *  3. The pruned blocklets are internally wriitten to file and returns only the block ,
+ *    blocklet and filepath information as part of Splits.
+ *  4. Based on the splits scanrdd schedule the tasks.
+ *  5. In filterscanner we check the datamapwriterpath from split and reNoteads the
+ *     bitset if exists. And pass this bitset as input to it.
+ */
+public abstract class AbstractFineGrainDataMapFactory
+    implements DataMapFactory<AbstractFineGrainDataMap> {
+
+  @Override public DataMapType getDataMapType() {
+    return DataMapType.FG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
index 273f833..df0896a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
  * Interface data block reference
@@ -119,4 +120,9 @@ public interface DataRefNode {
    */
   MeasureRawColumnChunk readMeasureChunk(FileReader fileReader, int columnIndex) throws IOException;
 
+  /**
+   * Return the indexed data if it has any from disk which was stored by FG datamap.
+   * @return
+   */
+  BitSetGroup getIndexedData();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index b27b5fc..a7bfdba 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -96,6 +96,8 @@ public class TableBlockInfo implements Distributable, Serializable {
 
   private BlockletDetailInfo detailInfo;
 
+  private String dataMapWriterPath;
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
       String[] locations, long blockLength, ColumnarFormatVersion version,
       String[] deletedDeltaFilePath) {
@@ -424,4 +426,12 @@ public class TableBlockInfo implements Distributable, Serializable {
   public void setDataBlockFromOldStore(boolean dataBlockFromOldStore) {
     isDataBlockFromOldStore = dataBlockFromOldStore;
   }
+
+  public String getDataMapWriterPath() {
+    return dataMapWriterPath;
+  }
+
+  public void setDataMapWriterPath(String dataMapWriterPath) {
+    this.dataMapWriterPath = dataMapWriterPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
index fe4cf83..f5a751b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
  * Non leaf node abstract class
@@ -222,4 +223,8 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode {
   public int getPageRowCount(int pageNumber) {
     throw new UnsupportedOperationException("Unsupported operation");
   }
+
+  @Override public BitSetGroup getIndexedData() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
index c200f8d..a6eb695 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
  * No leaf node of a b+tree class which will keep the matadata(start key) of the
@@ -227,6 +228,10 @@ public class BTreeNonLeafNode implements BTreeNode {
     throw new UnsupportedOperationException("Unsupported operation");
   }
 
+  public BitSetGroup getIndexedData() {
+    return null;
+  }
+
   /**
    * number of pages in blocklet
    * @return

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index d84f3f6..c731e07 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -16,28 +16,46 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
 /**
  * Blocklet
  */
-public class Blocklet implements Serializable {
+public class Blocklet implements Writable,Serializable {
 
-  private String path;
+  private String blockId;
 
   private String blockletId;
 
-  public Blocklet(String path, String blockletId) {
-    this.path = path;
+  public Blocklet(String blockId, String blockletId) {
+    this.blockId = blockId;
     this.blockletId = blockletId;
   }
 
-  public String getPath() {
-    return path;
+  // For serialization purpose
+  public Blocklet() {
   }
 
   public String getBlockletId() {
     return blockletId;
   }
 
+  public String getBlockId() {
+    return blockId;
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    out.writeUTF(blockId);
+    out.writeUTF(blockletId);
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    blockId = in.readUTF();
+    blockletId = in.readUTF();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 111a7a2..7598961 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -22,7 +22,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -104,7 +103,6 @@ public class BlockletDataMapIndexStore
       List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
     List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size());
     List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>();
-    ExecutorService service = null;
     // Get the datamaps for each indexfile from cache.
     try {
       for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
@@ -151,10 +149,6 @@ public class BlockletDataMapIndexStore
         dataMap.clear();
       }
       throw new IOException("Problem in loading segment blocks.", e);
-    } finally {
-      if (service != null) {
-        service.shutdownNow();
-      }
     }
     return blockletDataMaps;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index 21ecba1..3ed826a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -44,4 +44,12 @@ public interface BlockletDetailsFetcher {
    * @throws IOException
    */
   ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId) throws IOException;
+
+  /**
+   * Get all the blocklets in a segment
+   *
+   * @param segmentId
+   * @return
+   */
+  List<Blocklet> getAllBlocklets(String segmentId, List<String> partitions) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index d1bfa35..58a9344 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -29,8 +29,13 @@ public class ExtendedBlocklet extends Blocklet {
 
   private String[] location;
 
+  private String path;
+
+  private String dataMapWriterPath;
+
   public ExtendedBlocklet(String path, String blockletId) {
     super(path, blockletId);
+    this.path = path;
   }
 
   public BlockletDetailInfo getDetailInfo() {
@@ -60,4 +65,16 @@ public class ExtendedBlocklet extends Blocklet {
   public void setSegmentId(String segmentId) {
     this.segmentId = segmentId;
   }
+
+  public String getPath() {
+    return path;
+  }
+
+  public String getDataMapWriterPath() {
+    return dataMapWriterPath;
+  }
+
+  public void setDataMapWriterPath(String dataMapWriterPath) {
+    this.dataMapWriterPath = dataMapWriterPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
new file mode 100644
index 0000000..266120e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+import org.apache.carbondata.core.util.BitSetGroup;
+
+/**
+ * FineGrainBlocklet
+ */
+public class FineGrainBlocklet extends Blocklet implements Serializable {
+
+  private List<Page> pages;
+
+  public FineGrainBlocklet(String blockId, String blockletId, List<Page> pages) {
+    super(blockId, blockletId);
+    this.pages = pages;
+  }
+
+  // For serialization purpose
+  public FineGrainBlocklet() {
+
+  }
+
+  public List<Page> getPages() {
+    return pages;
+  }
+
+  public static class Page implements Writable,Serializable {
+
+    private int pageId;
+
+    private int[] rowId;
+
+    public BitSet getBitSet() {
+      BitSet bitSet =
+          new BitSet(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+      for (int row : rowId) {
+        bitSet.set(row);
+      }
+      return bitSet;
+    }
+
+    @Override public void write(DataOutput out) throws IOException {
+      out.writeInt(pageId);
+      out.writeInt(rowId.length);
+      for (int i = 0; i < rowId.length; i++) {
+        out.writeInt(rowId[i]);
+      }
+    }
+
+    @Override public void readFields(DataInput in) throws IOException {
+      pageId = in.readInt();
+      int length = in.readInt();
+      rowId = new int[length];
+      for (int i = 0; i < length; i++) {
+        rowId[i] = in.readInt();
+      }
+    }
+
+    public void setPageId(int pageId) {
+      this.pageId = pageId;
+    }
+
+    public void setRowId(int[] rowId) {
+      this.rowId = rowId;
+    }
+  }
+
+  public BitSetGroup getBitSetGroup(int numberOfPages) {
+    BitSetGroup bitSetGroup = new BitSetGroup(numberOfPages);
+    for (int i = 0; i < pages.size(); i++) {
+      bitSetGroup.setBitSet(pages.get(i).getBitSet(), pages.get(i).pageId);
+    }
+    return bitSetGroup;
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    super.write(out);
+    int size = pages.size();
+    out.writeInt(size);
+    for (Page page : pages) {
+      page.write(out);
+    }
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    int size = in.readInt();
+    pages = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      Page page = new Page();
+      page.readFields(in);
+      pages.add(page);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index e4200c3..90eb297 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -34,8 +34,8 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cacheable;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -72,7 +72,7 @@ import org.xerial.snappy.Snappy;
 /**
  * Datamap implementation for blocklet.
  */
-public class BlockletDataMap implements DataMap, Cacheable {
+public class BlockletDataMap extends AbstractCoarseGrainDataMap implements Cacheable {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMap.class.getName());
@@ -613,9 +613,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
         FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
     for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) {
       DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i);
-      boolean isScanRequired = FilterExpressionProcessor
-          .isScanRequired(filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
-              getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
+      boolean isScanRequired = FilterExpressionProcessor.isScanRequired(
+          filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
+          getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
       if (isScanRequired) {
         return true;
       }
@@ -655,7 +655,6 @@ public class BlockletDataMap implements DataMap, Cacheable {
         startIndex++;
       }
     }
-
     return blocklets;
   }
 
@@ -948,4 +947,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return memoryUsed;
   }
 
+  public SegmentProperties getSegmentProperties() {
+    return segmentProperties;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 61e5ceb..f6b8165 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -27,9 +27,10 @@ import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datamap.dev.DataMap;
-import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMapFactory;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -39,9 +40,6 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -53,7 +51,8 @@ import org.apache.hadoop.fs.RemoteIterator;
 /**
  * Table map for blocklet
  */
-public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher,
+public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
+    implements BlockletDetailsFetcher,
     SegmentPropertiesFetcher {
 
   private AbsoluteTableIdentifier identifier;
@@ -61,10 +60,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   // segmentId -> list of index file
   private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
 
-  // segmentId -> SegmentProperties.
-  private Map<String, SegmentProperties> segmentPropertiesMap = new HashMap<>();
-
-  private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+  private Cache<TableBlockIndexUniqueIdentifier, AbstractCoarseGrainDataMap> cache;
 
   @Override
   public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
@@ -74,12 +70,12 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   }
 
   @Override
-  public DataMapWriter createWriter(String segmentId) {
+  public AbstractDataMapWriter createWriter(String segmentId, String dataWriterPath) {
     throw new UnsupportedOperationException("not implemented");
   }
 
   @Override
-  public List<DataMap> getDataMaps(String segmentId) throws IOException {
+  public List<AbstractCoarseGrainDataMap> getDataMaps(String segmentId) throws IOException {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         getTableBlockIndexUniqueIdentifiers(segmentId);
     return cache.getAll(tableBlockIndexUniqueIdentifiers);
@@ -140,17 +136,18 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
 
   private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
       Blocklet blocklet) throws IOException {
-    String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getPath());
+    String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId());
     for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
       if (identifier.getCarbonIndexFileName().equals(carbonIndexFileName)) {
         DataMap dataMap = cache.get(identifier);
         return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
       }
     }
-    throw new IOException("Blocklet with blockid " + blocklet.getPath() + " not found ");
+    throw new IOException("Blocklet with blockid " + blocklet.getBlockletId() + " not found ");
   }
 
 
+
   @Override
   public List<DataMapDistributable> toDistributable(String segmentId) {
     CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentId);
@@ -179,7 +176,6 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
 
   @Override
   public void clear(String segmentId) {
-    segmentPropertiesMap.remove(segmentId);
     List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
     if (blockIndexes != null) {
       for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
@@ -200,7 +196,8 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   }
 
   @Override
-  public List<DataMap> getDataMaps(DataMapDistributable distributable) throws IOException {
+  public List<AbstractCoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+      throws IOException {
     BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
     List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
     if (mapDistributable.getFilePath().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
@@ -217,7 +214,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
                 indexFile));
       }
     }
-    List<DataMap> dataMaps;
+    List<AbstractCoarseGrainDataMap> dataMaps;
     try {
       dataMaps = cache.getAll(identifiers);
     } catch (IOException e) {
@@ -233,23 +230,21 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   }
 
   @Override public SegmentProperties getSegmentProperties(String segmentId) throws IOException {
-    SegmentProperties segmentProperties = segmentPropertiesMap.get(segmentId);
-    if (segmentProperties == null) {
-      int[] columnCardinality;
-      List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-          getTableBlockIndexUniqueIdentifiers(segmentId);
-      DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
-      List<DataFileFooter> indexInfo =
-          fileFooterConverter.getIndexInfo(tableBlockIndexUniqueIdentifiers.get(0).getFilePath());
-      for (DataFileFooter fileFooter : indexInfo) {
-        List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
-        if (segmentProperties == null) {
-          columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
-          segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
-        }
-      }
-      segmentPropertiesMap.put(segmentId, segmentProperties);
+    List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segmentId);
+    assert (dataMaps.size() > 0);
+    AbstractCoarseGrainDataMap coarseGrainDataMap = dataMaps.get(0);
+    assert (coarseGrainDataMap instanceof BlockletDataMap);
+    BlockletDataMap dataMap = (BlockletDataMap) coarseGrainDataMap;
+    return dataMap.getSegmentProperties();
+  }
+
+  @Override public List<Blocklet> getAllBlocklets(String segmentId, List<String> partitions)
+      throws IOException {
+    List<Blocklet> blocklets = new ArrayList<>();
+    List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segmentId);
+    for (AbstractCoarseGrainDataMap dataMap : dataMaps) {
+      blocklets.addAll(dataMap.prune(null, getSegmentProperties(segmentId), partitions));
     }
-    return segmentProperties;
+    return blocklets;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
index 5cd59cb..b8fd6ff 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.constants.CarbonVersionConstants;
+import org.apache.carbondata.core.datamap.dev.BlockletSerializer;
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -30,7 +31,9 @@ import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory
 import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 
 /**
@@ -44,6 +47,8 @@ public class BlockletDataRefNode implements DataRefNode {
 
   private int[] dimensionLens;
 
+  private BlockletSerializer blockletSerializer;
+
   BlockletDataRefNode(List<TableBlockInfo> blockInfos, int index, int[] dimensionLens) {
     this.blockInfos = blockInfos;
     // Update row count and page count to blocklet info
@@ -76,6 +81,7 @@ public class BlockletDataRefNode implements DataRefNode {
     }
     this.index = index;
     this.dimensionLens = dimensionLens;
+    this.blockletSerializer = new BlockletSerializer();
   }
 
   @Override public DataRefNode getNextDataRefNode() {
@@ -209,11 +215,28 @@ public class BlockletDataRefNode implements DataRefNode {
     }
   }
 
-  @Override public int numberOfPages() {
+  @Override
+  public int numberOfPages() {
     return blockInfos.get(index).getDetailInfo().getPagesCount();
   }
 
-  @Override public int getPageRowCount(int pageNumber) {
+  @Override
+  public BitSetGroup getIndexedData() {
+    String dataMapWriterPath = blockInfos.get(index).getDataMapWriterPath();
+    if (dataMapWriterPath != null) {
+      try {
+        FineGrainBlocklet blocklet = blockletSerializer.deserializeBlocklet(dataMapWriterPath);
+        return blocklet.getBitSetGroup(numberOfPages());
+      } catch (IOException e) {
+        return null;
+      }
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public int getPageRowCount(int pageNumber) {
     return blockInfos.get(index).getDetailInfo().getBlockletInfo()
         .getNumberOfRowsPerPage()[pageNumber];
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
index a30f64c..95232e5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
@@ -21,6 +21,8 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 
 /**
@@ -29,7 +31,10 @@ import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
  */
 public class IndexWrapper extends AbstractIndex {
 
+  private List<TableBlockInfo> blockInfos;
+
   public IndexWrapper(List<TableBlockInfo> blockInfos) {
+    this.blockInfos = blockInfos;
     segmentProperties = new SegmentProperties(blockInfos.get(0).getDetailInfo().getColumnSchemas(),
         blockInfos.get(0).getDetailInfo().getDimLens());
     dataRefNode = new BlockletDataRefNode(blockInfos, 0,
@@ -38,4 +43,17 @@ public class IndexWrapper extends AbstractIndex {
 
   @Override public void buildIndex(List<DataFileFooter> footerList) {
   }
+
+  @Override public void clear() {
+    super.clear();
+    if (blockInfos != null) {
+      for (TableBlockInfo blockInfo : blockInfos) {
+        String dataMapWriterPath = blockInfo.getDataMapWriterPath();
+        if (dataMapWriterPath != null) {
+          CarbonFile file = FileFactory.getCarbonFile(dataMapWriterPath);
+          FileFactory.deleteAllCarbonFilesOfDir(file);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
deleted file mode 100644
index 9d77010..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore.schema;
-
-/**
- * Types of filters of select query
- */
-public enum FilterType {
-  EQUALTO, GREATER_THAN, LESS_THAN, GREATER_THAN_EQUAL, LESS_THAN_EQUAL, LIKE
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index 5974666..26b0f67 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -314,6 +314,9 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
   private BitSet getFilteredIndexesUisngPrvBitset(DimensionColumnPage dimensionColumnPage,
       BitSetGroup prvBitSetGroup, int pageNumber) {
     BitSet prvPageBitSet = prvBitSetGroup.getBitSet(pageNumber);
+    if (prvPageBitSet == null || prvPageBitSet.isEmpty()) {
+      return prvPageBitSet;
+    }
     BitSet bitSet = new BitSet();
     bitSet.or(prvPageBitSet);
     byte[][] filterKeys = dimColumnExecuterInfo.getExcludeFilterKeys();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 05328f3..516ed41 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -334,6 +334,9 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
   private BitSet getFilteredIndexesUisngPrvBitset(DimensionColumnPage dimensionColumnPage,
       BitSetGroup prvBitSetGroup, int pageNumber, int numberOfRows) {
     BitSet prvPageBitSet = prvBitSetGroup.getBitSet(pageNumber);
+    if (prvPageBitSet == null || prvPageBitSet.isEmpty()) {
+      return prvPageBitSet;
+    }
     BitSet bitSet = new BitSet(numberOfRows);
     byte[][] filterKeys = dimColumnExecuterInfo.getFilterKeys();
     int compareResult = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
index 1c73d63..033c3dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
@@ -163,6 +163,8 @@ public class BlockletFilterScanner extends BlockletFullScanner {
         .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
     totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
         totalBlockletStatistic.getCount() + 1);
+    // set the indexed data if it has any during fgdatamap pruning.
+    rawBlockletColumnChunks.setBitSetGroup(rawBlockletColumnChunks.getDataBlock().getIndexedData());
     // apply filter on actual data, for each page
     BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(rawBlockletColumnChunks,
         useBitSetPipeLine);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index bde817f..c9705c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -55,6 +56,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
@@ -103,6 +105,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
@@ -135,6 +138,13 @@ public final class CarbonUtil {
 
   private static final Configuration conf = new Configuration(true);
 
+  /**
+   * dfs.bytes-per-checksum
+   * HDFS checksum length, block size for a file should be exactly divisible
+   * by this value
+   */
+  private static final int HDFS_CHECKSUM_LENGTH = 512;
+
   private CarbonUtil() {
 
   }
@@ -2382,6 +2392,94 @@ public final class CarbonUtil {
     return Base64.decodeBase64(objectString.getBytes(CarbonCommonConstants.DEFAULT_CHARSET));
   }
 
+
+  /**
+   * This method will copy the given file to carbon store location
+   *
+   * @param localFilePath local file name with full path
+   * @throws CarbonDataWriterException
+   */
+  public static void copyCarbonDataFileToCarbonStorePath(String localFilePath,
+      String carbonDataDirectoryPath, long fileSizeInBytes)
+      throws CarbonDataWriterException {
+    long copyStartTime = System.currentTimeMillis();
+    LOGGER.info("Copying " + localFilePath + " --> " + carbonDataDirectoryPath);
+    try {
+      CarbonFile localCarbonFile =
+          FileFactory.getCarbonFile(localFilePath, FileFactory.getFileType(localFilePath));
+      String carbonFilePath = carbonDataDirectoryPath + localFilePath
+          .substring(localFilePath.lastIndexOf(File.separator));
+      copyLocalFileToCarbonStore(carbonFilePath, localFilePath,
+          CarbonCommonConstants.BYTEBUFFER_SIZE,
+          getMaxOfBlockAndFileSize(fileSizeInBytes, localCarbonFile.getSize()));
+    } catch (IOException e) {
+      throw new CarbonDataWriterException(
+          "Problem while copying file from local store to carbon store", e);
+    }
+    LOGGER.info(
+        "Total copy time (ms) to copy file " + localFilePath + " is " + (System.currentTimeMillis()
+            - copyStartTime));
+  }
+
+  /**
+   * This method will read the local carbon data file and write to carbon data file in HDFS
+   *
+   * @param carbonStoreFilePath
+   * @param localFilePath
+   * @param bufferSize
+   * @param blockSize
+   * @throws IOException
+   */
+  private static void copyLocalFileToCarbonStore(String carbonStoreFilePath, String localFilePath,
+      int bufferSize, long blockSize) throws IOException {
+    DataOutputStream dataOutputStream = null;
+    DataInputStream dataInputStream = null;
+    try {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("HDFS file block size for file: " + carbonStoreFilePath + " is " + blockSize
+            + " (bytes");
+      }
+      dataOutputStream = FileFactory
+          .getDataOutputStream(carbonStoreFilePath, FileFactory.getFileType(carbonStoreFilePath),
+              bufferSize, blockSize);
+      dataInputStream = FileFactory
+          .getDataInputStream(localFilePath, FileFactory.getFileType(localFilePath), bufferSize);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, bufferSize);
+    } finally {
+      CarbonUtil.closeStream(dataInputStream);
+      CarbonUtil.closeStream(dataOutputStream);
+    }
+  }
+
+  /**
+   * This method will return max of block size and file size
+   *
+   * @param blockSize
+   * @param fileSize
+   * @return
+   */
+  private static long getMaxOfBlockAndFileSize(long blockSize, long fileSize) {
+    long maxSize = blockSize;
+    if (fileSize > blockSize) {
+      maxSize = fileSize;
+    }
+    // block size should be exactly divisible by 512 which is  maintained by HDFS as bytes
+    // per checksum, dfs.bytes-per-checksum=512 must divide block size
+    long remainder = maxSize % HDFS_CHECKSUM_LENGTH;
+    if (remainder > 0) {
+      maxSize = maxSize + HDFS_CHECKSUM_LENGTH - remainder;
+    }
+    // convert to make block size more readable.
+    String readableBlockSize = ByteUtil.convertByteToReadable(blockSize);
+    String readableFileSize = ByteUtil.convertByteToReadable(fileSize);
+    String readableMaxSize = ByteUtil.convertByteToReadable(maxSize);
+    LOGGER.info(
+        "The configured block size is " + readableBlockSize + ", the actual carbon file size is "
+            + readableFileSize + ", choose the max value " + readableMaxSize
+            + " as the block size on HDFS");
+    return maxSize;
+  }
+
   /**
    * This method will be used to update the min and max values and this will be used in case of
    * old store where min and max values for measures are written opposite

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
index 2ad6327..8002e57 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
@@ -28,7 +28,8 @@ import java.util.List;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -36,7 +37,9 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -47,7 +50,7 @@ import com.google.gson.Gson;
 /**
  * Datamap implementation for min max blocklet.
  */
-public class MinMaxDataMap implements DataMap {
+public class MinMaxDataMap extends AbstractCoarseGrainDataMap {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(MinMaxDataMap.class.getName());
@@ -58,8 +61,9 @@ public class MinMaxDataMap implements DataMap {
 
   private MinMaxIndexBlockDetails[] readMinMaxDataMap;
 
-  @Override public void init(String filePath) throws MemoryException, IOException {
-    this.filePath = filePath;
+  @Override
+  public void init(DataMapModel model) throws MemoryException, IOException {
+    this.filePath = model.getFilePath();
     CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0");
     for (int i = 0; i < listFiles.length; i++) {
       readMinMaxDataMap = readJson(listFiles[i].getPath());
@@ -76,7 +80,7 @@ public class MinMaxDataMap implements DataMap {
     });
   }
 
-  public MinMaxIndexBlockDetails[] readJson(String filePath) throws IOException {
+  private MinMaxIndexBlockDetails[] readJson(String filePath) {
     Gson gsonObjectToRead = new Gson();
     DataInputStream dataInputStream = null;
     BufferedReader buffReader = null;
@@ -90,8 +94,7 @@ public class MinMaxDataMap implements DataMap {
         return null;
       }
       dataInputStream = fileOperation.openForRead();
-      inStream = new InputStreamReader(dataInputStream,
-          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+      inStream = new InputStreamReader(dataInputStream, "UTF-8");
       buffReader = new BufferedReader(inStream);
       readMinMax = gsonObjectToRead.fromJson(buffReader, MinMaxIndexBlockDetails[].class);
     } catch (IOException e) {
@@ -109,14 +112,14 @@ public class MinMaxDataMap implements DataMap {
    * @param segmentProperties
    * @return
    */
-  @Override public List<Blocklet> prune(FilterResolverIntf filterExp,
-      SegmentProperties segmentProperties) {
+  @Override
+  public List<Blocklet> prune(FilterResolverIntf filterExp,
+      SegmentProperties segmentProperties, List<String> partitions) {
     List<Blocklet> blocklets = new ArrayList<>();
 
     if (filterExp == null) {
       for (int i = 0; i < readMinMaxDataMap.length; i++) {
-        blocklets.add(new Blocklet(readMinMaxDataMap[i].getFilePath(),
-            String.valueOf(readMinMaxDataMap[i].getBlockletId())));
+        blocklets.add(new Blocklet(filePath, String.valueOf(readMinMaxDataMap[i].getBlockletId())));
       }
     } else {
       FilterExecuter filterExecuter =
@@ -126,7 +129,7 @@ public class MinMaxDataMap implements DataMap {
         BitSet bitSet = filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(),
             readMinMaxDataMap[startIndex].getMinValues());
         if (!bitSet.isEmpty()) {
-          blocklets.add(new Blocklet(readMinMaxDataMap[startIndex].getFilePath(),
+          blocklets.add(new Blocklet(filePath,
               String.valueOf(readMinMaxDataMap[startIndex].getBlockletId())));
         }
         startIndex++;
@@ -136,6 +139,11 @@ public class MinMaxDataMap implements DataMap {
   }
 
   @Override
+  public boolean isScanRequired(FilterResolverIntf filterExp) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void clear() {
     readMinMaxDataMap = null;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc43a46a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
index b196d0d..5203cb3 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
@@ -25,49 +25,51 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.dev.DataMap;
-import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.datamap.dev.DataMapWriter;
-import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMapFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.events.Event;
 
 /**
  * Min Max DataMap Factory
  */
-public class MinMaxDataMapFactory implements DataMapFactory {
+public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory {
 
   private AbsoluteTableIdentifier identifier;
 
-  @Override
-  public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+  @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
     this.identifier = identifier;
   }
 
   /**
    * createWriter will return the MinMaxDataWriter.
+   *
    * @param segmentId
    * @return
    */
-  @Override
-  public DataMapWriter createWriter(String segmentId) {
-    return new MinMaxDataWriter();
+  @Override public AbstractDataMapWriter createWriter(String segmentId, String dataWritePath) {
+    return new MinMaxDataWriter(identifier, segmentId, dataWritePath);
   }
 
   /**
    * getDataMaps Factory method Initializes the Min Max Data Map and returns.
+   *
    * @param segmentId
    * @return
    * @throws IOException
    */
-  @Override public List<DataMap> getDataMaps(String segmentId) throws IOException {
-    List<DataMap> dataMapList = new ArrayList<>();
+  @Override public List<AbstractCoarseGrainDataMap> getDataMaps(String segmentId)
+      throws IOException {
+    List<AbstractCoarseGrainDataMap> dataMapList = new ArrayList<>();
     // Form a dataMap of Type MinMaxDataMap.
     MinMaxDataMap dataMap = new MinMaxDataMap();
     try {
-      dataMap.init(identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + File.separator);
+      dataMap.init(new DataMapModel(
+          identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + File.separator));
     } catch (MemoryException ex) {
 
     }
@@ -76,7 +78,6 @@ public class MinMaxDataMapFactory implements DataMapFactory {
   }
 
   /**
-   *
    * @param segmentId
    * @return
    */
@@ -86,6 +87,7 @@ public class MinMaxDataMapFactory implements DataMapFactory {
 
   /**
    * Clear the DataMap.
+   *
    * @param segmentId
    */
   @Override public void clear(String segmentId) {
@@ -94,21 +96,20 @@ public class MinMaxDataMapFactory implements DataMapFactory {
   /**
    * Clearing the data map.
    */
-  @Override
-  public void clear() {
+  @Override public void clear() {
   }
 
-  @Override public DataMap getDataMap(DataMapDistributable distributable) {
+  @Override public List<AbstractCoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+      throws IOException {
     return null;
   }
 
-  @Override
-  public void fireEvent(ChangeEvent event) {
+  @Override public void fireEvent(Event event) {
 
   }
 
-  @Override
-  public DataMapMeta getMeta() {
-    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")), FilterType.EQUALTO);
+  @Override public DataMapMeta getMeta() {
+    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")),
+        new ArrayList<ExpressionType>());
   }
 }
\ No newline at end of file