Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/05 16:48:20 UTC

[1/3] carbondata git commit: [CARBONDATA-1480]Min Max Index Example for DataMap

Repository: carbondata
Updated Branches:
  refs/heads/carbonstore e502c59a2 -> e972fd3d5


[CARBONDATA-1480]Min Max Index Example for DataMap

DataMap example: an implementation of a Min/Max index through the DataMap interface, and use of that index while pruning.

This closes #1359

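As background for readers of this patch: the example wires the new datamap in through DataMapStoreManager before any data is loaded. Below is a minimal Java sketch of that registration, mirroring the call made in MinMaxDataMapExample.scala at the end of this patch (the store location and table name are illustrative):

    import org.apache.carbondata.core.datamap.DataMapStoreManager;
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    import org.apache.carbondata.datamap.examples.MinMaxDataMap;
    import org.apache.carbondata.datamap.examples.MinMaxDataMapFactory;

    AbsoluteTableIdentifier identifier =
        AbsoluteTableIdentifier.from("/tmp/carbon/store", "default", "carbonminmax");
    // Registers MinMaxDataMapFactory under the datamap name
    // "clustered.minmax.btree.blocklet" (MinMaxDataMap.NAME).
    DataMapStoreManager.getInstance().createAndRegisterDataMap(
        identifier, MinMaxDataMapFactory.class.getName(), MinMaxDataMap.NAME);
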

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cae74a8c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cae74a8c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cae74a8c

Branch: refs/heads/carbonstore
Commit: cae74a8cecea74e8899a87dcb7d12e0dec1b8069
Parents: e502c59
Author: sounakr <so...@gmail.com>
Authored: Thu Sep 28 16:21:05 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon Feb 5 12:34:25 2018 +0530

----------------------------------------------------------------------
 .../core/datamap/DataMapStoreManager.java       |  16 +-
 .../carbondata/core/datamap/TableDataMap.java   |  18 +-
 .../carbondata/core/datamap/dev/DataMap.java    |  14 +-
 .../core/datamap/dev/DataMapWriter.java         |   3 +-
 .../indexstore/SegmentPropertiesFetcher.java    |  36 +++
 .../blockletindex/BlockletDataMap.java          |   9 +-
 .../blockletindex/BlockletDataMapFactory.java   |  33 ++-
 datamap/examples/pom.xml                        | 111 ++++++++++
 .../datamap/examples/BlockletMinMax.java        |  41 ++++
 .../datamap/examples/MinMaxDataMap.java         | 143 ++++++++++++
 .../datamap/examples/MinMaxDataMapFactory.java  | 114 ++++++++++
 .../datamap/examples/MinMaxDataWriter.java      | 221 +++++++++++++++++++
 .../examples/MinMaxIndexBlockDetails.java       |  77 +++++++
 .../MinMaxDataMapExample.scala                  |  77 +++++++
 .../testsuite/datamap/DataMapWriterSuite.scala  |   2 +-
 pom.xml                                         |   2 +
 .../datamap/DataMapWriterListener.java          |   4 +-
 .../store/writer/AbstractFactDataWriter.java    |   7 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   |   3 +
 19 files changed, 900 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index d30483a..90e5fff 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
+import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -103,7 +104,7 @@ public final class DataMapStoreManager {
       tableDataMaps = new ArrayList<>();
     }
     TableDataMap dataMap = getTableDataMap(dataMapName, tableDataMaps);
-    if (dataMap != null) {
+    if (dataMap != null && dataMap.getDataMapName().equalsIgnoreCase(dataMapName)) {
       throw new RuntimeException("Already datamap exists in that path with type " + dataMapName);
     }
 
@@ -113,12 +114,15 @@ public final class DataMapStoreManager {
       DataMapFactory dataMapFactory = factoryClass.newInstance();
       dataMapFactory.init(identifier, dataMapName);
       BlockletDetailsFetcher blockletDetailsFetcher;
+      SegmentPropertiesFetcher segmentPropertiesFetcher = null;
       if (dataMapFactory instanceof BlockletDetailsFetcher) {
         blockletDetailsFetcher = (BlockletDetailsFetcher) dataMapFactory;
       } else {
         blockletDetailsFetcher = getBlockletDetailsFetcher(identifier);
       }
-      dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory, blockletDetailsFetcher);
+      segmentPropertiesFetcher = (SegmentPropertiesFetcher) blockletDetailsFetcher;
+      dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory, blockletDetailsFetcher,
+          segmentPropertiesFetcher);
     } catch (Exception e) {
       LOGGER.error(e);
       throw new RuntimeException(e);
@@ -128,11 +132,11 @@ public final class DataMapStoreManager {
     return dataMap;
   }
 
-  private TableDataMap getTableDataMap(String dataMapName,
-      List<TableDataMap> tableDataMaps) {
+  private TableDataMap getTableDataMap(String dataMapName, List<TableDataMap> tableDataMaps) {
     TableDataMap dataMap = null;
-    for (TableDataMap tableDataMap: tableDataMaps) {
-      if (tableDataMap.getDataMapName().equals(dataMapName)) {
+    for (TableDataMap tableDataMap : tableDataMaps) {
+      if (tableDataMap.getDataMapName().equals(dataMapName) || (!tableDataMap.getDataMapName()
+          .equals(""))) {
         dataMap = tableDataMap;
         break;
       }

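The cast to SegmentPropertiesFetcher above works because the fallback BlockletDetailsFetcher is the BlockletDataMapFactory, which (further down in this patch) implements both interfaces. A hedged sketch of the resolution logic, using a hypothetical factory that, like MinMaxDataMapFactory, implements only DataMapFactory:

    // MyDataMapFactory is hypothetical; it implements neither fetcher interface,
    // so both fetchers resolve to the default blocklet datamap factory.
    DataMapFactory dataMapFactory = new MyDataMapFactory();
    BlockletDetailsFetcher detailsFetcher =
        dataMapFactory instanceof BlockletDetailsFetcher
            ? (BlockletDetailsFetcher) dataMapFactory
            : getBlockletDetailsFetcher(identifier);  // the BlockletDataMapFactory
    SegmentPropertiesFetcher propsFetcher = (SegmentPropertiesFetcher) detailsFetcher;

Note the cast assumes whichever BlockletDetailsFetcher is chosen also implements SegmentPropertiesFetcher; that holds for the default factory.
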
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 9c84891..1c80703 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -23,9 +23,11 @@ import java.util.List;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.events.Event;
@@ -46,15 +48,19 @@ public final class TableDataMap extends OperationEventListener {
 
   private BlockletDetailsFetcher blockletDetailsFetcher;
 
+  private SegmentPropertiesFetcher segmentPropertiesFetcher;
+
   /**
    * It is called to initialize and load the required table datamap metadata.
    */
   public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
-      DataMapFactory dataMapFactory, BlockletDetailsFetcher blockletDetailsFetcher) {
+      DataMapFactory dataMapFactory, BlockletDetailsFetcher blockletDetailsFetcher,
+      SegmentPropertiesFetcher segmentPropertiesFetcher) {
     this.identifier = identifier;
     this.dataMapName = dataMapName;
     this.dataMapFactory = dataMapFactory;
     this.blockletDetailsFetcher = blockletDetailsFetcher;
+    this.segmentPropertiesFetcher = segmentPropertiesFetcher;
   }
 
   /**
@@ -67,11 +73,13 @@ public final class TableDataMap extends OperationEventListener {
   public List<ExtendedBlocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp,
       List<String> partitions) throws IOException {
     List<ExtendedBlocklet> blocklets = new ArrayList<>();
+    SegmentProperties segmentProperties;
     for (String segmentId : segmentIds) {
       List<Blocklet> pruneBlocklets = new ArrayList<>();
       List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+      segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segmentId);
       for (DataMap dataMap : dataMaps) {
-        pruneBlocklets.addAll(dataMap.prune(filterExp, partitions));
+        pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
       }
       blocklets.addAll(addSegmentId(blockletDetailsFetcher
           .getExtendedBlocklets(pruneBlocklets, segmentId), segmentId));
@@ -123,7 +131,11 @@ public final class TableDataMap extends OperationEventListener {
     List<Blocklet> blocklets = new ArrayList<>();
     List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable);
     for (DataMap dataMap : dataMaps) {
-      blocklets.addAll(dataMap.prune(filterExp, partitions));
+      blocklets.addAll(
+          dataMap.prune(
+              filterExp,
+              segmentPropertiesFetcher.getSegmentProperties(distributable.getSegmentId()),
+              partitions));
     }
     for (Blocklet blocklet: blocklets) {
       ExtendedBlocklet detailedBlocklet =

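With this change the segment properties are fetched once per segment and shared by all datamaps of that segment, instead of each DataMap resolving them itself. A sketch of the caller side of the new prune contract (segment ids and filter are illustrative; the partition list is passed as null here on the assumption the query carries no partition information):

    List<String> segmentIds = java.util.Arrays.asList("0", "1");
    // filterResolver comes from the query's resolved filter tree; may be null.
    List<ExtendedBlocklet> hits = tableDataMap.prune(segmentIds, filterResolver, null);
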
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 16be1ac..dfe97e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datamap.dev;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -34,23 +35,14 @@ public interface DataMap {
   void init(DataMapModel dataMapModel) throws MemoryException, IOException;
 
   /**
-   * Prune the datamap with filter expression. It returns the list of
-   * blocklets where these filters can exist.
-   *
-   * @param filterExp
-   * @return
-   */
-  List<Blocklet> prune(FilterResolverIntf filterExp);
-
-  // TODO Move this method to Abstract class
-  /**
    * Prune the datamap with filter expression and partition information. It returns the list of
    * blocklets where these filters can exist.
    *
    * @param filterExp
    * @return
    */
-  List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions);
+  List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+      List<String> partitions);
 
   // TODO Move this method to Abstract class
   /**

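Every DataMap implementation now receives the SegmentProperties at prune time, which is what lets an index like the min/max example build a FilterExecuter without caching table metadata itself. A fragment showing an implementation of the new signature with a deliberately trivial body (the real logic appears in MinMaxDataMap later in this patch):

    @Override
    public List<Blocklet> prune(FilterResolverIntf filterExp,
        SegmentProperties segmentProperties, List<String> partitions) {
      // Trivial illustration: prune everything away. A real datamap consults its
      // index here and returns only the blocklets the filter could match.
      return new ArrayList<Blocklet>();
    }
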
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
index 28163d7..413eaa5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -27,7 +27,7 @@ public interface DataMapWriter {
    *  Start of new block notification.
    *  @param blockId file name of the carbondata file
    */
-  void onBlockStart(String blockId);
+  void onBlockStart(String blockId, String blockPath);
 
   /**
    * End of block notification
@@ -45,7 +45,6 @@ public interface DataMapWriter {
    * @param blockletId sequence number of blocklet in the block
    */
   void onBlockletEnd(int blockletId);
-
   /**
    * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
    * DataMapMeta returned in DataMapFactory.

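The new blockPath argument hands writers the physical location of the carbondata file, so a datamap can place its index artifacts next to the data. A sketch of a writer hook using it, modeled on MinMaxDataWriter below:

    @Override
    public void onBlockStart(String blockId, String blockPath) {
      // Remember where this block lives; the min/max writer later derives the
      // <blockId>.minmaxindex output path from it.
      this.blockPath = blockPath;
    }
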
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
new file mode 100644
index 0000000..ec2ae93
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+
+/**
+ * Fetches the detailed SegmentProperties, which carry additional information needed to execute the query
+ */
+public interface SegmentPropertiesFetcher {
+
+  /**
+   * get the Segment properties based on the SegmentID.
+   * @param segmentId
+   * @return
+   * @throws IOException
+   */
+  SegmentProperties getSegmentProperties(String segmentId) throws IOException;
+}

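Usage is a straightforward keyed lookup; the concrete implementation below caches the result per segment. An illustrative call (segment id "0" is an example):

    SegmentPropertiesFetcher fetcher = new BlockletDataMapFactory();
    // fetcher (the factory) is assumed to have been initialized already.
    SegmentProperties props = fetcher.getSegmentProperties("0");
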
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index b097c66..d331c2b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -619,8 +619,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return false;
   }
 
-  @Override
-  public List<Blocklet> prune(FilterResolverIntf filterExp) {
+  private List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties) {
     if (unsafeMemoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
     }
@@ -689,7 +688,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return blocklets;
   }
 
-  @Override public List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions) {
+  @Override
+  public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+      List<String> partitions) {
     if (unsafeMemoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
     }
@@ -711,7 +712,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
       }
     }
     // Prune with filters if the partitions are existed in this datamap
-    return prune(filterExp);
+    return prune(filterExp, segmentProperties);
   }
 
   /**

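The public prune now performs the partition check first and only then delegates to the private filter-based prune with the supplied segment properties. The shape of that short-circuit, condensed (partitionsMatch is a hypothetical helper standing in for the partition check elided from this hunk):

    if (partitions != null && !partitionsMatch(partitions)) {
      return new ArrayList<Blocklet>();  // no queried partition lives in this datamap
    }
    return prune(filterExp, segmentProperties);  // filter-based pruning
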
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 2e2cab5..61e5ceb 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -30,13 +30,18 @@ import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -48,13 +53,17 @@ import org.apache.hadoop.fs.RemoteIterator;
 /**
  * Table map for blocklet
  */
-public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher {
+public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher,
+    SegmentPropertiesFetcher {
 
   private AbsoluteTableIdentifier identifier;
 
   // segmentId -> list of index file
   private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
 
+  // segmentId -> SegmentProperties.
+  private Map<String, SegmentProperties> segmentPropertiesMap = new HashMap<>();
+
   private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
 
   @Override
@@ -170,6 +179,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
 
   @Override
   public void clear(String segmentId) {
+    segmentPropertiesMap.remove(segmentId);
     List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
     if (blockIndexes != null) {
       for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
@@ -221,4 +231,25 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
     // TODO: pass SORT_COLUMNS into this class
     return null;
   }
+
+  @Override public SegmentProperties getSegmentProperties(String segmentId) throws IOException {
+    SegmentProperties segmentProperties = segmentPropertiesMap.get(segmentId);
+    if (segmentProperties == null) {
+      int[] columnCardinality;
+      List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+          getTableBlockIndexUniqueIdentifiers(segmentId);
+      DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+      List<DataFileFooter> indexInfo =
+          fileFooterConverter.getIndexInfo(tableBlockIndexUniqueIdentifiers.get(0).getFilePath());
+      for (DataFileFooter fileFooter : indexInfo) {
+        List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
+        if (segmentProperties == null) {
+          columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
+          segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
+        }
+      }
+      segmentPropertiesMap.put(segmentId, segmentProperties);
+    }
+    return segmentProperties;
+  }
 }

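getSegmentProperties is a cache-aside lookup: on a miss it reads the footer of the segment's first index file, builds the SegmentProperties from the column schema and cardinality, and caches the result, on the assumption that all blocks of a segment share one schema. The pattern in isolation (buildFromFirstFooter is a hypothetical stand-in for the footer-reading code above):

    SegmentProperties props = segmentPropertiesMap.get(segmentId);
    if (props == null) {
      props = buildFromFirstFooter(segmentId);  // columns + cardinality from the footer
      segmentPropertiesMap.put(segmentId, props);
    }
    return props;
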
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/datamap/examples/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/examples/pom.xml b/datamap/examples/pom.xml
new file mode 100644
index 0000000..6832e62
--- /dev/null
+++ b/datamap/examples/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.carbondata</groupId>
+        <artifactId>carbondata-parent</artifactId>
+        <version>1.3.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>carbondata-datamap-examples</artifactId>
+    <name>Apache CarbonData :: Datamap Examples</name>
+
+    <properties>
+        <dev.path>${basedir}/../../dev</dev.path>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.carbondata</groupId>
+            <artifactId>carbondata-spark2</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-hive-thriftserver_2.10</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-repl_2.10</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-sql_2.10</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-repl_${scala.binary.version}</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/minmaxdatamap/main/java</sourceDirectory>
+        <resources>
+            <resource>
+                <directory>.</directory>
+                <includes>
+                    <include>CARBON_EXAMPLESLogResource.properties</include>
+                </includes>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <version>2.15.2</version>
+                <executions>
+                    <execution>
+                        <id>compile</id>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                        <phase>compile</phase>
+                    </execution>
+                    <execution>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java
new file mode 100644
index 0000000..e6968fe
--- /dev/null
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.examples;
+
+
+public class BlockletMinMax {
+  private byte[][] Min;
+
+  private byte[][] Max;
+
+  public byte[][] getMin() {
+    return Min;
+  }
+
+  public void setMin(byte[][] min) {
+    Min = min;
+  }
+
+  public byte[][] getMax() {
+    return Max;
+  }
+
+  public void setMax(byte[][] max) {
+    Max = max;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
new file mode 100644
index 0000000..2ad6327
--- /dev/null
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.examples;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import com.google.gson.Gson;
+
+/**
+ * Datamap implementation for min max blocklet.
+ */
+public class MinMaxDataMap implements DataMap {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(MinMaxDataMap.class.getName());
+
+  public static final String NAME = "clustered.minmax.btree.blocklet";
+
+  private String filePath;
+
+  private MinMaxIndexBlockDetails[] readMinMaxDataMap;
+
+  @Override public void init(String filePath) throws MemoryException, IOException {
+    this.filePath = filePath;
+    CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0");
+    for (int i = 0; i < listFiles.length; i++) {
+      readMinMaxDataMap = readJson(listFiles[i].getPath());
+    }
+  }
+
+  private CarbonFile[] getCarbonMinMaxIndexFiles(String filePath, String segmentId) {
+    String path = filePath.substring(0, filePath.lastIndexOf("/") + 1);
+    CarbonFile carbonFile = FileFactory.getCarbonFile(path);
+    return carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return file.getName().endsWith(".minmaxindex");
+      }
+    });
+  }
+
+  public MinMaxIndexBlockDetails[] readJson(String filePath) throws IOException {
+    Gson gsonObjectToRead = new Gson();
+    DataInputStream dataInputStream = null;
+    BufferedReader buffReader = null;
+    InputStreamReader inStream = null;
+    MinMaxIndexBlockDetails[] readMinMax = null;
+    AtomicFileOperations fileOperation =
+        new AtomicFileOperationsImpl(filePath, FileFactory.getFileType(filePath));
+
+    try {
+      if (!FileFactory.isFileExist(filePath, FileFactory.getFileType(filePath))) {
+        return null;
+      }
+      dataInputStream = fileOperation.openForRead();
+      inStream = new InputStreamReader(dataInputStream,
+          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+      buffReader = new BufferedReader(inStream);
+      readMinMax = gsonObjectToRead.fromJson(buffReader, MinMaxIndexBlockDetails[].class);
+    } catch (IOException e) {
+      return null;
+    } finally {
+      CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+    }
+    return readMinMax;
+  }
+
+  /**
+   * Block pruning logic for the Min Max DataMap.
+   *
+   * @param filterExp
+   * @param segmentProperties
+   * @return
+   */
+  @Override public List<Blocklet> prune(FilterResolverIntf filterExp,
+      SegmentProperties segmentProperties) {
+    List<Blocklet> blocklets = new ArrayList<>();
+
+    if (filterExp == null) {
+      for (int i = 0; i < readMinMaxDataMap.length; i++) {
+        blocklets.add(new Blocklet(readMinMaxDataMap[i].getFilePath(),
+            String.valueOf(readMinMaxDataMap[i].getBlockletId())));
+      }
+    } else {
+      FilterExecuter filterExecuter =
+          FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+      int startIndex = 0;
+      while (startIndex < readMinMaxDataMap.length) {
+        BitSet bitSet = filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(),
+            readMinMaxDataMap[startIndex].getMinValues());
+        if (!bitSet.isEmpty()) {
+          blocklets.add(new Blocklet(readMinMaxDataMap[startIndex].getFilePath(),
+              String.valueOf(readMinMaxDataMap[startIndex].getBlockletId())));
+        }
+        startIndex++;
+      }
+    }
+    return blocklets;
+  }
+
+  @Override
+  public void clear() {
+    readMinMaxDataMap = null;
+  }
+
+}
\ No newline at end of file

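This is the heart of the example: a FilterExecuter is built from the resolver tree plus the segment properties, and each blocklet is kept only if its stored min/max range could satisfy the filter. The per-blocklet decision in isolation (filePath, blockletId, minValues and maxValues come from the stored MinMaxIndexBlockDetails):

    FilterExecuter filterExecuter =
        FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
    BitSet bitSet = filterExecuter.isScanRequired(maxValues, minValues);
    if (!bitSet.isEmpty()) {
      // Some filter column's value range overlaps this blocklet: keep it.
      blocklets.add(new Blocklet(filePath, String.valueOf(blockletId)));
    }
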
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
new file mode 100644
index 0000000..b196d0d
--- /dev/null
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.examples;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+
+/**
+ * Min Max DataMap Factory
+ */
+public class MinMaxDataMapFactory implements DataMapFactory {
+
+  private AbsoluteTableIdentifier identifier;
+
+  @Override
+  public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+    this.identifier = identifier;
+  }
+
+  /**
+   * createWriter will return the MinMaxDataWriter.
+   * @param segmentId
+   * @return
+   */
+  @Override
+  public DataMapWriter createWriter(String segmentId) {
+    return new MinMaxDataWriter();
+  }
+
+  /**
+   * getDataMaps: factory method that initializes the Min Max DataMap and returns it.
+   * @param segmentId
+   * @return
+   * @throws IOException
+   */
+  @Override public List<DataMap> getDataMaps(String segmentId) throws IOException {
+    List<DataMap> dataMapList = new ArrayList<>();
+    // Form a dataMap of Type MinMaxDataMap.
+    MinMaxDataMap dataMap = new MinMaxDataMap();
+    try {
+      dataMap.init(identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + File.separator);
+    } catch (MemoryException ex) {
+
+    }
+    dataMapList.add(dataMap);
+    return dataMapList;
+  }
+
+  /**
+   *
+   * @param segmentId
+   * @return
+   */
+  @Override public List<DataMapDistributable> toDistributable(String segmentId) {
+    return null;
+  }
+
+  /**
+   * Clear the DataMap.
+   * @param segmentId
+   */
+  @Override public void clear(String segmentId) {
+  }
+
+  /**
+   * Clearing the data map.
+   */
+  @Override
+  public void clear() {
+  }
+
+  @Override public DataMap getDataMap(DataMapDistributable distributable) {
+    return null;
+  }
+
+  @Override
+  public void fireEvent(ChangeEvent event) {
+
+  }
+
+  @Override
+  public DataMapMeta getMeta() {
+    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")), FilterType.EQUALTO);
+  }
+}
\ No newline at end of file

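At query time the framework asks the factory for the datamaps of each segment; this factory returns a single MinMaxDataMap initialized on the segment directory (note the hard-coded Fact/Part0 layout above). A sketch of that flow (segment id illustrative; the enclosing method is assumed to declare throws IOException):

    DataMapFactory factory = new MinMaxDataMapFactory();
    factory.init(identifier, MinMaxDataMap.NAME);
    List<DataMap> dataMaps = factory.getDataMaps("0");  // one MinMaxDataMap per segment
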
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
new file mode 100644
index 0000000..78544d3
--- /dev/null
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.examples;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import com.google.gson.Gson;
+
+public class MinMaxDataWriter implements DataMapWriter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(TableInfo.class.getName());
+
+  private byte[][] pageLevelMin, pageLevelMax;
+
+  private byte[][] blockletLevelMin, blockletLevelMax;
+
+  private Map<Integer, BlockletMinMax> blockMinMaxMap;
+
+  private String blockPath;
+
+
+  @Override public void onBlockStart(String blockId, String blockPath) {
+    pageLevelMax = null;
+    pageLevelMin = null;
+    blockletLevelMax = null;
+    blockletLevelMin = null;
+    blockMinMaxMap = null;
+    blockMinMaxMap = new HashMap<Integer, BlockletMinMax>();
+    this.blockPath = blockPath;
+  }
+
+  @Override public void onBlockEnd(String blockId) {
+    updateMinMaxIndex(blockId);
+  }
+
+  @Override public void onBlockletStart(int blockletId) {
+  }
+
+  @Override public void onBlockletEnd(int blockletId) {
+    updateBlockletMinMax(blockletId);
+  }
+
+  @Override
+  public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) {
+    // Calculate Min and Max value within this page.
+
+    // As part of this example we extract the min/max values manually. The same
+    // can be done by retrieving the page statistics, e.g.:
+
+    // if (pageLevelMin == null && pageLevelMax == null) {
+    //    pageLevelMin[1] = CarbonUtil.getValueAsBytes(pages[0].getStatistics().getDataType(),
+    //        pages[0].getStatistics().getMin());
+    //    pageLevelMax[1] = CarbonUtil.getValueAsBytes(pages[0].getStatistics().getDataType(),
+    //        pages[0].getStatistics().getMax());
+    //  } else {
+    //    if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMin[1], CarbonUtil
+    //        .getValueAsBytes(pages[0].getStatistics().getDataType(),
+    //            pages[0].getStatistics().getMin())) > 0) {
+    //      pageLevelMin[1] = CarbonUtil.getValueAsBytes(pages[0].getStatistics().getDataType(),
+    //          pages[0].getStatistics().getMin());
+    //    }
+    //    if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMax[1], CarbonUtil
+    //        .getValueAsBytes(pages[0].getStatistics().getDataType(),
+    //            pages[0].getStatistics().getMax())) < 0) {
+    //      pageLevelMax[1] = CarbonUtil.getValueAsBytes(pages[0].getStatistics().getDataType(),
+    //          pages[0].getStatistics().getMax());
+    //    }
+
+    byte[] value = new byte[pages[0].getBytes(0).length - 2];
+    if (pageLevelMin == null && pageLevelMax == null) {
+      pageLevelMin = new byte[2][];
+      pageLevelMax = new byte[2][];
+
+      System.arraycopy(pages[0].getBytes(0), 2, value, 0, value.length);
+      pageLevelMin[1] = value;
+      pageLevelMax[1] = value;
+
+    } else {
+      for (int rowIndex = 0; rowIndex < pages[0].getPageSize(); rowIndex++) {
+        System.arraycopy(pages[0].getBytes(rowIndex), 2, value, 0, value.length);
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMin[1], value) > 0) {
+          pageLevelMin[1] = value;
+        }
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMax[1], value) < 0) {
+          pageLevelMax[1] = value;
+        }
+      }
+    }
+  }
+
+  private void updateBlockletMinMax(int blockletId) {
+    if (blockletLevelMax == null || blockletLevelMin == null) {
+      blockletLevelMax = new byte[2][];
+      blockletLevelMin = new byte[2][];
+      if (pageLevelMax != null || pageLevelMin != null) {
+        blockletLevelMin = pageLevelMin;
+        blockletLevelMax = pageLevelMax;
+      }
+    } else {
+      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockletLevelMin[1], pageLevelMin[1]) > 0) {
+        blockletLevelMin = pageLevelMin;
+      }
+
+      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockletLevelMax[1], pageLevelMax[1]) > 0) {
+        blockletLevelMax = pageLevelMax;
+      }
+    }
+    BlockletMinMax blockletMinMax = new BlockletMinMax();
+    blockletMinMax.setMax(blockletLevelMax);
+    blockletMinMax.setMin(blockletLevelMin);
+    blockMinMaxMap.put(blockletId, blockletMinMax);
+  }
+
+
+  public void updateMinMaxIndex(String blockId) {
+    constructMinMaxIndex(blockId);
+  }
+
+
+
+  /**
+   * Construct the Min Max Index.
+   * @param blockId
+   */
+  public void constructMinMaxIndex(String blockId) {
+    // construct Min and Max values of each Blocklets present inside a block.
+    List<MinMaxIndexBlockDetails> tempMinMaxIndexBlockDetails = null;
+    tempMinMaxIndexBlockDetails = loadBlockDetails();
+    try {
+      writeMinMaxIndexFile(tempMinMaxIndexBlockDetails, blockPath, blockId);
+    } catch (IOException ex) {
+      LOGGER.info(" Unable to write the file");
+    }
+  }
+
+  /**
+   * Load the block details into the MinMaxIndexBlockDetails class.
+   */
+  private List<MinMaxIndexBlockDetails> loadBlockDetails() {
+    List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails = new ArrayList<MinMaxIndexBlockDetails>();
+    MinMaxIndexBlockDetails tmpminMaxIndexBlockDetails = new MinMaxIndexBlockDetails();
+
+    for (int index = 0; index < blockMinMaxMap.size(); index++) {
+      tmpminMaxIndexBlockDetails.setMinValues(blockMinMaxMap.get(index).getMin());
+      tmpminMaxIndexBlockDetails.setMaxValues(blockMinMaxMap.get(index).getMax());
+      tmpminMaxIndexBlockDetails.setBlockletId(index);
+      tmpminMaxIndexBlockDetails.setFilePath(this.blockPath);
+      minMaxIndexBlockDetails.add(tmpminMaxIndexBlockDetails);
+    }
+    return minMaxIndexBlockDetails;
+  }
+
+  /**
+   * Write the data to a file in JSON format.
+   * @param minMaxIndexBlockDetails
+   * @param blockPath
+   * @param blockId
+   * @throws IOException
+   */
+  public void writeMinMaxIndexFile(List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails,
+      String blockPath, String blockId) throws IOException {
+    String filePath = blockPath.substring(0, blockPath.lastIndexOf(File.separator) + 1) + blockId
+        + ".minmaxindex";
+    BufferedWriter brWriter = null;
+    DataOutputStream dataOutStream = null;
+    try {
+      FileFactory.createNewFile(filePath, FileFactory.getFileType(filePath));
+      dataOutStream = FileFactory.getDataOutputStream(filePath, FileFactory.getFileType(filePath));
+      Gson gsonObjectToWrite = new Gson();
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream,
+          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
+      String minmaxIndexData = gsonObjectToWrite.toJson(minMaxIndexBlockDetails);
+      brWriter.write(minmaxIndexData);
+    } catch (IOException ioe) {
+      LOGGER.info("Error in writing minMaxindex file");
+    } finally {
+      if (null != brWriter) {
+        brWriter.flush();
+      }
+      if (null != dataOutStream) {
+        dataOutStream.flush();
+      }
+      CarbonUtil.closeStreams(brWriter, dataOutStream);
+    }
+  }
+
+}
\ No newline at end of file

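The running min/max is maintained with CarbonData's unsigned lexicographic byte[] comparer: a candidate value replaces the minimum when it compares smaller and the maximum when it compares larger. The idiom in isolation (candidate is hypothetical and stands for a row's bytes after the writer strips the 2-byte length prefix via System.arraycopy):

    // ByteUtil.UnsafeComparer.INSTANCE.compareTo does an unsigned byte-wise compare.
    if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMin[1], candidate) > 0) {
      pageLevelMin[1] = candidate;  // smaller value found
    }
    if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMax[1], candidate) < 0) {
      pageLevelMax[1] = candidate;  // larger value found
    }
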
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
new file mode 100644
index 0000000..0596db5
--- /dev/null
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.examples;
+
+import java.io.Serializable;
+
+public class MinMaxIndexBlockDetails implements Serializable {
+  private static final long serialVersionUID = 1206104914911491724L;
+
+  /**
+   * Min values of the columns of one blocklet, bit-packed
+   */
+  private byte[][] minValues;
+
+  /**
+   * Max values of the columns of one blocklet, bit-packed
+   */
+  private byte[][] maxValues;
+
+  /**
+   * filePath pointing to the block.
+   */
+  private String filePath;
+
+  /**
+   * BlockletID of the block.
+   */
+  private Integer BlockletId;
+
+
+  public byte[][] getMinValues() {
+    return minValues;
+  }
+
+  public void setMinValues(byte[][] minValues) {
+    this.minValues = minValues;
+  }
+
+  public byte[][] getMaxValues() {
+    return maxValues;
+  }
+
+  public void setMaxValues(byte[][] maxValues) {
+    this.maxValues = maxValues;
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  public Integer getBlockletId() {
+    return BlockletId;
+  }
+
+  public void setBlockletId(Integer blockletId) {
+    BlockletId = blockletId;
+  }
+}
\ No newline at end of file

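These POJOs are what Gson writes to and reads from the .minmaxindex file. The writer and reader sides of that round trip, condensed from writeMinMaxIndexFile above and MinMaxDataMap.readJson earlier in this patch:

    Gson gson = new Gson();
    // Write side: serialize the per-blocklet details list to JSON text.
    String json = gson.toJson(minMaxIndexBlockDetails);
    // Read side: deserialize back into an array for pruning.
    MinMaxIndexBlockDetails[] details =
        gson.fromJson(json, MinMaxIndexBlockDetails[].class);
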
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala b/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
new file mode 100644
index 0000000..0cfe410
--- /dev/null
+++ b/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.examples
+
+import java.io.File
+
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.CarbonProperties
+
+object MinMaxDataMapExample {
+  def main(args: Array[String]): Unit = {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "").getCanonicalPath
+    val storeLocation = s"$rootPath/dataMap/examples/target/store"
+    val warehouse = s"$rootPath/datamap/examples/target/warehouse"
+    val metastoredb = s"$rootPath/datamap/examples/target"
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    import org.apache.spark.sql.CarbonSession._
+
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .appName("CarbonDataMapExample")
+      .config("spark.sql.warehouse.dir", warehouse)
+      .getOrCreateCarbonSession(storeLocation)
+
+    spark.sparkContext.setLogLevel("ERROR")
+    import spark.implicits._
+
+    // register datamap writer
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      AbsoluteTableIdentifier.from(storeLocation, "default", "carbonminmax"),
+      classOf[MinMaxDataMapFactory].getName,
+      MinMaxDataMap.NAME)
+
+    spark.sql("DROP TABLE IF EXISTS carbonminmax")
+
+    val df = spark.sparkContext.parallelize(1 to 33000)
+      .map(x => ("a", "b", x))
+      .toDF("c1", "c2", "c3")
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbonminmax")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    // Query the table.
+    spark.sql("select c2 from carbonminmax").show(20, false)
+    spark.sql("select c2 from carbonminmax where c2 = 'b'").show(20, false)
+    spark.sql("DROP TABLE IF EXISTS carbonminmax")
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 74216ac..553e080 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -191,7 +191,7 @@ object DataMapWriterSuite {
       callbackSeq :+= s"blocklet start $blockletId"
     }
 
-    override def onBlockStart(blockId: String): Unit = {
+    override def onBlockStart(blockId: String, blockPath: String): Unit = {
       callbackSeq :+= s"block start $blockId"
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2399e26..66dc4ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -437,6 +437,7 @@
         <module>format</module>
         <module>integration/spark2</module>
         <module>examples/spark2</module>
+        <module>datamap/examples</module>
         <module>integration/hive</module>
         <module>integration/presto</module>
         <module>examples/flink</module>
@@ -473,6 +474,7 @@
         <module>integration/presto</module>
         <module>streaming</module>
         <module>examples/spark2</module>
+        <module>datamap/examples</module>
       </modules>
       <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 4b0113c..8e350d9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -82,10 +82,10 @@ public class DataMapWriterListener {
     LOG.info("DataMapWriter " + writer + " added");
   }
 
-  public void onBlockStart(String blockId) {
+  public void onBlockStart(String blockId, String blockPath) {
     for (List<DataMapWriter> writers : registry.values()) {
       for (DataMapWriter writer : writers) {
-        writer.onBlockStart(blockId);
+        writer.onBlockStart(blockId, blockPath);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index d1fc17b..7d0a285 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -269,7 +269,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
 
   private void notifyDataMapBlockStart() {
     if (listener != null) {
-      listener.onBlockStart(carbonDataFileName);
+      listener.onBlockStart(carbonDataFileName, constructFactFileFullPath());
     }
   }
 
@@ -280,6 +280,11 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     blockletId = 0;
   }
 
+  private String constructFactFileFullPath() {
+    String factFilePath =
+        this.dataWriterVo.getCarbonDataDirectoryPath() + File.separator + this.carbonDataFileName;
+    return factFilePath;
+  }
   /**
    * Finish writing current file. It will flush stream, copy and rename temp file to final file
    * @param copyInCurrentThread set to false if want to do data copy in a new thread
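
The path handed to the listener is simply the segment's carbondata directory joined with the
file name, as constructFactFileFullPath() above shows. The same construction under made-up
values (both strings are hypothetical):

    import java.io.File
    // Mirrors constructFactFileFullPath(): directory + separator + file name.
    val carbonDataDirectoryPath = "/store/default/t1/Fact/Part0/Segment_0"
    val carbonDataFileName = "part-0-0_batchno0-0-1.carbondata"
    val factFileFullPath = carbonDataDirectoryPath + File.separator + carbonDataFileName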

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cae74a8c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index ddf444d..80d8154 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
 import org.apache.carbondata.processing.store.TablePage;
 import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
 
+
 /**
  * Below class will be used to write the data in V3 format
  * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
@@ -157,6 +158,8 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
     }
   }
 
+
+
   /**
    * Write the collect blocklet data (blockletDataHolder) to file
    */


[2/3] carbondata git commit: [CARBONDATA-1544][Datamap] Datamap FineGrain implementation

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index 78544d3..fe0bbcf 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.datamap.examples;
 
 import java.io.BufferedWriter;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
@@ -29,17 +28,18 @@ import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import com.google.gson.Gson;
 
-public class MinMaxDataWriter implements DataMapWriter {
+public class MinMaxDataWriter extends AbstractDataMapWriter {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(TableInfo.class.getName());
@@ -50,17 +50,23 @@ public class MinMaxDataWriter implements DataMapWriter {
 
   private Map<Integer, BlockletMinMax> blockMinMaxMap;
 
-  private String blockPath;
+  private String dataWritePath;
 
+  public MinMaxDataWriter(AbsoluteTableIdentifier identifier, String segmentId,
+      String dataWritePath) {
+    super(identifier, segmentId, dataWritePath);
+    this.identifier = identifier;
+    this.segmentId = segmentId;
+    this.dataWritePath = dataWritePath;
+  }
 
-  @Override public void onBlockStart(String blockId, String blockPath) {
+  @Override public void onBlockStart(String blockId) {
     pageLevelMax = null;
     pageLevelMin = null;
     blockletLevelMax = null;
     blockletLevelMin = null;
     blockMinMaxMap = null;
     blockMinMaxMap = new HashMap<Integer, BlockletMinMax>();
-    this.blockPath = blockPath;
   }
 
   @Override public void onBlockEnd(String blockId) {
@@ -161,7 +167,7 @@ public class MinMaxDataWriter implements DataMapWriter {
     List<MinMaxIndexBlockDetails> tempMinMaxIndexBlockDetails = null;
     tempMinMaxIndexBlockDetails = loadBlockDetails();
     try {
-      writeMinMaxIndexFile(tempMinMaxIndexBlockDetails, blockPath, blockId);
+      writeMinMaxIndexFile(tempMinMaxIndexBlockDetails, blockId);
     } catch (IOException ex) {
       LOGGER.info(" Unable to write the file");
     }
@@ -178,7 +184,6 @@ public class MinMaxDataWriter implements DataMapWriter {
       tmpminMaxIndexBlockDetails.setMinValues(blockMinMaxMap.get(index).getMin());
       tmpminMaxIndexBlockDetails.setMaxValues(blockMinMaxMap.get(index).getMax());
       tmpminMaxIndexBlockDetails.setBlockletId(index);
-      tmpminMaxIndexBlockDetails.setFilePath(this.blockPath);
       minMaxIndexBlockDetails.add(tmpminMaxIndexBlockDetails);
     }
     return minMaxIndexBlockDetails;
@@ -187,22 +192,19 @@ public class MinMaxDataWriter implements DataMapWriter {
   /**
    * Write the data to a file. This is JSON format file.
    * @param minMaxIndexBlockDetails
-   * @param blockPath
    * @param blockId
    * @throws IOException
    */
   public void writeMinMaxIndexFile(List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails,
-      String blockPath, String blockId) throws IOException {
-    String filePath = blockPath.substring(0, blockPath.lastIndexOf(File.separator) + 1) + blockId
-        + ".minmaxindex";
+      String blockId) throws IOException {
+      String filePath = dataWritePath + "/" + blockId + ".minmaxindex";
     BufferedWriter brWriter = null;
     DataOutputStream dataOutStream = null;
     try {
       FileFactory.createNewFile(filePath, FileFactory.getFileType(filePath));
       dataOutStream = FileFactory.getDataOutputStream(filePath, FileFactory.getFileType(filePath));
       Gson gsonObjectToWrite = new Gson();
-      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream,
-          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream, "UTF-8"));
       String minmaxIndexData = gsonObjectToWrite.toJson(minMaxIndexBlockDetails);
       brWriter.write(minmaxIndexData);
     } catch (IOException ioe) {
@@ -215,7 +217,11 @@ public class MinMaxDataWriter implements DataMapWriter {
         dataOutStream.flush();
       }
       CarbonUtil.closeStreams(brWriter, dataOutStream);
+      commitFile(filePath);
     }
   }
 
+  @Override public void finish() throws IOException {
+
+  }
 }
\ No newline at end of file
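
The writer above ends up with one JSON document per block: the list of per-blocklet min/max
entries, written to "<blockId>.minmaxindex" under the datamap write path and then committed.
A hedged sketch of that serialization with Gson (the MinMax case class is a stand-in for
MinMaxIndexBlockDetails, and the path handling is simplified):

    import java.io.{BufferedWriter, FileOutputStream, OutputStreamWriter}
    import com.google.gson.Gson

    // Stand-in for MinMaxIndexBlockDetails: min/max byte arrays per blocklet.
    case class MinMax(minValues: Array[Array[Byte]],
        maxValues: Array[Array[Byte]], blockletId: Int)

    def writeMinMaxIndex(details: java.util.List[MinMax],
        dataWritePath: String, blockId: String): Unit = {
      val filePath = dataWritePath + "/" + blockId + ".minmaxindex"
      val writer = new BufferedWriter(new OutputStreamWriter(
        new FileOutputStream(filePath), "UTF-8"))
      try {
        // Gson renders the whole list as a single JSON array, as above.
        writer.write(new Gson().toJson(details))
      } finally {
        writer.close()
      }
    }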

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
index 0596db5..93a453e 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
@@ -33,11 +33,6 @@ public class MinMaxIndexBlockDetails implements Serializable {
   private byte[][] maxValues;
 
   /**
-   * filePath pointing to the block.
-   */
-  private String filePath;
-
-  /**
    * BlockletID of the block.
    */
   private Integer BlockletId;
@@ -59,14 +54,6 @@ public class MinMaxIndexBlockDetails implements Serializable {
     this.maxValues = maxValues;
   }
 
-  public String getFilePath() {
-    return filePath;
-  }
-
-  public void setFilePath(String filePath) {
-    this.filePath = filePath;
-  }
-
   public Integer getBlockletId() {
     return BlockletId;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index c1ef14d..7d91806 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -86,6 +86,8 @@ public class CarbonInputSplit extends FileSplit
 
   private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
 
+  private String dataMapWritePath;
+
   public CarbonInputSplit() {
     segmentId = null;
     taskId = "0";
@@ -97,7 +99,8 @@ public class CarbonInputSplit extends FileSplit
   }
 
   private CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length,
-      String[] locations, ColumnarFormatVersion version, String[] deleteDeltaFiles) {
+      String[] locations, ColumnarFormatVersion version, String[] deleteDeltaFiles,
+      String dataMapWritePath) {
     super(path, start, length, locations);
     this.segmentId = segmentId;
     String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
@@ -110,12 +113,13 @@ public class CarbonInputSplit extends FileSplit
     this.invalidSegments = new ArrayList<>();
     this.version = version;
     this.deleteDeltaFiles = deleteDeltaFiles;
+    this.dataMapWritePath = dataMapWritePath;
   }
 
   public CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length,
       String[] locations, int numberOfBlocklets, ColumnarFormatVersion version,
       String[] deleteDeltaFiles) {
-    this(segmentId, blockletId, path, start, length, locations, version, deleteDeltaFiles);
+    this(segmentId, blockletId, path, start, length, locations, version, deleteDeltaFiles, null);
     this.numberOfBlocklets = numberOfBlocklets;
   }
 
@@ -165,9 +169,9 @@ public class CarbonInputSplit extends FileSplit
   }
 
   public static CarbonInputSplit from(String segmentId, String blockletId, FileSplit split,
-      ColumnarFormatVersion version) throws IOException {
+      ColumnarFormatVersion version, String dataMapWritePath) throws IOException {
     return new CarbonInputSplit(segmentId, blockletId, split.getPath(), split.getStart(),
-        split.getLength(), split.getLocations(), version, null);
+        split.getLength(), split.getLocations(), version, null, dataMapWritePath);
   }
 
   public static List<TableBlockInfo> createBlocks(List<CarbonInputSplit> splitList) {
@@ -181,6 +185,7 @@ public class CarbonInputSplit extends FileSplit
                 split.getSegmentId(), split.getLocations(), split.getLength(), blockletInfos,
                 split.getVersion(), split.getDeleteDeltaFiles());
         blockInfo.setDetailInfo(split.getDetailInfo());
+        blockInfo.setDataMapWriterPath(split.dataMapWritePath);
         tableBlockInfoList.add(blockInfo);
       } catch (IOException e) {
         throw new RuntimeException("fail to get location of split: " + split, e);
@@ -230,6 +235,10 @@ public class CarbonInputSplit extends FileSplit
       detailInfo = new BlockletDetailInfo();
       detailInfo.readFields(in);
     }
+    boolean dataMapWriterPathExists = in.readBoolean();
+    if (dataMapWriterPathExists) {
+      dataMapWritePath = in.readUTF();
+    }
   }
 
   @Override public void write(DataOutput out) throws IOException {
@@ -252,6 +261,10 @@ public class CarbonInputSplit extends FileSplit
     if (detailInfo != null) {
       detailInfo.write(out);
     }
+    out.writeBoolean(dataMapWritePath != null);
+    if (dataMapWritePath != null) {
+      out.writeUTF(dataMapWritePath);
+    }
   }
 
   public List<String> getInvalidSegments() {
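
On the wire the new dataMapWritePath is a nullable field, encoded with the usual Hadoop
Writable idiom: a boolean presence flag followed by the UTF string only when non-null. The
same pattern, isolated from CarbonInputSplit (a minimal sketch):

    import java.io.{DataInput, DataOutput}

    // Nullable-string encoding matching write()/readFields() above.
    def writeOptional(out: DataOutput, value: String): Unit = {
      out.writeBoolean(value != null)
      if (value != null) out.writeUTF(value)
    }

    def readOptional(in: DataInput): String =
      if (in.readBoolean()) in.readUTF() else null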

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 24f5713..ad5a1c9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -34,6 +34,7 @@ import java.util.Set;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.DataMapType;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -716,16 +717,17 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     // get tokens for all the required FileSystem for table path
     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
         new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
-
-    TableDataMap blockletMap = DataMapStoreManager.getInstance()
-        .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME,
-            BlockletDataMapFactory.class.getName());
+    boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
+    TableDataMap blockletMap =
+        DataMapStoreManager.getInstance().chooseDataMap(absoluteTableIdentifier);
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
     List<String> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets;
-    if (dataMapJob != null) {
+    if (distributedCG || blockletMap.getDataMapFactory().getDataMapType() == DataMapType.FG) {
       DistributableDataMapFormat datamapDstr =
-          new DistributableDataMapFormat(absoluteTableIdentifier, BlockletDataMap.NAME,
+          new DistributableDataMapFormat(absoluteTableIdentifier, blockletMap.getDataMapName(),
               segmentIds, partitionsToPrune,
               BlockletDataMapFactory.class.getName());
       prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
@@ -778,7 +780,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(),
             blocklet.getBlockletId(), new FileSplit(new Path(blocklet.getPath()), 0,
                 blocklet.getLength(), blocklet.getLocations()),
-            ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()));
+            ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()),
+            blocklet.getDataMapWriterPath());
     split.setDetailInfo(blocklet.getDetailInfo());
     return split;
   }
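
The pruning dispatch above reads as a two-way switch: fine-grain datamaps always go through
the distributable datamap job, and coarse-grain ones do too when USE_DISTRIBUTED_DATAMAP is
enabled; everything else is pruned locally in the driver. A simplified sketch of the
decision (the types are stand-ins, not the real enums):

    sealed trait DataMapType
    case object CG extends DataMapType
    case object FG extends DataMapType

    // FG pruning must be distributed; CG pruning is distributed only on demand.
    def useDistributedPrune(distributedCG: Boolean, mapType: DataMapType): Boolean =
      distributedCG || mapType == FG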

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
new file mode 100644
index 0000000..4b6f231
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datastore.FileReader
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.Blocklet
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
+
+class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
+  var identifier: AbsoluteTableIdentifier = _
+  var dataMapName: String = _
+
+  /**
+   * Initialization of Datamap factory with the identifier and datamap name
+   */
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapName: String): Unit = {
+    this.identifier = identifier
+    this.dataMapName = dataMapName
+  }
+
+  /**
+   * Return a new writer for this datamap
+   */
+  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
+    new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+  }
+
+  /**
+   * Get the datamap for segmentid
+   */
+  override def getDataMaps(segmentId: String): java.util.List[AbstractCoarseGrainDataMap] = {
+    val file = FileFactory.getCarbonFile(
+      CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map {f =>
+      val dataMap: AbstractCoarseGrainDataMap = new CGDataMap()
+      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap
+    }.toList.asJava
+  }
+
+
+  /**
+   * Get datamaps for distributable object.
+   */
+  override def getDataMaps(
+      distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = {
+    val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
+    val dataMap: AbstractCoarseGrainDataMap = new CGDataMap()
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    Seq(dataMap).asJava
+  }
+
+  /**
+   * Fire an event to this datamap factory.
+   *
+   * @param event the event to handle
+   */
+  override def fireEvent(event: Event): Unit = {
+    ???
+  }
+
+  /**
+   * Get all distributable objects of a segmentid
+   *
+   * @return
+   */
+  override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
+    val file = FileFactory.getCarbonFile(
+      CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
+      d
+    }.toList.asJava
+  }
+
+
+  /**
+   * Clears datamap of the segment
+   */
+  override def clear(segmentId: String): Unit = {
+
+  }
+
+  /**
+   * Clear all datamaps from memory
+   */
+  override def clear(): Unit = {
+
+  }
+
+  /**
+   * Return metadata of this datamap
+   */
+  override def getMeta: DataMapMeta = {
+    new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava)
+  }
+}
+
+class CGDataMap extends AbstractCoarseGrainDataMap {
+
+  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _
+  var FileReader: FileReader = _
+  var filePath: String = _
+  val compressor = new SnappyCompressor
+
+  /**
+   * It is called to load the data map to memory or to initialize it.
+   */
+  override def init(dataMapModel: DataMapModel): Unit = {
+    this.filePath = dataMapModel.getFilePath
+    val size = FileFactory.getCarbonFile(filePath).getSize
+    FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+    val footerLen = FileReader.readInt(filePath, size - 4)
+    val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen)
+    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(in)
+    maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]]
+  }
+
+  /**
+   * Prune the datamap with filter expression. It returns the list of
+   * blocklets where these filters can exist.
+   *
+   * @param filterExp
+   * @return
+   */
+  override def prune(
+      filterExp: FilterResolverIntf,
+      segmentProperties: SegmentProperties,
+      partitions: java.util.List[String]): java.util.List[Blocklet] = {
+    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
+    val expression = filterExp.getFilterExpression
+    getEqualToExpression(expression, buffer)
+    val value = buffer.map { f =>
+      f.getChildren.get(1).evaluate(null).getString
+    }
+    val meta = findMeta(value(0).getBytes)
+    meta.map { f =>
+      new Blocklet(f._1, f._2 + "")
+    }.asJava
+  }
+
+
+  private def findMeta(value: Array[Byte]) = {
+    val tuples = maxMin.filter { f =>
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 &&
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0
+    }
+    tuples
+  }
+
+  private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
+    if (expression.getChildren != null) {
+      expression.getChildren.asScala.map { f =>
+        if (f.isInstanceOf[EqualToExpression]) {
+          buffer += f
+        }
+        getEqualToExpression(f, buffer)
+      }
+    }
+  }
+
+  /**
+   * Clear complete index table and release memory.
+   */
+  override def clear() = {
+    ???
+  }
+
+  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+}
+
+class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
+    segmentId: String,
+    dataWritePath: String,
+    dataMapName: String)
+  extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
+
+  var currentBlockId: String = null
+  val cgwritepath = dataWritePath + "/" +
+                    dataMapName + System.nanoTime() + ".datamap"
+  lazy val stream: DataOutputStream = FileFactory
+    .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
+  val blockletList = new ArrayBuffer[Array[Byte]]()
+  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]()
+  val compressor = new SnappyCompressor
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  override def onBlockStart(blockId: String): Unit = {
+    currentBlockId = blockId
+  }
+
+  /**
+   * End of block notification
+   */
+  override def onBlockEnd(blockId: String): Unit = {
+
+  }
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletStart(blockletId: Int): Unit = {
+
+  }
+
+  /**
+   * End of blocklet notification
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletEnd(blockletId: Int): Unit = {
+    val sorted = blockletList
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
+    maxMin +=
+    ((currentBlockId+"", blockletId, (sorted.last, sorted.head)))
+    blockletList.clear()
+  }
+
+  /**
+   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
+   * DataMapMeta returned in DataMapFactory.
+   *
+   * Implementation should copy the content of `pages` as needed, because `pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   */
+  override def onPageAdded(blockletId: Int,
+      pageId: Int,
+      pages: Array[ColumnPage]): Unit = {
+    val size = pages(0).getPageSize
+    val list = new ArrayBuffer[Array[Byte]]()
+    var i = 0
+    while (i < size) {
+      val bytes = pages(0).getBytes(i)
+      val newBytes = new Array[Byte](bytes.length - 2)
+      System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
+      list += newBytes
+      i = i + 1
+    }
+    // Sort based on the column data in order to create index.
+    val sorted = list
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
+    blockletList += sorted.head
+    blockletList += sorted.last
+  }
+
+
+  /**
+   * This is called during closing of the writer. So after this call no more data will be
+   * sent to this class.
+   */
+  override def finish(): Unit = {
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(maxMin)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    stream.writeInt(bytes.length)
+    stream.close()
+    commitFile(cgwritepath)
+  }
+
+
+}
+
+class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/compaction/fil2.csv"
+  override protected def beforeAll(): Unit = {
+    // n should be about 5000000 when the default size of 1024 is used
+    val n = 150000
+    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+  }
+
+  test("test cg datamap") {
+    sql("DROP TABLE IF EXISTS datamap_test_cg")
+    sql(
+      """
+        | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
+    // register datamap writer
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      table.getAbsoluteTableIdentifier,
+      classOf[CGDataMapFactory].getName, "cgdatamap")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
+      sql("select * from normal_test where name='n502670'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test_cg")
+  }
+}
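
The on-disk layout used by CGDataMapWriter.finish() is: the serialized, compressed index as
the payload, with its length appended as a trailing 4-byte int, so that CGDataMap.init()
can read the last 4 bytes for the footer length and then the payload just before it. A
self-contained sketch of that layout over a byte array (plain java.io in place of
FileFactory and SnappyCompressor):

    import java.io.{ByteArrayOutputStream, DataOutputStream}
    import java.nio.ByteBuffer

    // Write: [payload bytes][payload length as a 4-byte int]
    def writeWithFooter(payload: Array[Byte]): Array[Byte] = {
      val bos = new ByteArrayOutputStream()
      val out = new DataOutputStream(bos)
      out.write(payload)
      out.writeInt(payload.length)
      out.close()
      bos.toByteArray
    }

    // Read: the last 4 bytes give the length; the payload sits just before them.
    def readFooter(file: Array[Byte]): Array[Byte] = {
      val size = file.length
      val footerLen = ByteBuffer.wrap(file, size - 4, 4).getInt
      java.util.Arrays.copyOfRange(file, size - footerLen - 4, size - 4)
    }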

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 553e080..f694a6b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -20,25 +20,30 @@ package org.apache.carbondata.spark.testsuite.datamap
 import java.util
 
 import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.schema.FilterType
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
 
-class C2DataMapFactory() extends DataMapFactory {
+class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory {
+
+  var identifier: AbsoluteTableIdentifier = _
 
   override def init(identifier: AbsoluteTableIdentifier,
-      dataMapName: String): Unit = {}
+      dataMapName: String): Unit = {
+    this.identifier = identifier
+  }
 
   override def fireEvent(event: Event): Unit = ???
 
@@ -46,13 +51,14 @@ class C2DataMapFactory() extends DataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): java.util.List[DataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def getDataMaps(segmentId: String): util.List[DataMap] = ???
+  override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def createWriter(segmentId: String): DataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock
+  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter =
+    DataMapWriterSuite.dataMapWriterC2Mock(identifier, segmentId, dataWritePath)
 
-  override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, FilterType.EQUALTO)
+  override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
 
   /**
    * Get all distributable objects of a segmentid
@@ -62,6 +68,7 @@ class C2DataMapFactory() extends DataMapFactory {
   override def toDistributable(segmentId: String): util.List[DataMapDistributable] = {
     ???
   }
+
 }
 
 class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
@@ -164,9 +171,12 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
 }
 
 object DataMapWriterSuite {
+
   var callbackSeq: Seq[String] = Seq[String]()
 
-  val dataMapWriterC2Mock = new DataMapWriter {
+  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String,
+      dataWritePath: String) =
+    new AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
 
     override def onPageAdded(
         blockletId: Int,
@@ -191,9 +201,21 @@ object DataMapWriterSuite {
       callbackSeq :+= s"blocklet start $blockletId"
     }
 
-    override def onBlockStart(blockId: String, blockPath: String): Unit = {
+    /**
+     * Start of new block notification.
+     *
+     * @param blockId file name of the carbondata file
+     */
+    override def onBlockStart(blockId: String) = {
       callbackSeq :+= s"block start $blockId"
     }
 
+    /**
+     * This is called during closing of the writer. So after this call no more data will be
+     * sent to this class.
+     */
+    override def finish() = {
+
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
new file mode 100644
index 0000000..d1bb65f
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainDataMap, AbstractFineGrainDataMapFactory}
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datastore.FileReader
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
+
+class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
+  var identifier: AbsoluteTableIdentifier = _
+  var dataMapName: String = _
+
+  /**
+   * Initialization of Datamap factory with the identifier and datamap name
+   */
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapName: String): Unit = {
+    this.identifier = identifier
+    this.dataMapName = dataMapName
+  }
+
+  /**
+   * Return a new writer for this datamap
+   */
+  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
+    new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+  }
+
+  /**
+   * Get the datamap for segmentid
+   */
+  override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainDataMap] = {
+    val file = FileFactory
+      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val dataMap: AbstractFineGrainDataMap = new FGDataMap()
+      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap
+    }.toList.asJava
+  }
+
+  /**
+   * Get datamap for distributable object.
+   */
+  override def getDataMaps(
+      distributable: DataMapDistributable): java.util.List[AbstractFineGrainDataMap] = {
+    val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
+    val dataMap: AbstractFineGrainDataMap = new FGDataMap()
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    Seq(dataMap).asJava
+  }
+
+  /**
+   * Get all distributable objects of a segmentid
+   *
+   * @return
+   */
+  override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
+    val file = FileFactory
+      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
+      d
+    }.toList.asJava
+  }
+
+
+  /**
+   * Fire an event to this datamap factory.
+   *
+   * @param event the event to handle
+   */
+  override def fireEvent(event: Event): Unit = {
+    ???
+  }
+
+  /**
+   * Clears datamap of the segment
+   */
+  override def clear(segmentId: String): Unit = {
+  }
+
+  /**
+   * Clear all datamaps from memory
+   */
+  override def clear(): Unit = {
+  }
+
+  /**
+   * Return metadata of this datamap
+   */
+  override def getMeta: DataMapMeta = {
+    new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava)
+  }
+}
+
+class FGDataMap extends AbstractFineGrainDataMap {
+
+  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _
+  var FileReader: FileReader = _
+  var filePath: String = _
+  val compressor = new SnappyCompressor
+
+  /**
+   * It is called to load the data map to memory or to initialize it.
+   */
+  override def init(dataMapModel: DataMapModel): Unit = {
+    this.filePath = dataMapModel.getFilePath
+    val size = FileFactory.getCarbonFile(filePath).getSize
+    FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+    val footerLen = FileReader.readInt(filePath, size - 4)
+    val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen)
+    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(in)
+    maxMin = obj.readObject()
+      .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]]
+  }
+
+  /**
+   * Prune the datamap with filter expression. It returns the list of
+   * blocklets where these filters can exist.
+   *
+   * @param filterExp
+   * @return
+   */
+  override def prune(
+      filterExp: FilterResolverIntf,
+      segmentProperties: SegmentProperties,
+      partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = {
+    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
+    val expression = filterExp.getFilterExpression
+    getEqualToExpression(expression, buffer)
+    val value = buffer.map { f =>
+      f.getChildren.get(1).evaluate(null).getString
+    }
+    val meta = findMeta(value(0).getBytes)
+    meta.map { f =>
+      readAndFindData(f, value(0).getBytes())
+    }.filter(_.isDefined).map(_.get).asJava
+  }
+
+  private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
+      value: Array[Byte]): Option[FineGrainBlocklet] = {
+    val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
+    val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(outputStream)
+    val blockletsData = obj.readObject()
+      .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]]
+
+    import scala.collection.Searching._
+    val searching = blockletsData
+      .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), Seq(0)))(new Ordering[
+      (Array[Byte], Seq[Seq[Int]], Seq[Int])] {
+      override def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]),
+          y: (Array[Byte], Seq[Seq[Int]], Seq[Int])) = {
+        ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1)
+      }
+    })
+    if (searching.insertionPoint >= 0) {
+      val f = blockletsData(searching.insertionPoint)
+      val pages = f._3.zipWithIndex.map { p =>
+        val pg = new FineGrainBlocklet.Page
+        pg.setPageId(p._1)
+        pg.setRowId(f._2(p._2).toArray)
+        pg
+      }
+      Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
+    } else {
+      None
+    }
+
+  }
+
+  private def findMeta(value: Array[Byte]) = {
+    val tuples = maxMin.filter { f =>
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 &&
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0
+    }
+    tuples
+  }
+
+  def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
+    if (expression.getChildren != null) {
+      expression.getChildren.asScala.map { f =>
+        if (f.isInstanceOf[EqualToExpression]) {
+          buffer += f
+        }
+        getEqualToExpression(f, buffer)
+      }
+    }
+  }
+
+  /**
+   * Clear complete index table and release memory.
+   */
+  override def clear(): Unit = {
+    ???
+  }
+
+  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+}
+
+class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
+    segmentId: String, dataWriterPath: String, dataMapName: String)
+  extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) {
+
+  var currentBlockId: String = null
+  val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap"
+  val stream: DataOutputStream = FileFactory
+    .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
+  val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
+  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]()
+  var position: Long = 0
+  val compressor = new SnappyCompressor
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  override def onBlockStart(blockId: String): Unit = {
+    currentBlockId = blockId
+  }
+
+  /**
+   * End of block notification
+   */
+  override def onBlockEnd(blockId: String): Unit = {
+
+  }
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletStart(blockletId: Int): Unit = {
+
+  }
+
+  /**
+   * End of blocklet notification
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletEnd(blockletId: Int): Unit = {
+    val sorted = blockletList
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
+    var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null
+    var addedLast: Boolean = false
+    val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]()
+    // Merge all same column values to single row.
+    sorted.foreach { f =>
+      if (oldValue != null) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
+          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ f._3)
+          addedLast = false
+        } else {
+          blockletListUpdated += oldValue
+          oldValue = (f._1, Seq(f._2), f._3)
+          addedLast = true
+        }
+      } else {
+        oldValue = (f._1, Seq(f._2), f._3)
+        addedLast = false
+      }
+    }
+    if (!addedLast && oldValue != null) {
+      blockletListUpdated += oldValue
+    }
+
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(blockletListUpdated)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    maxMin +=
+    ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last
+      ._1), position, bytes.length))
+    position += bytes.length
+    blockletList.clear()
+  }
+
+  /**
+   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
+   * DataMapMeta returned in DataMapFactory.
+   *
+   * Implementation should copy the content of `pages` as needed, because `pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   */
+  override def onPageAdded(blockletId: Int,
+      pageId: Int,
+      pages: Array[ColumnPage]): Unit = {
+    val size = pages(0).getPageSize
+    val list = new ArrayBuffer[(Array[Byte], Int)]()
+    var i = 0
+    while (i < size) {
+      val bytes = pages(0).getBytes(i)
+      val newBytes = new Array[Byte](bytes.length - 2)
+      System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
+      list += ((newBytes, i))
+      i = i + 1
+    }
+    // Sort based on the column data in order to create index.
+    val sorted = list
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
+    var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null
+    var addedLast: Boolean = false
+    // Merge all same column values to single row.
+    sorted.foreach { f =>
+      if (oldValue != null) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
+          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3)
+          addedLast = false
+        } else {
+          blockletList += oldValue
+          oldValue = (f._1, Seq(f._2), Seq(pageId))
+          addedLast = true
+        }
+      } else {
+        oldValue = (f._1, Seq(f._2), Seq(pageId))
+        addedLast = false
+      }
+    }
+    if (!addedLast && oldValue != null) {
+      blockletList += oldValue
+    }
+  }
+
+
+  /**
+   * This is called during closing of the writer. So after this call no more data will be
+   * sent to this class.
+   */
+  override def finish(): Unit = {
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(maxMin)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    stream.writeInt(bytes.length)
+    stream.close()
+    commitFile(fgwritepath)
+  }
+
+
+}
+
+class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/compaction/fil2.csv"
+
+  override protected def beforeAll(): Unit = {
+    // n should be about 5000000 when the default size of 1024 is used
+    val n = 150000
+    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+  }
+
+  test("test fg datamap") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      table.getAbsoluteTableIdentifier,
+      classOf[FGDataMapFactory].getName, "fgdatamap")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670'"),
+      sql("select * from normal_test where name='n502670'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test")
+  }
+}
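
The part that makes this datamap fine-grain is the merge in onPageAdded/onBlockletEnd: a
sorted run of (value, rowId) pairs is folded so that equal values share one entry whose row
id list accumulates, giving value -> row positions rather than value -> blocklet. A
standalone sketch of that fold (simplified to String keys):

    // Fold a sorted (value, rowId) sequence so equal values merge into one
    // entry, mirroring the merge loops in FGDataMapWriter.
    def mergeSorted(sorted: Seq[(String, Int)]): Seq[(String, Seq[Int])] =
      sorted.foldLeft(Vector.empty[(String, Seq[Int])]) { (acc, pair) =>
        val (value, rowId) = pair
        acc.lastOption match {
          case Some((v, ids)) if v == value =>
            // same column value as the previous entry: extend its row id list
            acc.init :+ ((v, ids :+ rowId))
          case _ =>
            acc :+ ((value, Seq(rowId)))
        }
      }

    // mergeSorted(Seq(("a", 0), ("a", 3), ("b", 1)))
    //   == Vector(("a", Seq(0, 3)), ("b", Seq(1)))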

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
index 25bdf7b..c200b1b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
@@ -28,14 +28,14 @@ import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.schema.FilterType
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory
 
 class InsertOverwriteConcurrentTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
   private val executorService: ExecutorService = Executors.newFixedThreadPool(10)
@@ -164,7 +164,7 @@ object Global {
   var overwriteRunning = false
 }
 
-class WaitingDataMap() extends DataMapFactory {
+class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
 
   override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { }
 
@@ -174,12 +174,12 @@ class WaitingDataMap() extends DataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): java.util.List[DataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def getDataMaps(segmentId: String): util.List[DataMap] = ???
+  override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def createWriter(segmentId: String): DataMapWriter = {
-    new DataMapWriter {
+  override def createWriter(segmentId: String, writerPath: String): AbstractDataMapWriter = {
+    new AbstractDataMapWriter(null, segmentId, writerPath) {
       override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { }
 
       override def onBlockletEnd(blockletId: Int): Unit = { }
@@ -195,10 +195,14 @@ class WaitingDataMap() extends DataMapFactory {
         // wait for 1 second to let second SQL to finish
         Thread.sleep(1000)
       }
+
+      override def finish(): Unit = {
+
+      }
     }
   }
 
-  override def getMeta: DataMapMeta = new DataMapMeta(List("o_country").asJava, FilterType.EQUALTO)
+  override def getMeta: DataMapMeta = new DataMapMeta(List("o_country").asJava, Seq(ExpressionType.EQUALS).asJava)
 
   override def toDistributable(segmentId: String): util.List[DataMapDistributable] = ???
 }
\ No newline at end of file
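
WaitingDataMap turns the writer callback into a synchronization point: onBlockletStart
flips a global flag and then sleeps, so the test can fire the conflicting insert overwrite
while the load is demonstrably mid-flight. The same trick in isolation (a minimal sketch;
the flag and object names here are illustrative):

    // A datamap callback used as a barrier for a concurrency test: another
    // thread polls `loadStarted` to schedule the conflicting statement.
    object Global {
      @volatile var loadStarted = false
    }

    def onBlockletStart(blockletId: Int): Unit = {
      Global.loadStarted = true
      // keep the load inside the critical window for a moment
      Thread.sleep(1000)
    }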

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 084a748..1b68458 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -438,9 +438,10 @@ class CarbonScanRDD(
     CarbonTableInputFormat.setQuerySegment(conf, identifier)
     CarbonTableInputFormat.setFilterPredicates(conf, filterExpression)
     CarbonTableInputFormat.setColumnProjection(conf, columnProjection)
-    if (CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-        CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
+    CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+    if (CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
       CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index f5a90de..2552ca8 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -778,8 +778,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
             .start()
           qry.awaitTermination()
         } catch {
-          case ex =>
-            throw new Exception(ex.getMessage)
+          case ex: Throwable =>
+            LOGGER.error(ex.getMessage)
+            throw new Exception(ex.getMessage, ex)
         } finally {
           if (null != qry) {
             qry.stop()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 8e350d9..31a6701 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.processing.datamap;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -28,8 +29,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.processing.store.TablePage;
@@ -43,25 +44,26 @@ public class DataMapWriterListener {
       DataMapWriterListener.class.getCanonicalName());
 
   // list indexed column name -> list of data map writer
-  private Map<List<String>, List<DataMapWriter>> registry = new ConcurrentHashMap<>();
+  private Map<List<String>, List<AbstractDataMapWriter>> registry = new ConcurrentHashMap<>();
 
   /**
    * register all datamap writer for specified table and segment
    */
-  public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId) {
+  public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId,
+      String dataWritePath) {
     List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(identifier);
     if (tableDataMaps != null) {
       for (TableDataMap tableDataMap : tableDataMaps) {
         DataMapFactory factory = tableDataMap.getDataMapFactory();
-        register(factory, segmentId);
+        register(factory, segmentId, dataWritePath);
       }
     }
   }
 
   /**
-   * Register a DataMapWriter
+   * Register an AbstractDataMapWriter
    */
-  private void register(DataMapFactory factory, String segmentId) {
+  private void register(DataMapFactory factory, String segmentId, String dataWritePath) {
     assert (factory != null);
     assert (segmentId != null);
     DataMapMeta meta = factory.getMeta();
@@ -70,8 +72,8 @@ public class DataMapWriterListener {
       return;
     }
     List<String> columns = factory.getMeta().getIndexedColumns();
-    List<DataMapWriter> writers = registry.get(columns);
-    DataMapWriter writer = factory.createWriter(segmentId);
+    List<AbstractDataMapWriter> writers = registry.get(columns);
+    AbstractDataMapWriter writer = factory.createWriter(segmentId, dataWritePath);
     if (writers != null) {
       writers.add(writer);
     } else {
@@ -79,36 +81,36 @@ public class DataMapWriterListener {
       writers.add(writer);
       registry.put(columns, writers);
     }
-    LOG.info("DataMapWriter " + writer + " added");
+    LOG.info("AbstractDataMapWriter " + writer + " added");
   }
 
   public void onBlockStart(String blockId, String blockPath) {
-    for (List<DataMapWriter> writers : registry.values()) {
-      for (DataMapWriter writer : writers) {
-        writer.onBlockStart(blockId, blockPath);
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
+        writer.onBlockStart(blockId);
       }
     }
   }
 
   public void onBlockEnd(String blockId) {
-    for (List<DataMapWriter> writers : registry.values()) {
-      for (DataMapWriter writer : writers) {
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
         writer.onBlockEnd(blockId);
       }
     }
   }
 
   public void onBlockletStart(int blockletId) {
-    for (List<DataMapWriter> writers : registry.values()) {
-      for (DataMapWriter writer : writers) {
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
         writer.onBlockletStart(blockletId);
       }
     }
   }
 
   public void onBlockletEnd(int blockletId) {
-    for (List<DataMapWriter> writers : registry.values()) {
-      for (DataMapWriter writer : writers) {
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
         writer.onBlockletEnd(blockletId);
       }
     }
@@ -121,18 +123,29 @@ public class DataMapWriterListener {
    * @param tablePage  page data
    */
   public void onPageAdded(int blockletId, int pageId, TablePage tablePage) {
-    Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet();
-    for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) {
+    Set<Map.Entry<List<String>, List<AbstractDataMapWriter>>> entries = registry.entrySet();
+    for (Map.Entry<List<String>, List<AbstractDataMapWriter>> entry : entries) {
       List<String> indexedColumns = entry.getKey();
       ColumnPage[] pages = new ColumnPage[indexedColumns.size()];
       for (int i = 0; i < indexedColumns.size(); i++) {
         pages[i] = tablePage.getColumnPage(indexedColumns.get(i));
       }
-      List<DataMapWriter> writers = entry.getValue();
-      for (DataMapWriter writer : writers) {
+      List<AbstractDataMapWriter> writers = entry.getValue();
+      for (AbstractDataMapWriter writer : writers) {
         writer.onPageAdded(blockletId, pageId, pages);
       }
     }
   }
 
+  /**
+   * Finish all datamap writers
+   */
+  public void finish() throws IOException {
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
+        writer.finish();
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index bc87823..0e9cbc5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -259,7 +260,8 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
 
     DataMapWriterListener listener = new DataMapWriterListener();
-    listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId());
+    listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId(),
+        storeLocation[new Random().nextInt(storeLocation.length)]);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
 
@@ -323,6 +325,12 @@ public class CarbonFactDataHandlerModel {
         segmentProperties.getDimensions(),
         segmentProperties.getMeasures());
 
+    DataMapWriterListener listener = new DataMapWriterListener();
+    listener.registerAllWriter(
+        loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(),
+        loadModel.getSegmentId(),
+        tempStoreLocation[new Random().nextInt(tempStoreLocation.length)]);
+    carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     return carbonFactDataHandlerModel;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 7d0a285..f229c7a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.processing.store.writer;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
@@ -41,14 +39,11 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonMergerUtil;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -63,7 +58,6 @@ import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
 
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.io.IOUtils;
 
 public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
 
@@ -71,12 +65,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
       LogServiceFactory.getLogService(AbstractFactDataWriter.class.getName());
 
   /**
-   * dfs.bytes-per-checksum
-   * HDFS checksum length, block size for a file should be exactly divisible
-   * by this value
-   */
-  private static final int HDFS_CHECKSUM_LENGTH = 512;
-  /**
    * file channel
    */
   protected FileChannel fileChannel;
@@ -208,35 +196,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   }
 
   /**
-   * This method will return max of block size and file size
-   *
-   * @param blockSize
-   * @param fileSize
-   * @return
-   */
-  private static long getMaxOfBlockAndFileSize(long blockSize, long fileSize) {
-    long maxSize = blockSize;
-    if (fileSize > blockSize) {
-      maxSize = fileSize;
-    }
-    // block size should be exactly divisible by 512 which is  maintained by HDFS as bytes
-    // per checksum, dfs.bytes-per-checksum=512 must divide block size
-    long remainder = maxSize % HDFS_CHECKSUM_LENGTH;
-    if (remainder > 0) {
-      maxSize = maxSize + HDFS_CHECKSUM_LENGTH - remainder;
-    }
-    // convert to make block size more readable.
-    String readableBlockSize = ByteUtil.convertByteToReadable(blockSize);
-    String readableFileSize = ByteUtil.convertByteToReadable(fileSize);
-    String readableMaxSize = ByteUtil.convertByteToReadable(maxSize);
-    LOGGER.info(
-        "The configured block size is " + readableBlockSize + ", the actual carbon file size is "
-            + readableFileSize + ", choose the max value " + readableMaxSize
-            + " as the block size on HDFS");
-    return maxSize;
-  }
-
-  /**
    * This method will be used to update the file channel with new file if exceeding block size
    * threshold, new file will be created once existing file reached the file size limit This
    * method will first check whether existing file size is exceeded the file
@@ -282,7 +241,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
 
   private String constructFactFileFullPath() {
     String factFilePath =
-        this.dataWriterVo.getCarbonDataDirectoryPath() + File.separator + this.carbonDataFileName;
+        this.model.getCarbonDataDirectoryPath() + File.separator + this.carbonDataFileName;
     return factFilePath;
   }
   /**
@@ -293,7 +252,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     notifyDataMapBlockEnd();
     CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
     if (copyInCurrentThread) {
-      copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath);
+      CarbonUtil.copyCarbonDataFileToCarbonStorePath(
+          carbonDataFileTempPath, model.getCarbonDataDirectoryPath(),
+          fileSizeInBytes);
     } else {
       executorServiceSubmitList.add(executorService.submit(new CopyThread(carbonDataFileTempPath)));
     }
@@ -445,7 +406,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     }
     writer.close();
     // copy from temp to actual store location
-    copyCarbonDataFileToCarbonStorePath(fileName);
+    CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName,
+            model.getCarbonDataDirectoryPath(),
+            fileSizeInBytes);
   }
 
   /**
@@ -455,80 +418,20 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    * @throws CarbonDataWriterException
    */
   protected void closeExecutorService() throws CarbonDataWriterException {
-    executorService.shutdown();
     try {
+      listener.finish();
+      executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.HOURS);
-    } catch (InterruptedException e) {
-      throw new CarbonDataWriterException(e.getMessage());
-    }
-    for (int i = 0; i < executorServiceSubmitList.size(); i++) {
-      try {
+      for (int i = 0; i < executorServiceSubmitList.size(); i++) {
         executorServiceSubmitList.get(i).get();
-      } catch (InterruptedException e) {
-        throw new CarbonDataWriterException(e.getMessage());
-      } catch (ExecutionException e) {
-        throw new CarbonDataWriterException(e.getMessage());
       }
+    } catch (InterruptedException | ExecutionException | IOException e) {
+      LOGGER.error(e, "Error while finishing writer");
+      throw new CarbonDataWriterException(e.getMessage());
     }
   }
 
 
-  /**
-   * This method will copy the given file to carbon store location
-   *
-   * @param localFileName local file name with full path
-   * @throws CarbonDataWriterException
-   */
-  protected void copyCarbonDataFileToCarbonStorePath(String localFileName)
-      throws CarbonDataWriterException {
-    long copyStartTime = System.currentTimeMillis();
-    LOGGER.info("Copying " + localFileName + " --> " + model.getCarbonDataDirectoryPath());
-    try {
-      CarbonFile localCarbonFile =
-          FileFactory.getCarbonFile(localFileName, FileFactory.getFileType(localFileName));
-      String carbonFilePath = model.getCarbonDataDirectoryPath() + localFileName
-          .substring(localFileName.lastIndexOf(File.separator));
-      copyLocalFileToCarbonStore(carbonFilePath, localFileName,
-          CarbonCommonConstants.BYTEBUFFER_SIZE,
-          getMaxOfBlockAndFileSize(fileSizeInBytes, localCarbonFile.getSize()));
-    } catch (IOException e) {
-      throw new CarbonDataWriterException(
-          "Problem while copying file from local store to carbon store", e);
-    }
-    LOGGER.info(
-        "Total copy time (ms) to copy file " + localFileName + " is " + (System.currentTimeMillis()
-            - copyStartTime));
-  }
-
-  /**
-   * This method will read the local carbon data file and write to carbon data file in HDFS
-   *
-   * @param carbonStoreFilePath
-   * @param localFilePath
-   * @param bufferSize
-   * @param blockSize
-   * @throws IOException
-   */
-  private void copyLocalFileToCarbonStore(String carbonStoreFilePath, String localFilePath,
-      int bufferSize, long blockSize) throws IOException {
-    DataOutputStream dataOutputStream = null;
-    DataInputStream dataInputStream = null;
-    try {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("HDFS file block size for file: " + carbonStoreFilePath + " is " + blockSize
-            + " (bytes");
-      }
-      dataOutputStream = FileFactory
-          .getDataOutputStream(carbonStoreFilePath, FileFactory.getFileType(carbonStoreFilePath),
-              bufferSize, blockSize);
-      dataInputStream = FileFactory
-          .getDataInputStream(localFilePath, FileFactory.getFileType(localFilePath), bufferSize);
-      IOUtils.copyBytes(dataInputStream, dataOutputStream, bufferSize);
-    } finally {
-      CarbonUtil.closeStream(dataInputStream);
-      CarbonUtil.closeStream(dataOutputStream);
-    }
-  }
 
   /**
    * This method will copy the carbon data file from local store location to
@@ -553,7 +456,10 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
      * @throws Exception if unable to compute a result
      */
     @Override public Void call() throws Exception {
-      copyCarbonDataFileToCarbonStorePath(fileName);
+      CarbonUtil.copyCarbonDataFileToCarbonStorePath(
+          fileName,
+          model.getCarbonDataDirectoryPath(),
+          fileSizeInBytes);
       return null;
     }
 


[3/3] carbondata git commit: [CARBONDATA-1544][Datamap] Datamap FineGrain implementation

Posted by ja...@apache.org.
[CARBONDATA-1544][Datamap] Datamap FineGrain implementation

Implemented the interfaces for the FG datamap and integrated them into the filter scanner so it uses the pruned bitset from the FG datamap.
The FG query flow is as follows:
1. The user can add an FG datamap to any table and implement its interfaces.
2. Any filter query which hits a table with a datamap will call the prune method of the FG datamap.
3. The prune method of the FG datamap returns a list of FineGrainBlocklet; these blocklets contain the information of block, blocklet, page and row ids as well.
4. The pruned blocklets are internally written to file, and only the block, blocklet and file path information is returned as part of the splits.
5. Based on the splits, CarbonScanRDD schedules the tasks.
6. In the filter scanner we check the datamap writer path from the split, read the bitset if it exists, and pass this bitset as input to the scan.

This closes #1471
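
For orientation, here is a rough sketch (not part of this patch) of the extension point
described in step 1. MyFGDataMapFactory is a hypothetical name, and the class is left
abstract so the remaining DataMapFactory methods can be omitted here:

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
import org.apache.carbondata.core.datamap.dev.fgdatamap.AbstractFineGrainDataMap;
import org.apache.carbondata.core.datamap.dev.fgdatamap.AbstractFineGrainDataMapFactory;

// Hypothetical user datamap factory: getDataMapType() is inherited from
// AbstractFineGrainDataMapFactory and reports FG, so the pruned FineGrainBlocklets
// will be serialized to the datamap writer path (step 4 above).
public abstract class MyFGDataMapFactory extends AbstractFineGrainDataMapFactory {

  @Override
  public AbstractDataMapWriter createWriter(String segmentId, String writeDirectoryPath) {
    // Return a writer that builds per-segment index files under writeDirectoryPath.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public List<AbstractFineGrainDataMap> getDataMaps(String segmentId) throws IOException {
    // Load the index for this segment; prune() on these maps returns FineGrainBlocklets.
    throw new UnsupportedOperationException("sketch only");
  }
}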


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e972fd3d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e972fd3d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e972fd3d

Branch: refs/heads/carbonstore
Commit: e972fd3d5cc8f392d47ca111b2d8f262edb29ac6
Parents: cae74a8
Author: ravipesala <ra...@gmail.com>
Authored: Wed Nov 15 19:48:40 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon Feb 5 19:28:52 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datamap/DataMapMeta.java    |   8 +-
 .../core/datamap/DataMapStoreManager.java       |  30 +-
 .../carbondata/core/datamap/DataMapType.java    |  21 +
 .../carbondata/core/datamap/TableDataMap.java   |  30 +-
 .../core/datamap/dev/AbstractDataMapWriter.java | 110 +++++
 .../core/datamap/dev/BlockletSerializer.java    |  57 +++
 .../carbondata/core/datamap/dev/DataMap.java    |   4 +-
 .../core/datamap/dev/DataMapFactory.java        |  14 +-
 .../core/datamap/dev/DataMapWriter.java         |  57 ---
 .../cgdatamap/AbstractCoarseGrainDataMap.java   |  24 +
 .../AbstractCoarseGrainDataMapFactory.java      |  34 ++
 .../dev/fgdatamap/AbstractFineGrainDataMap.java |  24 +
 .../AbstractFineGrainDataMapFactory.java        |  38 ++
 .../carbondata/core/datastore/DataRefNode.java  |   6 +
 .../core/datastore/block/TableBlockInfo.java    |  10 +
 .../impl/btree/AbstractBTreeLeafNode.java       |   5 +
 .../datastore/impl/btree/BTreeNonLeafNode.java  |   5 +
 .../carbondata/core/indexstore/Blocklet.java    |  30 +-
 .../indexstore/BlockletDataMapIndexStore.java   |   6 -
 .../core/indexstore/BlockletDetailsFetcher.java |   8 +
 .../core/indexstore/ExtendedBlocklet.java       |  17 +
 .../core/indexstore/FineGrainBlocklet.java      | 120 +++++
 .../blockletindex/BlockletDataMap.java          |  15 +-
 .../blockletindex/BlockletDataMapFactory.java   |  63 ++-
 .../blockletindex/BlockletDataRefNode.java      |  27 +-
 .../indexstore/blockletindex/IndexWrapper.java  |  18 +
 .../core/indexstore/schema/FilterType.java      |  24 -
 .../executer/ExcludeFilterExecuterImpl.java     |   3 +
 .../executer/IncludeFilterExecuterImpl.java     |   3 +
 .../scanner/impl/BlockletFilterScanner.java     |   2 +
 .../apache/carbondata/core/util/CarbonUtil.java |  97 ++++
 .../datamap/examples/MinMaxDataMap.java         |  32 +-
 .../datamap/examples/MinMaxDataMapFactory.java  |  49 ++-
 .../datamap/examples/MinMaxDataWriter.java      |  36 +-
 .../examples/MinMaxIndexBlockDetails.java       |  13 -
 .../carbondata/hadoop/CarbonInputSplit.java     |  21 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  17 +-
 .../testsuite/datamap/CGDataMapTestCase.scala   | 361 +++++++++++++++
 .../testsuite/datamap/DataMapWriterSuite.scala  |  46 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   | 440 +++++++++++++++++++
 .../iud/InsertOverwriteConcurrentTest.scala     |  22 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   7 +-
 .../TestStreamingTableOperation.scala           |   5 +-
 .../datamap/DataMapWriterListener.java          |  57 ++-
 .../store/CarbonFactDataHandlerModel.java       |  10 +-
 .../store/writer/AbstractFactDataWriter.java    | 128 +-----
 46 files changed, 1763 insertions(+), 391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
index 7746acf..dd15ccb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
@@ -19,15 +19,15 @@ package org.apache.carbondata.core.datamap;
 
 import java.util.List;
 
-import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 
 public class DataMapMeta {
 
   private List<String> indexedColumns;
 
-  private FilterType optimizedOperation;
+  private List<ExpressionType> optimizedOperation;
 
-  public DataMapMeta(List<String> indexedColumns, FilterType optimizedOperation) {
+  public DataMapMeta(List<String> indexedColumns, List<ExpressionType> optimizedOperation) {
     this.indexedColumns = indexedColumns;
     this.optimizedOperation = optimizedOperation;
   }
@@ -36,7 +36,7 @@ public class DataMapMeta {
     return indexedColumns;
   }
 
-  public FilterType getOptimizedOperation() {
+  public List<ExpressionType> getOptimizedOperation() {
     return optimizedOperation;
   }
 }
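
A minimal usage sketch of the new constructor signature (mirroring the DataMapWriterSuite
change elsewhere in this patch, where FilterType.EQUALTO becomes Seq(ExpressionType.EQUALS));
DataMapMetaExample and the column name are illustrative only:

import java.util.Arrays;

import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.scan.filter.intf.ExpressionType;

public class DataMapMetaExample {
  public static void main(String[] args) {
    // One indexed column, optimized for equality filters.
    DataMapMeta meta = new DataMapMeta(
        Arrays.asList("o_country"),
        Arrays.asList(ExpressionType.EQUALS));
    System.out.println(meta.getIndexedColumns() + " -> " + meta.getOptimizedOperation());
  }
}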

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 90e5fff..8d80b4d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -56,7 +56,22 @@ public final class DataMapStoreManager {
   }
 
   public List<TableDataMap> getAllDataMap(AbsoluteTableIdentifier identifier) {
-    return allDataMaps.get(identifier.uniqueName());
+    return allDataMaps.get(identifier.getCarbonTableIdentifier().getTableUniqueName());
+  }
+
+  // TODO: this is a temporary method until the datamap chooser is implemented
+  public TableDataMap chooseDataMap(AbsoluteTableIdentifier identifier) {
+    List<TableDataMap> tableDataMaps = getAllDataMap(identifier);
+    if (tableDataMaps != null && tableDataMaps.size() > 0) {
+      for (TableDataMap dataMap: tableDataMaps) {
+        if (!dataMap.getDataMapName().equalsIgnoreCase(BlockletDataMap.NAME)) {
+          return dataMap;
+        }
+      }
+      return tableDataMaps.get(0);
+    } else {
+      return getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
+    }
   }
 
   /**
@@ -68,7 +83,7 @@ public final class DataMapStoreManager {
    */
   public TableDataMap getDataMap(AbsoluteTableIdentifier identifier,
       String dataMapName, String factoryClass) {
-    String table = identifier.uniqueName();
+    String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
     List<TableDataMap> tableDataMaps = allDataMaps.get(table);
     TableDataMap dataMap;
     if (tableDataMaps == null) {
@@ -96,7 +111,7 @@ public final class DataMapStoreManager {
    */
   public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
       String factoryClassName, String dataMapName) {
-    String table = identifier.uniqueName();
+    String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
     // Just update the segmentRefreshMap with the table if not added.
     getTableSegmentRefresher(identifier);
     List<TableDataMap> tableDataMaps = allDataMaps.get(table);
@@ -149,7 +164,9 @@ public final class DataMapStoreManager {
    * @param identifier Table identifier
    */
   public void clearDataMaps(AbsoluteTableIdentifier identifier) {
-    List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName());
+    String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
+    List<TableDataMap> tableDataMaps =
+        allDataMaps.get(tableUniqueName);
     segmentRefreshMap.remove(identifier.uniqueName());
     if (tableDataMaps != null) {
       for (TableDataMap tableDataMap: tableDataMaps) {
@@ -158,7 +175,7 @@ public final class DataMapStoreManager {
           break;
         }
       }
-      allDataMaps.remove(identifier.uniqueName());
+      allDataMaps.remove(tableUniqueName);
     }
   }
 
@@ -167,7 +184,8 @@ public final class DataMapStoreManager {
    * @param identifier Table identifier
    */
   public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
-    List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName());
+    List<TableDataMap> tableDataMaps =
+        allDataMaps.get(identifier.getCarbonTableIdentifier().getTableUniqueName());
     if (tableDataMaps != null) {
       int i = 0;
       for (TableDataMap tableDataMap: tableDataMaps) {
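
A hedged usage sketch of the temporary chooser above; ChooseDataMapExample is a hypothetical
name and `identifier` is assumed to refer to an existing table:

import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

public class ChooseDataMapExample {
  static TableDataMap choose(AbsoluteTableIdentifier identifier) {
    // Prefers any registered non-default datamap; otherwise falls back to
    // the default BlockletDataMap for the table.
    return DataMapStoreManager.getInstance().chooseDataMap(identifier);
  }
}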

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java
new file mode 100644
index 0000000..bf812b3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+public enum DataMapType {
+  CG, FG;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 1c80703..42fc702 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -21,12 +21,15 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.dev.BlockletSerializer;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -76,10 +79,15 @@ public final class TableDataMap extends OperationEventListener {
     SegmentProperties segmentProperties;
     for (String segmentId : segmentIds) {
       List<Blocklet> pruneBlocklets = new ArrayList<>();
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
-      segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segmentId);
-      for (DataMap dataMap : dataMaps) {
-        pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
+      // if filter is not passed then return all the blocklets
+      if (filterExp == null) {
+        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segmentId, partitions);
+      } else {
+        List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+        segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segmentId);
+        for (DataMap dataMap : dataMaps) {
+          pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
+        }
       }
       blocklets.addAll(addSegmentId(blockletDetailsFetcher
           .getExtendedBlocklets(pruneBlocklets, segmentId), segmentId));
@@ -137,9 +145,21 @@ public final class TableDataMap extends OperationEventListener {
               segmentPropertiesFetcher.getSegmentProperties(distributable.getSegmentId()),
               partitions));
     }
-    for (Blocklet blocklet: blocklets) {
+    BlockletSerializer serializer = new BlockletSerializer();
+    String writePath =
+        identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + dataMapName;
+    if (dataMapFactory.getDataMapType() == DataMapType.FG) {
+      FileFactory.mkdirs(writePath, FileFactory.getFileType(writePath));
+    }
+    for (Blocklet blocklet : blocklets) {
       ExtendedBlocklet detailedBlocklet =
           blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegmentId());
+      if (dataMapFactory.getDataMapType() == DataMapType.FG) {
+        String blockletwritePath =
+            writePath + CarbonCommonConstants.FILE_SEPARATOR + System.nanoTime();
+        detailedBlocklet.setDataMapWriterPath(blockletwritePath);
+        serializer.serializeBlocklet((FineGrainBlocklet) blocklet, blockletwritePath);
+      }
       detailedBlocklet.setSegmentId(distributable.getSegmentId());
       detailedBlocklets.add(detailedBlocklet);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
new file mode 100644
index 0000000..bcc9bad
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * Data Map writer
+ */
+public abstract class AbstractDataMapWriter {
+
+  protected AbsoluteTableIdentifier identifier;
+
+  protected String segmentId;
+
+  protected String writeDirectoryPath;
+
+  public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId,
+      String writeDirectoryPath) {
+    this.identifier = identifier;
+    this.segmentId = segmentId;
+    this.writeDirectoryPath = writeDirectoryPath;
+  }
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  public abstract void onBlockStart(String blockId);
+
+  /**
+   * End of block notification
+   */
+  public abstract void onBlockEnd(String blockId);
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  public abstract void onBlockletStart(int blockletId);
+
+  /**
+   * End of blocklet notification
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  public abstract void onBlockletEnd(int blockletId);
+
+  /**
+   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
+   * DataMapMeta returned in DataMapFactory.
+   * Implementation should copy the content of `pages` as needed, because `pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   */
+  public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] pages);
+
+  /**
+   * This is called during closing of the writer, so after this call no more data will be
+   * sent to this class.
+   */
+  public abstract void finish() throws IOException;
+
+  /**
+   * It copies the file from temp folder to actual folder
+   *
+   * @param dataMapFile
+   * @throws IOException
+   */
+  protected void commitFile(String dataMapFile) throws IOException {
+    if (!dataMapFile.startsWith(writeDirectoryPath)) {
+      throw new UnsupportedOperationException(
+          "Datamap file " + dataMapFile + " is not written in provided directory path "
+              + writeDirectoryPath);
+    }
+    String dataMapFileName =
+        dataMapFile.substring(writeDirectoryPath.length(), dataMapFile.length());
+    String carbonFilePath = dataMapFileName.substring(0, dataMapFileName.lastIndexOf("/"));
+    String segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
+    if (carbonFilePath.length() > 0) {
+      carbonFilePath = segmentPath + carbonFilePath;
+      FileFactory.mkdirs(carbonFilePath, FileFactory.getFileType(carbonFilePath));
+    } else {
+      carbonFilePath = segmentPath;
+    }
+    CarbonUtil.copyCarbonDataFileToCarbonStorePath(dataMapFile, carbonFilePath, 0);
+  }
+
+}
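
To make the callback surface concrete, here is a minimal no-op subclass (hypothetical, not in
the patch); a real writer would accumulate index data in the callbacks, write files under
writeDirectoryPath, and copy them to the segment folder via commitFile():

import java.io.IOException;

import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

public class NoOpDataMapWriter extends AbstractDataMapWriter {

  public NoOpDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId,
      String writeDirectoryPath) {
    super(identifier, segmentId, writeDirectoryPath);
  }

  @Override public void onBlockStart(String blockId) { }

  @Override public void onBlockEnd(String blockId) { }

  @Override public void onBlockletStart(int blockletId) { }

  @Override public void onBlockletEnd(int blockletId) { }

  @Override public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) {
    // Copy anything needed out of `pages` here; the memory may be freed on return.
  }

  @Override public void finish() throws IOException {
    // Flush index files and move them to the segment folder, e.g. via commitFile(path).
  }
}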

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java
new file mode 100644
index 0000000..3d4c717
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
+
+public class BlockletSerializer {
+
+  /**
+   * Serialize and write blocklet to the file.
+   * @param grainBlocklet
+   * @param writePath
+   * @throws IOException
+   */
+  public void serializeBlocklet(FineGrainBlocklet grainBlocklet, String writePath)
+      throws IOException {
+    DataOutputStream dataOutputStream =
+        FileFactory.getDataOutputStream(writePath, FileFactory.getFileType(writePath));
+    grainBlocklet.write(dataOutputStream);
+    dataOutputStream.close();
+  }
+
+  /**
+   * Read data from filepath and deserialize blocklet.
+   * @param writePath
+   * @return
+   * @throws IOException
+   */
+  public FineGrainBlocklet deserializeBlocklet(String writePath) throws IOException {
+    DataInputStream inputStream =
+        FileFactory.getDataInputStream(writePath, FileFactory.getFileType(writePath));
+    FineGrainBlocklet blocklet = new FineGrainBlocklet();
+    blocklet.readFields(inputStream);
+    inputStream.close();
+    return blocklet;
+  }
+
+}
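
A small round-trip sketch; the helper name is hypothetical, and `blocklet` and `writePath`
are assumed to come from the FG pruning step, which persists blocklets so that the filter
scanner can read them back from the path carried in the split:

import java.io.IOException;

import org.apache.carbondata.core.datamap.dev.BlockletSerializer;
import org.apache.carbondata.core.indexstore.FineGrainBlocklet;

public class BlockletSerializerExample {
  static FineGrainBlocklet roundTrip(FineGrainBlocklet blocklet, String writePath)
      throws IOException {
    BlockletSerializer serializer = new BlockletSerializer();
    // Persist the pruned blocklet to writePath ...
    serializer.serializeBlocklet(blocklet, writePath);
    // ... and read it back, as the scanner side does.
    return serializer.deserializeBlocklet(writePath);
  }
}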

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index dfe97e3..3fa6d75 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 /**
  * Datamap is an entity which can store and retrieve index data.
  */
-public interface DataMap {
+public interface DataMap<T extends Blocklet> {
 
   /**
    * It is called to load the data map to memory or to initialize it.
@@ -41,7 +41,7 @@ public interface DataMap {
    * @param filterExp
    * @return
    */
-  List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+  List<T> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
       List<String> partitions);
 
   // TODO Move this method to Abstract class

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index f5a7404..e900f8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -21,13 +21,14 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.DataMapType;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.events.Event;
 
 /**
  * Interface for datamap factory, it is responsible for creating the datamap.
  */
-public interface DataMapFactory {
+public interface DataMapFactory<T extends DataMap> {
 
   /**
    * Initialization of Datamap factory with the identifier and datamap name
@@ -37,17 +38,17 @@ public interface DataMapFactory {
   /**
    * Return a new write for this datamap
    */
-  DataMapWriter createWriter(String segmentId);
+  AbstractDataMapWriter createWriter(String segmentId, String writeDirectoryPath);
 
   /**
    * Get the datamap for segmentid
    */
-  List<DataMap> getDataMaps(String segmentId) throws IOException;
+  List<T> getDataMaps(String segmentId) throws IOException;
 
   /**
    * Get datamaps for distributable object.
    */
-  List<DataMap> getDataMaps(DataMapDistributable distributable) throws IOException;
+  List<T> getDataMaps(DataMapDistributable distributable) throws IOException;
 
   /**
    * Get all distributable objects of a segmentid
@@ -75,4 +76,9 @@ public interface DataMapFactory {
    * Return metadata of this datamap
    */
   DataMapMeta getMeta();
+
+  /**
+   *  Type of datamap whether it is FG or CG
+   */
+  DataMapType getDataMapType();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
deleted file mode 100644
index 413eaa5..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.datamap.dev;
-
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-
-/**
- * Data Map writer
- */
-public interface DataMapWriter {
-
-  /**
-   *  Start of new block notification.
-   *  @param blockId file name of the carbondata file
-   */
-  void onBlockStart(String blockId, String blockPath);
-
-  /**
-   * End of block notification
-   */
-  void onBlockEnd(String blockId);
-
-  /**
-   * Start of new blocklet notification.
-   * @param blockletId sequence number of blocklet in the block
-   */
-  void onBlockletStart(int blockletId);
-
-  /**
-   * End of blocklet notification
-   * @param blockletId sequence number of blocklet in the block
-   */
-  void onBlockletEnd(int blockletId);
-  /**
-   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
-   * DataMapMeta returned in DataMapFactory.
-   *
-   * Implementation should copy the content of `pages` as needed, because `pages` memory
-   * may be freed after this method returns, if using unsafe column page.
-   */
-  void onPageAdded(int blockletId, int pageId, ColumnPage[] pages);
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java
new file mode 100644
index 0000000..d79d0c6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.cgdatamap;
+
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.Blocklet;
+
+public abstract class AbstractCoarseGrainDataMap implements DataMap<Blocklet> {
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java
new file mode 100644
index 0000000..9789992
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.cgdatamap;
+
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+
+/**
+ *  1. Any filter query which hits a table with a datamap will call the prune method of the
+ *     CG datamap.
+ *  2. The prune method of the CG datamap returns a list of Blocklet; these blocklets contain
+ *     the information of block and blocklet.
+ *  3. Based on the splits, CarbonScanRDD schedules the tasks.
+ */
+public abstract class AbstractCoarseGrainDataMapFactory
+    implements DataMapFactory<AbstractCoarseGrainDataMap> {
+
+  @Override public DataMapType getDataMapType() {
+    return DataMapType.CG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java
new file mode 100644
index 0000000..310fb3b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.fgdatamap;
+
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
+
+public abstract class AbstractFineGrainDataMap implements DataMap<FineGrainBlocklet> {
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java
new file mode 100644
index 0000000..1ca7fc3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.fgdatamap;
+
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+
+/**
+ *  1. Any filter query which hits a table with a datamap will call the prune method of the
+ *     FG datamap.
+ *  2. The prune method of the FG datamap returns a list of FineGrainBlocklet; these blocklets
+ *     contain the information of block, blocklet, page and row ids as well.
+ *  3. The pruned blocklets are internally written to file, and only the block,
+ *     blocklet and file path information is returned as part of the splits.
+ *  4. Based on the splits, CarbonScanRDD schedules the tasks.
+ *  5. In the filter scanner we check the datamap writer path from the split, read the
+ *     bitset if it exists, and pass this bitset as input to the scan.
+ */
+public abstract class AbstractFineGrainDataMapFactory
+    implements DataMapFactory<AbstractFineGrainDataMap> {
+
+  @Override public DataMapType getDataMapType() {
+    return DataMapType.FG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
index 273f833..df0896a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
  * Interface data block reference
@@ -119,4 +120,9 @@ public interface DataRefNode {
    */
   MeasureRawColumnChunk readMeasureChunk(FileReader fileReader, int columnIndex) throws IOException;
 
+  /**
+   * Return the indexed data, if any, that was stored on disk by the FG datamap.
+   * @return
+   */
+  BitSetGroup getIndexedData();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index c3cc551..907708c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -90,6 +90,8 @@ public class TableBlockInfo implements Distributable, Serializable {
 
   private BlockletDetailInfo detailInfo;
 
+  private String dataMapWriterPath;
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
       String[] locations, long blockLength, ColumnarFormatVersion version,
       String[] deletedDeltaFilePath) {
@@ -410,4 +412,12 @@ public class TableBlockInfo implements Distributable, Serializable {
   public void setBlockletId(String blockletId) {
     this.blockletId = blockletId;
   }
+
+  public String getDataMapWriterPath() {
+    return dataMapWriterPath;
+  }
+
+  public void setDataMapWriterPath(String dataMapWriterPath) {
+    this.dataMapWriterPath = dataMapWriterPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
index fe4cf83..f5a751b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
  * Non leaf node abstract class
@@ -222,4 +223,8 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode {
   public int getPageRowCount(int pageNumber) {
     throw new UnsupportedOperationException("Unsupported operation");
   }
+
+  @Override public BitSetGroup getIndexedData() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
index c200f8d..a6eb695 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
  * No leaf node of a b+tree class which will keep the matadata(start key) of the
@@ -227,6 +228,10 @@ public class BTreeNonLeafNode implements BTreeNode {
     throw new UnsupportedOperationException("Unsupported operation");
   }
 
+  public BitSetGroup getIndexedData() {
+    return null;
+  }
+
   /**
    * number of pages in blocklet
    * @return

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index d84f3f6..c731e07 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -16,28 +16,46 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
 /**
  * Blocklet
  */
-public class Blocklet implements Serializable {
+public class Blocklet implements Writable,Serializable {
 
-  private String path;
+  private String blockId;
 
   private String blockletId;
 
-  public Blocklet(String path, String blockletId) {
-    this.path = path;
+  public Blocklet(String blockId, String blockletId) {
+    this.blockId = blockId;
     this.blockletId = blockletId;
   }
 
-  public String getPath() {
-    return path;
+  // For serialization purpose
+  public Blocklet() {
   }
 
   public String getBlockletId() {
     return blockletId;
   }
 
+  public String getBlockId() {
+    return blockId;
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    out.writeUTF(blockId);
+    out.writeUTF(blockletId);
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    blockId = in.readUTF();
+    blockletId = in.readUTF();
+  }
 }
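
Editor's aside: a quick round trip through the new Writable methods, which is
what lets pruned Blocklets travel inside serialized splits. The block id value
is made up.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.carbondata.core.indexstore.Blocklet;

    public class BlockletRoundTrip {
      public static void main(String[] args) throws IOException {
        Blocklet original = new Blocklet("part-0-0_batchno0-0-1", "2");

        // write(...) emits blockId then blockletId as UTF strings
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // the new no-arg constructor exists exactly for this read path
        Blocklet copy = new Blocklet();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        assert copy.getBlockId().equals("part-0-0_batchno0-0-1");
        assert copy.getBlockletId().equals("2");
      }
    }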

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 111a7a2..7598961 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -22,7 +22,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -104,7 +103,6 @@ public class BlockletDataMapIndexStore
       List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
     List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size());
     List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>();
-    ExecutorService service = null;
     // Get the datamaps for each indexfile from cache.
     try {
       for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
@@ -151,10 +149,6 @@ public class BlockletDataMapIndexStore
         dataMap.clear();
       }
       throw new IOException("Problem in loading segment blocks.", e);
-    } finally {
-      if (service != null) {
-        service.shutdownNow();
-      }
     }
     return blockletDataMaps;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index 21ecba1..3ed826a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -44,4 +44,12 @@ public interface BlockletDetailsFetcher {
    * @throws IOException
    */
   ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId) throws IOException;
+
+  /**
+   * Get all the blocklets in a segment.
+   *
+   * @param segmentId segment to fetch blocklets from
+   * @param partitions partitions to restrict the lookup to
+   * @return all blocklets in the segment
+   */
+  List<Blocklet> getAllBlocklets(String segmentId, List<String> partitions) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index d1bfa35..58a9344 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -29,8 +29,13 @@ public class ExtendedBlocklet extends Blocklet {
 
   private String[] location;
 
+  private String path;
+
+  private String dataMapWriterPath;
+
   public ExtendedBlocklet(String path, String blockletId) {
     super(path, blockletId);
+    this.path = path;
   }
 
   public BlockletDetailInfo getDetailInfo() {
@@ -60,4 +65,16 @@ public class ExtendedBlocklet extends Blocklet {
   public void setSegmentId(String segmentId) {
     this.segmentId = segmentId;
   }
+
+  public String getPath() {
+    return path;
+  }
+
+  public String getDataMapWriterPath() {
+    return dataMapWriterPath;
+  }
+
+  public void setDataMapWriterPath(String dataMapWriterPath) {
+    this.dataMapWriterPath = dataMapWriterPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
new file mode 100644
index 0000000..266120e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+import org.apache.carbondata.core.util.BitSetGroup;
+
+/**
+ * FineGrainBlocklet
+ */
+public class FineGrainBlocklet extends Blocklet implements Serializable {
+
+  private List<Page> pages;
+
+  public FineGrainBlocklet(String blockId, String blockletId, List<Page> pages) {
+    super(blockId, blockletId);
+    this.pages = pages;
+  }
+
+  // For serialization purpose
+  public FineGrainBlocklet() {
+
+  }
+
+  public List<Page> getPages() {
+    return pages;
+  }
+
+  public static class Page implements Writable,Serializable {
+
+    private int pageId;
+
+    private int[] rowId;
+
+    public BitSet getBitSet() {
+      BitSet bitSet =
+          new BitSet(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+      for (int row : rowId) {
+        bitSet.set(row);
+      }
+      return bitSet;
+    }
+
+    @Override public void write(DataOutput out) throws IOException {
+      out.writeInt(pageId);
+      out.writeInt(rowId.length);
+      for (int i = 0; i < rowId.length; i++) {
+        out.writeInt(rowId[i]);
+      }
+    }
+
+    @Override public void readFields(DataInput in) throws IOException {
+      pageId = in.readInt();
+      int length = in.readInt();
+      rowId = new int[length];
+      for (int i = 0; i < length; i++) {
+        rowId[i] = in.readInt();
+      }
+    }
+
+    public void setPageId(int pageId) {
+      this.pageId = pageId;
+    }
+
+    public void setRowId(int[] rowId) {
+      this.rowId = rowId;
+    }
+  }
+
+  public BitSetGroup getBitSetGroup(int numberOfPages) {
+    BitSetGroup bitSetGroup = new BitSetGroup(numberOfPages);
+    for (int i = 0; i < pages.size(); i++) {
+      bitSetGroup.setBitSet(pages.get(i).getBitSet(), pages.get(i).pageId);
+    }
+    return bitSetGroup;
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    super.write(out);
+    int size = pages.size();
+    out.writeInt(size);
+    for (Page page : pages) {
+      page.write(out);
+    }
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    int size = in.readInt();
+    pages = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      Page page = new Page();
+      page.readFields(in);
+      pages.add(page);
+    }
+  }
+}
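
Editor's aside: a small fragment showing how the page/rowid payload collapses
into the BitSetGroup the scanner consumes (imports as in the file above; all
values illustrative).

    FineGrainBlocklet.Page page = new FineGrainBlocklet.Page();
    page.setPageId(1);
    page.setRowId(new int[] { 0, 5, 9 });   // matching rows inside page 1

    List<FineGrainBlocklet.Page> pages = new ArrayList<>();
    pages.add(page);
    FineGrainBlocklet blocklet = new FineGrainBlocklet("blockId", "0", pages);

    // The blocklet has 4 pages; only page 1 gets a bitset (bits 0, 5, 9 set).
    // Pages without hits keep a null slot, which the null/empty guards added
    // to the filter executers below short-circuit.
    BitSetGroup group = blocklet.getBitSetGroup(4);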

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index d331c2b..99a47ff 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -35,8 +35,8 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cacheable;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -73,7 +73,7 @@ import org.xerial.snappy.Snappy;
 /**
  * Datamap implementation for blocklet.
  */
-public class BlockletDataMap implements DataMap, Cacheable {
+public class BlockletDataMap extends AbstractCoarseGrainDataMap implements Cacheable {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMap.class.getName());
@@ -609,9 +609,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
         FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
     for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) {
       DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i);
-      boolean isScanRequired = FilterExpressionProcessor
-          .isScanRequired(filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
-              getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
+      boolean isScanRequired = FilterExpressionProcessor.isScanRequired(
+          filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
+          getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
       if (isScanRequired) {
         return true;
       }
@@ -684,7 +684,6 @@ public class BlockletDataMap implements DataMap, Cacheable {
         startIndex++;
       }
     }
-
     return blocklets;
   }
 
@@ -977,4 +976,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return memoryUsed;
   }
 
+  public SegmentProperties getSegmentProperties() {
+    return segmentProperties;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 61e5ceb..f6b8165 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -27,9 +27,10 @@ import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datamap.dev.DataMap;
-import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMapFactory;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -39,9 +40,6 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -53,7 +51,8 @@ import org.apache.hadoop.fs.RemoteIterator;
 /**
  * Table map for blocklet
  */
-public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher,
+public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
+    implements BlockletDetailsFetcher,
     SegmentPropertiesFetcher {
 
   private AbsoluteTableIdentifier identifier;
@@ -61,10 +60,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   // segmentId -> list of index file
   private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
 
-  // segmentId -> SegmentProperties.
-  private Map<String, SegmentProperties> segmentPropertiesMap = new HashMap<>();
-
-  private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+  private Cache<TableBlockIndexUniqueIdentifier, AbstractCoarseGrainDataMap> cache;
 
   @Override
   public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
@@ -74,12 +70,12 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   }
 
   @Override
-  public DataMapWriter createWriter(String segmentId) {
+  public AbstractDataMapWriter createWriter(String segmentId, String dataWriterPath) {
     throw new UnsupportedOperationException("not implemented");
   }
 
   @Override
-  public List<DataMap> getDataMaps(String segmentId) throws IOException {
+  public List<AbstractCoarseGrainDataMap> getDataMaps(String segmentId) throws IOException {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         getTableBlockIndexUniqueIdentifiers(segmentId);
     return cache.getAll(tableBlockIndexUniqueIdentifiers);
@@ -140,17 +136,18 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
 
   private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
       Blocklet blocklet) throws IOException {
-    String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getPath());
+    String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId());
     for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
       if (identifier.getCarbonIndexFileName().equals(carbonIndexFileName)) {
         DataMap dataMap = cache.get(identifier);
         return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
       }
     }
-    throw new IOException("Blocklet with blockid " + blocklet.getPath() + " not found ");
+    throw new IOException("Blocklet with blockletid " + blocklet.getBlockletId() + " not found");
   }
 
 
+
   @Override
   public List<DataMapDistributable> toDistributable(String segmentId) {
     CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentId);
@@ -179,7 +176,6 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
 
   @Override
   public void clear(String segmentId) {
-    segmentPropertiesMap.remove(segmentId);
     List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
     if (blockIndexes != null) {
       for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
@@ -200,7 +196,8 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   }
 
   @Override
-  public List<DataMap> getDataMaps(DataMapDistributable distributable) throws IOException {
+  public List<AbstractCoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+      throws IOException {
     BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
     List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
     if (mapDistributable.getFilePath().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
@@ -217,7 +214,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
                 indexFile));
       }
     }
-    List<DataMap> dataMaps;
+    List<AbstractCoarseGrainDataMap> dataMaps;
     try {
       dataMaps = cache.getAll(identifiers);
     } catch (IOException e) {
@@ -233,23 +230,21 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   }
 
   @Override public SegmentProperties getSegmentProperties(String segmentId) throws IOException {
-    SegmentProperties segmentProperties = segmentPropertiesMap.get(segmentId);
-    if (segmentProperties == null) {
-      int[] columnCardinality;
-      List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-          getTableBlockIndexUniqueIdentifiers(segmentId);
-      DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
-      List<DataFileFooter> indexInfo =
-          fileFooterConverter.getIndexInfo(tableBlockIndexUniqueIdentifiers.get(0).getFilePath());
-      for (DataFileFooter fileFooter : indexInfo) {
-        List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
-        if (segmentProperties == null) {
-          columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
-          segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
-        }
-      }
-      segmentPropertiesMap.put(segmentId, segmentProperties);
+    List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segmentId);
+    assert (dataMaps.size() > 0);
+    AbstractCoarseGrainDataMap coarseGrainDataMap = dataMaps.get(0);
+    assert (coarseGrainDataMap instanceof BlockletDataMap);
+    BlockletDataMap dataMap = (BlockletDataMap) coarseGrainDataMap;
+    return dataMap.getSegmentProperties();
+  }
+
+  @Override public List<Blocklet> getAllBlocklets(String segmentId, List<String> partitions)
+      throws IOException {
+    List<Blocklet> blocklets = new ArrayList<>();
+    List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segmentId);
+    for (AbstractCoarseGrainDataMap dataMap : dataMaps) {
+      blocklets.addAll(dataMap.prune(null, getSegmentProperties(segmentId), partitions));
     }
-    return segmentProperties;
+    return blocklets;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
index 4d10fd6..ad4a7cc 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datamap.dev.BlockletSerializer;
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -29,7 +30,9 @@ import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory
 import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 
 /**
@@ -43,6 +46,8 @@ public class BlockletDataRefNode implements DataRefNode {
 
   private int[] dimensionLens;
 
+  private BlockletSerializer blockletSerializer;
+
   BlockletDataRefNode(List<TableBlockInfo> blockInfos, int index, int[] dimensionLens) {
     this.blockInfos = blockInfos;
     // Update row count and page count to blocklet info
@@ -67,6 +72,7 @@ public class BlockletDataRefNode implements DataRefNode {
     }
     this.index = index;
     this.dimensionLens = dimensionLens;
+    this.blockletSerializer = new BlockletSerializer();
   }
 
   @Override public DataRefNode getNextDataRefNode() {
@@ -165,11 +171,28 @@ public class BlockletDataRefNode implements DataRefNode {
     }
   }
 
-  @Override public int numberOfPages() {
+  @Override
+  public int numberOfPages() {
     return blockInfos.get(index).getDetailInfo().getPagesCount();
   }
 
-  @Override public int getPageRowCount(int pageNumber) {
+  @Override
+  public BitSetGroup getIndexedData() {
+    String dataMapWriterPath = blockInfos.get(index).getDataMapWriterPath();
+    if (dataMapWriterPath != null) {
+      try {
+        FineGrainBlocklet blocklet = blockletSerializer.deserializeBlocklet(dataMapWriterPath);
+        return blocklet.getBitSetGroup(numberOfPages());
+      } catch (IOException e) {
+        return null;
+      }
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public int getPageRowCount(int pageNumber) {
     return blockInfos.get(index).getDetailInfo().getBlockletInfo()
         .getNumberOfRowsPerPage()[pageNumber];
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
index a30f64c..95232e5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
@@ -21,6 +21,8 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 
 /**
@@ -29,7 +31,10 @@ import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
  */
 public class IndexWrapper extends AbstractIndex {
 
+  private List<TableBlockInfo> blockInfos;
+
   public IndexWrapper(List<TableBlockInfo> blockInfos) {
+    this.blockInfos = blockInfos;
     segmentProperties = new SegmentProperties(blockInfos.get(0).getDetailInfo().getColumnSchemas(),
         blockInfos.get(0).getDetailInfo().getDimLens());
     dataRefNode = new BlockletDataRefNode(blockInfos, 0,
@@ -38,4 +43,17 @@ public class IndexWrapper extends AbstractIndex {
 
   @Override public void buildIndex(List<DataFileFooter> footerList) {
   }
+
+  @Override public void clear() {
+    super.clear();
+    if (blockInfos != null) {
+      for (TableBlockInfo blockInfo : blockInfos) {
+        String dataMapWriterPath = blockInfo.getDataMapWriterPath();
+        if (dataMapWriterPath != null) {
+          CarbonFile file = FileFactory.getCarbonFile(dataMapWriterPath);
+          FileFactory.deleteAllCarbonFilesOfDir(file);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
deleted file mode 100644
index 9d77010..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore.schema;
-
-/**
- * Types of filters of select query
- */
-public enum FilterType {
-  EQUALTO, GREATER_THAN, LESS_THAN, GREATER_THAN_EQUAL, LESS_THAN_EQUAL, LIKE
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index 5974666..26b0f67 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -314,6 +314,9 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
   private BitSet getFilteredIndexesUisngPrvBitset(DimensionColumnPage dimensionColumnPage,
       BitSetGroup prvBitSetGroup, int pageNumber) {
     BitSet prvPageBitSet = prvBitSetGroup.getBitSet(pageNumber);
+    if (prvPageBitSet == null || prvPageBitSet.isEmpty()) {
+      return prvPageBitSet;
+    }
     BitSet bitSet = new BitSet();
     bitSet.or(prvPageBitSet);
     byte[][] filterKeys = dimColumnExecuterInfo.getExcludeFilterKeys();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 05328f3..516ed41 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -334,6 +334,9 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
   private BitSet getFilteredIndexesUisngPrvBitset(DimensionColumnPage dimensionColumnPage,
       BitSetGroup prvBitSetGroup, int pageNumber, int numberOfRows) {
     BitSet prvPageBitSet = prvBitSetGroup.getBitSet(pageNumber);
+    if (prvPageBitSet == null || prvPageBitSet.isEmpty()) {
+      return prvPageBitSet;
+    }
     BitSet bitSet = new BitSet(numberOfRows);
     byte[][] filterKeys = dimColumnExecuterInfo.getFilterKeys();
     int compareResult = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
index 1c73d63..033c3dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
@@ -163,6 +163,8 @@ public class BlockletFilterScanner extends BlockletFullScanner {
         .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
     totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
         totalBlockletStatistic.getCount() + 1);
+    // set the indexed data, if any was produced during FG datamap pruning.
+    rawBlockletColumnChunks.setBitSetGroup(rawBlockletColumnChunks.getDataBlock().getIndexedData());
     // apply filter on actual data, for each page
     BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(rawBlockletColumnChunks,
         useBitSetPipeLine);
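
Editor's aside: taken together with the TableBlockInfo and BlockletDataRefNode
changes earlier, the read path this hunk completes can be traced as below (a
sketch; the variable names are those visible in this hunk).

    // 1. At pruning time the FG datamap serialized its FineGrainBlocklet to
    //    dataMapWriterPath, which travels in the split's TableBlockInfo.
    // 2. getIndexedData() deserializes it and converts the page/rowid hits
    //    into a BitSetGroup (null when no FG datamap ran for this block).
    // 3. Seeding the raw chunks with that group lets the filter executers
    //    re-evaluate only the pre-matched pages and rows.
    BitSetGroup indexed = rawBlockletColumnChunks.getDataBlock().getIndexedData();
    rawBlockletColumnChunks.setBitSetGroup(indexed);
    BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(rawBlockletColumnChunks,
        useBitSetPipeLine);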

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 80a382c..c8b36e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -55,6 +56,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
@@ -99,6 +101,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
@@ -131,6 +134,13 @@ public final class CarbonUtil {
 
   private static final Configuration conf = new Configuration(true);
 
+  /**
+   * dfs.bytes-per-checksum:
+   * HDFS checksum length; the block size for a file must be exactly
+   * divisible by this value
+   */
+  private static final int HDFS_CHECKSUM_LENGTH = 512;
+
   private CarbonUtil() {
 
   }
@@ -2356,5 +2366,92 @@ public final class CarbonUtil {
     return Base64.decodeBase64(objectString.getBytes(CarbonCommonConstants.DEFAULT_CHARSET));
   }
 
+
+  /**
+   * This method will copy the given file to carbon store location
+   *
+   * @param localFilePath local file name with full path
+   * @param carbonDataDirectoryPath carbon store directory to copy into
+   * @param fileSizeInBytes size used as the candidate HDFS block size
+   * @throws CarbonDataWriterException
+   */
+  public static void copyCarbonDataFileToCarbonStorePath(String localFilePath,
+      String carbonDataDirectoryPath, long fileSizeInBytes)
+      throws CarbonDataWriterException {
+    long copyStartTime = System.currentTimeMillis();
+    LOGGER.info("Copying " + localFilePath + " --> " + carbonDataDirectoryPath);
+    try {
+      CarbonFile localCarbonFile =
+          FileFactory.getCarbonFile(localFilePath, FileFactory.getFileType(localFilePath));
+      String carbonFilePath = carbonDataDirectoryPath + localFilePath
+          .substring(localFilePath.lastIndexOf(File.separator));
+      copyLocalFileToCarbonStore(carbonFilePath, localFilePath,
+          CarbonCommonConstants.BYTEBUFFER_SIZE,
+          getMaxOfBlockAndFileSize(fileSizeInBytes, localCarbonFile.getSize()));
+    } catch (IOException e) {
+      throw new CarbonDataWriterException(
+          "Problem while copying file from local store to carbon store", e);
+    }
+    LOGGER.info(
+        "Total copy time (ms) to copy file " + localFilePath + " is " + (System.currentTimeMillis()
+            - copyStartTime));
+  }
+
+  /**
+   * This method will read the local carbon data file and write to carbon data file in HDFS
+   *
+   * @param carbonStoreFilePath
+   * @param localFilePath
+   * @param bufferSize
+   * @param blockSize
+   * @throws IOException
+   */
+  private static void copyLocalFileToCarbonStore(String carbonStoreFilePath, String localFilePath,
+      int bufferSize, long blockSize) throws IOException {
+    DataOutputStream dataOutputStream = null;
+    DataInputStream dataInputStream = null;
+    try {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("HDFS file block size for file: " + carbonStoreFilePath + " is " + blockSize
+            + " (bytes");
+      }
+      dataOutputStream = FileFactory
+          .getDataOutputStream(carbonStoreFilePath, FileFactory.getFileType(carbonStoreFilePath),
+              bufferSize, blockSize);
+      dataInputStream = FileFactory
+          .getDataInputStream(localFilePath, FileFactory.getFileType(localFilePath), bufferSize);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, bufferSize);
+    } finally {
+      CarbonUtil.closeStream(dataInputStream);
+      CarbonUtil.closeStream(dataOutputStream);
+    }
+  }
+
+  /**
+   * This method will return max of block size and file size
+   *
+   * @param blockSize
+   * @param fileSize
+   * @return
+   */
+  private static long getMaxOfBlockAndFileSize(long blockSize, long fileSize) {
+    long maxSize = blockSize;
+    if (fileSize > blockSize) {
+      maxSize = fileSize;
+    }
+    // the block size must be exactly divisible by 512, which HDFS maintains as bytes
+    // per checksum (dfs.bytes-per-checksum=512 must divide the block size)
+    long remainder = maxSize % HDFS_CHECKSUM_LENGTH;
+    if (remainder > 0) {
+      maxSize = maxSize + HDFS_CHECKSUM_LENGTH - remainder;
+    }
+    // convert the sizes to human-readable form for logging.
+    String readableBlockSize = ByteUtil.convertByteToReadable(blockSize);
+    String readableFileSize = ByteUtil.convertByteToReadable(fileSize);
+    String readableMaxSize = ByteUtil.convertByteToReadable(maxSize);
+    LOGGER.info(
+        "The configured block size is " + readableBlockSize + ", the actual carbon file size is "
+            + readableFileSize + ", choose the max value " + readableMaxSize
+            + " as the block size on HDFS");
+    return maxSize;
+  }
 }
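
Editor's aside: a worked example of getMaxOfBlockAndFileSize. The chosen HDFS
block size is max(blockSize, fileSize) rounded up to the next multiple of 512
so that dfs.bytes-per-checksum divides it evenly (numbers illustrative).

    long blockSize = 1024L * 1024L;          // 1 MB configured block size
    long fileSize  = 1536L * 1024L + 100L;   // actual file: 1,572,964 bytes
    long maxSize   = Math.max(blockSize, fileSize);   // 1,572,964
    long remainder = maxSize % 512;                   // 100
    if (remainder > 0) {
      maxSize = maxSize + 512 - remainder;            // 1,573,376 = 512 * 3,073
    }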
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
index 2ad6327..8002e57 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
@@ -28,7 +28,8 @@ import java.util.List;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -36,7 +37,9 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -47,7 +50,7 @@ import com.google.gson.Gson;
 /**
  * Datamap implementation for min max blocklet.
  */
-public class MinMaxDataMap implements DataMap {
+public class MinMaxDataMap extends AbstractCoarseGrainDataMap {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(MinMaxDataMap.class.getName());
@@ -58,8 +61,9 @@ public class MinMaxDataMap implements DataMap {
 
   private MinMaxIndexBlockDetails[] readMinMaxDataMap;
 
-  @Override public void init(String filePath) throws MemoryException, IOException {
-    this.filePath = filePath;
+  @Override
+  public void init(DataMapModel model) throws MemoryException, IOException {
+    this.filePath = model.getFilePath();
     CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0");
     for (int i = 0; i < listFiles.length; i++) {
       readMinMaxDataMap = readJson(listFiles[i].getPath());
@@ -76,7 +80,7 @@ public class MinMaxDataMap implements DataMap {
     });
   }
 
-  public MinMaxIndexBlockDetails[] readJson(String filePath) throws IOException {
+  private MinMaxIndexBlockDetails[] readJson(String filePath) {
     Gson gsonObjectToRead = new Gson();
     DataInputStream dataInputStream = null;
     BufferedReader buffReader = null;
@@ -90,8 +94,7 @@ public class MinMaxDataMap implements DataMap {
         return null;
       }
       dataInputStream = fileOperation.openForRead();
-      inStream = new InputStreamReader(dataInputStream,
-          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+      inStream = new InputStreamReader(dataInputStream, "UTF-8");
       buffReader = new BufferedReader(inStream);
       readMinMax = gsonObjectToRead.fromJson(buffReader, MinMaxIndexBlockDetails[].class);
     } catch (IOException e) {
@@ -109,14 +112,14 @@ public class MinMaxDataMap implements DataMap {
    * @param segmentProperties
    * @return
    */
-  @Override public List<Blocklet> prune(FilterResolverIntf filterExp,
-      SegmentProperties segmentProperties) {
+  @Override
+  public List<Blocklet> prune(FilterResolverIntf filterExp,
+      SegmentProperties segmentProperties, List<String> partitions) {
     List<Blocklet> blocklets = new ArrayList<>();
 
     if (filterExp == null) {
       for (int i = 0; i < readMinMaxDataMap.length; i++) {
-        blocklets.add(new Blocklet(readMinMaxDataMap[i].getFilePath(),
-            String.valueOf(readMinMaxDataMap[i].getBlockletId())));
+        blocklets.add(new Blocklet(filePath, String.valueOf(readMinMaxDataMap[i].getBlockletId())));
       }
     } else {
       FilterExecuter filterExecuter =
@@ -126,7 +129,7 @@ public class MinMaxDataMap implements DataMap {
         BitSet bitSet = filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(),
             readMinMaxDataMap[startIndex].getMinValues());
         if (!bitSet.isEmpty()) {
-          blocklets.add(new Blocklet(readMinMaxDataMap[startIndex].getFilePath(),
+          blocklets.add(new Blocklet(filePath,
               String.valueOf(readMinMaxDataMap[startIndex].getBlockletId())));
         }
         startIndex++;
@@ -136,6 +139,11 @@ public class MinMaxDataMap implements DataMap {
   }
 
   @Override
+  public boolean isScanRequired(FilterResolverIntf filterExp) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void clear() {
     readMinMaxDataMap = null;
   }
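
Editor's aside: the pruning decision above reduces to a per-blocklet range
check. A simplified illustration with ints in place of the byte[][] min/max
arrays (isScanRequired performs the byte-level equivalent).

    // Stored blocklet range for column c2 is [10, 50]; the predicate is c2 = 99.
    int min = 10, max = 50;
    int filterValue = 99;
    // 99 lies outside [min, max], so the blocklet cannot contain a match and
    // is pruned without being scanned.
    boolean scanRequired = filterValue >= min && filterValue <= max;   // false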

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e972fd3d/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
index b196d0d..5203cb3 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
@@ -25,49 +25,51 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.dev.DataMap;
-import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.datamap.dev.DataMapWriter;
-import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMapFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.events.Event;
 
 /**
  * Min Max DataMap Factory
  */
-public class MinMaxDataMapFactory implements DataMapFactory {
+public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory {
 
   private AbsoluteTableIdentifier identifier;
 
-  @Override
-  public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+  @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
     this.identifier = identifier;
   }
 
   /**
    * createWriter will return the MinMaxDataWriter.
+   *
    * @param segmentId
    * @return
    */
-  @Override
-  public DataMapWriter createWriter(String segmentId) {
-    return new MinMaxDataWriter();
+  @Override public AbstractDataMapWriter createWriter(String segmentId, String dataWritePath) {
+    return new MinMaxDataWriter(identifier, segmentId, dataWritePath);
   }
 
   /**
    * getDataMaps Factory method Initializes the Min Max Data Map and returns.
+   *
    * @param segmentId
    * @return
    * @throws IOException
    */
-  @Override public List<DataMap> getDataMaps(String segmentId) throws IOException {
-    List<DataMap> dataMapList = new ArrayList<>();
+  @Override public List<AbstractCoarseGrainDataMap> getDataMaps(String segmentId)
+      throws IOException {
+    List<AbstractCoarseGrainDataMap> dataMapList = new ArrayList<>();
     // Form a dataMap of Type MinMaxDataMap.
     MinMaxDataMap dataMap = new MinMaxDataMap();
     try {
-      dataMap.init(identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + File.separator);
+      dataMap.init(new DataMapModel(
+          identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + File.separator));
     } catch (MemoryException ex) {
 
     }
@@ -76,7 +78,6 @@ public class MinMaxDataMapFactory implements DataMapFactory {
   }
 
   /**
-   *
    * @param segmentId
    * @return
    */
@@ -86,6 +87,7 @@ public class MinMaxDataMapFactory implements DataMapFactory {
 
   /**
    * Clear the DataMap.
+   *
    * @param segmentId
    */
   @Override public void clear(String segmentId) {
@@ -94,21 +96,20 @@ public class MinMaxDataMapFactory implements DataMapFactory {
   /**
    * Clearing the data map.
    */
-  @Override
-  public void clear() {
+  @Override public void clear() {
   }
 
-  @Override public DataMap getDataMap(DataMapDistributable distributable) {
+  @Override public List<AbstractCoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+      throws IOException {
     return null;
   }
 
-  @Override
-  public void fireEvent(ChangeEvent event) {
+  @Override public void fireEvent(Event event) {
 
   }
 
-  @Override
-  public DataMapMeta getMeta() {
-    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")), FilterType.EQUALTO);
+  @Override public DataMapMeta getMeta() {
+    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")),
+        new ArrayList<ExpressionType>());
   }
 }
\ No newline at end of file
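
Editor's aside: a hypothetical end-to-end use of the example factory for
segment 0. The identifier and segmentProperties values are assumptions, not
part of this commit.

    MinMaxDataMapFactory factory = new MinMaxDataMapFactory();
    // identifier: the table's AbsoluteTableIdentifier (obtained elsewhere)
    factory.init(identifier, "minmaxdatamap");

    List<AbstractCoarseGrainDataMap> dataMaps = factory.getDataMaps("0");
    for (AbstractCoarseGrainDataMap dataMap : dataMaps) {
      // a null filter keeps every blocklet; a real FilterResolverIntf would
      // prune against the stored min/max values
      List<Blocklet> blocklets = dataMap.prune(null, segmentProperties, null);
    }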