You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/24 13:26:38 UTC

[1/2] carbondata git commit: [CARBONDATA-1544][Datamap] Datamap FineGrain implementation [Forced Update!]

Repository: carbondata
Updated Branches:
  refs/heads/fgdatamap 65d05375f -> e9e85a02a (forced update)


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index 78544d3..fe0bbcf 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.datamap.examples;
 
 import java.io.BufferedWriter;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
@@ -29,17 +28,18 @@ import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import com.google.gson.Gson;
 
-public class MinMaxDataWriter implements DataMapWriter {
+public class MinMaxDataWriter extends AbstractDataMapWriter {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(TableInfo.class.getName());
@@ -50,17 +50,23 @@ public class MinMaxDataWriter implements DataMapWriter {
 
   private Map<Integer, BlockletMinMax> blockMinMaxMap;
 
-  private String blockPath;
+  private String dataWritePath;
 
+  public MinMaxDataWriter(AbsoluteTableIdentifier identifier, String segmentId,
+      String dataWritePath) {
+    super(identifier, segmentId, dataWritePath);
+    this.identifier = identifier;
+    this.segmentId = segmentId;
+    this.dataWritePath = dataWritePath;
+  }
 
-  @Override public void onBlockStart(String blockId, String blockPath) {
+  @Override public void onBlockStart(String blockId) {
     pageLevelMax = null;
     pageLevelMin = null;
     blockletLevelMax = null;
     blockletLevelMin = null;
     blockMinMaxMap = null;
     blockMinMaxMap = new HashMap<Integer, BlockletMinMax>();
-    this.blockPath = blockPath;
   }
 
   @Override public void onBlockEnd(String blockId) {
@@ -161,7 +167,7 @@ public class MinMaxDataWriter implements DataMapWriter {
     List<MinMaxIndexBlockDetails> tempMinMaxIndexBlockDetails = null;
     tempMinMaxIndexBlockDetails = loadBlockDetails();
     try {
-      writeMinMaxIndexFile(tempMinMaxIndexBlockDetails, blockPath, blockId);
+      writeMinMaxIndexFile(tempMinMaxIndexBlockDetails, blockId);
     } catch (IOException ex) {
       LOGGER.info(" Unable to write the file");
     }
@@ -178,7 +184,6 @@ public class MinMaxDataWriter implements DataMapWriter {
       tmpminMaxIndexBlockDetails.setMinValues(blockMinMaxMap.get(index).getMin());
       tmpminMaxIndexBlockDetails.setMaxValues(blockMinMaxMap.get(index).getMax());
       tmpminMaxIndexBlockDetails.setBlockletId(index);
-      tmpminMaxIndexBlockDetails.setFilePath(this.blockPath);
       minMaxIndexBlockDetails.add(tmpminMaxIndexBlockDetails);
     }
     return minMaxIndexBlockDetails;
@@ -187,22 +192,19 @@ public class MinMaxDataWriter implements DataMapWriter {
   /**
    * Write the data to a file. This is JSON format file.
    * @param minMaxIndexBlockDetails
-   * @param blockPath
    * @param blockId
    * @throws IOException
    */
   public void writeMinMaxIndexFile(List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails,
-      String blockPath, String blockId) throws IOException {
-    String filePath = blockPath.substring(0, blockPath.lastIndexOf(File.separator) + 1) + blockId
-        + ".minmaxindex";
+      String blockId) throws IOException {
+    String filePath = dataWritePath +"/" + blockId + ".minmaxindex";
     BufferedWriter brWriter = null;
     DataOutputStream dataOutStream = null;
     try {
       FileFactory.createNewFile(filePath, FileFactory.getFileType(filePath));
       dataOutStream = FileFactory.getDataOutputStream(filePath, FileFactory.getFileType(filePath));
       Gson gsonObjectToWrite = new Gson();
-      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream,
-          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream, "UTF-8"));
       String minmaxIndexData = gsonObjectToWrite.toJson(minMaxIndexBlockDetails);
       brWriter.write(minmaxIndexData);
     } catch (IOException ioe) {
@@ -215,7 +217,11 @@ public class MinMaxDataWriter implements DataMapWriter {
         dataOutStream.flush();
       }
       CarbonUtil.closeStreams(brWriter, dataOutStream);
+      commitFile(filePath);
     }
   }
 
+  @Override public void finish() throws IOException {
+
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
index 0596db5..93a453e 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java
@@ -33,11 +33,6 @@ public class MinMaxIndexBlockDetails implements Serializable {
   private byte[][] maxValues;
 
   /**
-   * filePath pointing to the block.
-   */
-  private String filePath;
-
-  /**
    * BlockletID of the block.
    */
   private Integer BlockletId;
@@ -59,14 +54,6 @@ public class MinMaxIndexBlockDetails implements Serializable {
     this.maxValues = maxValues;
   }
 
-  public String getFilePath() {
-    return filePath;
-  }
-
-  public void setFilePath(String filePath) {
-    this.filePath = filePath;
-  }
-
   public Integer getBlockletId() {
     return BlockletId;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 3e8ede2..2362b76 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -411,7 +411,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       }
       carbonSplits.add(CarbonInputSplit.from(segmentId, fileSplit,
           ColumnarFormatVersion.valueOf(
-              CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION)));
+              CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION), null));
     }
     return carbonSplits;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index e89c2d6..3ac4642 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -84,6 +84,8 @@ public class CarbonInputSplit extends FileSplit
 
   private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
 
+  private String dataMapWritePath;
+
   public CarbonInputSplit() {
     segmentId = null;
     taskId = "0";
@@ -94,7 +96,7 @@ public class CarbonInputSplit extends FileSplit
   }
 
   private CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
-      ColumnarFormatVersion version, String[] deleteDeltaFiles) {
+      ColumnarFormatVersion version, String[] deleteDeltaFiles, String dataMapWritePath) {
     super(path, start, length, locations);
     this.segmentId = segmentId;
     String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
@@ -106,11 +108,12 @@ public class CarbonInputSplit extends FileSplit
     this.invalidSegments = new ArrayList<>();
     this.version = version;
     this.deleteDeltaFiles = deleteDeltaFiles;
+    this.dataMapWritePath = dataMapWritePath;
   }
 
   public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
       int numberOfBlocklets, ColumnarFormatVersion version, String[] deleteDeltaFiles) {
-    this(segmentId, path, start, length, locations, version, deleteDeltaFiles);
+    this(segmentId, path, start, length, locations, version, deleteDeltaFiles, null);
     this.numberOfBlocklets = numberOfBlocklets;
   }
 
@@ -157,10 +160,10 @@ public class CarbonInputSplit extends FileSplit
   }
 
   public static CarbonInputSplit from(String segmentId, FileSplit split,
-      ColumnarFormatVersion version)
+      ColumnarFormatVersion version, String dataMapWritePath)
       throws IOException {
     return new CarbonInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(),
-        split.getLocations(), version, null);
+        split.getLocations(), version, null, dataMapWritePath);
   }
 
   public static List<TableBlockInfo> createBlocks(List<CarbonInputSplit> splitList) {
@@ -174,6 +177,7 @@ public class CarbonInputSplit extends FileSplit
                 split.getLocations(), split.getLength(), blockletInfos, split.getVersion(),
                 split.getDeleteDeltaFiles());
         blockInfo.setDetailInfo(split.getDetailInfo());
+        blockInfo.setDataMapWriterPath(split.dataMapWritePath);
         tableBlockInfoList.add(blockInfo);
       } catch (IOException e) {
         throw new RuntimeException("fail to get location of split: " + split, e);
@@ -221,6 +225,10 @@ public class CarbonInputSplit extends FileSplit
       detailInfo = new BlockletDetailInfo();
       detailInfo.readFields(in);
     }
+    boolean dataMapWriterPathExists = in.readBoolean();
+    if (dataMapWriterPathExists) {
+      dataMapWritePath = in.readUTF();
+    }
   }
 
   @Override public void write(DataOutput out) throws IOException {
@@ -242,6 +250,10 @@ public class CarbonInputSplit extends FileSplit
     if (detailInfo != null) {
       detailInfo.write(out);
     }
+    out.writeBoolean(dataMapWritePath != null);
+    if (dataMapWritePath != null) {
+      out.writeUTF(dataMapWritePath);
+    }
   }
 
   public List<String> getInvalidSegments() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 22eca9f..5a72a02 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -35,6 +35,7 @@ import java.util.Set;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.DataMapType;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -726,16 +727,17 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     // get tokens for all the required FileSystem for table path
     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
         new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
-
-    TableDataMap blockletMap = DataMapStoreManager.getInstance()
-        .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME,
-            BlockletDataMapFactory.class.getName());
+    boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
+    TableDataMap blockletMap =
+        DataMapStoreManager.getInstance().chooseDataMap(absoluteTableIdentifier);
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
     List<String> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets;
-    if (dataMapJob != null) {
+    if (distributedCG || blockletMap.getDataMapFactory().getDataMapType() == DataMapType.FG) {
       DistributableDataMapFormat datamapDstr =
-          new DistributableDataMapFormat(absoluteTableIdentifier, BlockletDataMap.NAME,
+          new DistributableDataMapFormat(absoluteTableIdentifier, blockletMap.getDataMapName(),
               segmentIds, partitionsToPrune,
               BlockletDataMapFactory.class.getName());
       prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
@@ -793,10 +795,12 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       return null;
     }
     org.apache.carbondata.hadoop.CarbonInputSplit split =
-        org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(),
+        org.apache.carbondata.hadoop.CarbonInputSplit.from(
+            blocklet.getSegmentId(),
             new FileSplit(new Path(blocklet.getPath()), 0, blocklet.getLength(),
                 blocklet.getLocations()),
-            ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()));
+            ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()),
+            blocklet.getDataMapWriterPath());
     split.setDetailInfo(blocklet.getDetailInfo());
     return split;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
new file mode 100644
index 0000000..68c1bb9
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datastore.FileHolder
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.Blocklet
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
+
+class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
+  var identifier: AbsoluteTableIdentifier = _
+  var dataMapName: String = _
+
+  /**
+   * Initialization of Datamap factory with the identifier and datamap name
+   */
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapName: String): Unit = {
+    this.identifier = identifier
+    this.dataMapName = dataMapName
+  }
+
+  /**
+   * Return a new write for this datamap
+   */
+  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
+    new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+  }
+
+  /**
+   * Get the datamap for segmentid
+   */
+  override def getDataMaps(segmentId: String): java.util.List[AbstractCoarseGrainDataMap] = {
+    val file = FileFactory.getCarbonFile(
+      CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map {f =>
+      val dataMap: AbstractCoarseGrainDataMap = new CGDataMap()
+      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap
+    }.toList.asJava
+  }
+
+
+  /**
+   * Get datamaps for distributable object.
+   */
+  override def getDataMaps(
+      distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = {
+    val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
+    val dataMap: AbstractCoarseGrainDataMap = new CGDataMap()
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    Seq(dataMap).asJava
+  }
+
+  /**
+   *
+   * @param event
+   */
+  override def fireEvent(event: Event): Unit = {
+    ???
+  }
+
+  /**
+   * Get all distributable objects of a segmentid
+   *
+   * @return
+   */
+  override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
+    val file = FileFactory.getCarbonFile(
+      CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
+      d
+    }.toList.asJava
+  }
+
+
+  /**
+   * Clears datamap of the segment
+   */
+  override def clear(segmentId: String): Unit = {
+
+  }
+
+  /**
+   * Clear all datamaps from memory
+   */
+  override def clear(): Unit = {
+
+  }
+
+  /**
+   * Return metadata of this datamap
+   */
+  override def getMeta: DataMapMeta = {
+    new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava)
+  }
+}
+
+class CGDataMap extends AbstractCoarseGrainDataMap {
+
+  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _
+  var fileHolder: FileHolder = _
+  var filePath: String = _
+  val compressor = new SnappyCompressor
+
+  /**
+   * It is called to load the data map to memory or to initialize it.
+   */
+  override def init(dataMapModel: DataMapModel): Unit = {
+    this.filePath = dataMapModel.getFilePath
+    val size = FileFactory.getCarbonFile(filePath).getSize
+    fileHolder = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+    val footerLen = fileHolder.readInt(filePath, size-4)
+    val bytes = fileHolder.readByteArray(filePath, size-footerLen-4, footerLen)
+    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(in)
+    maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]]
+  }
+
+  /**
+   * Prune the datamap with filter expression. It returns the list of
+   * blocklets where these filters can exist.
+   *
+   * @param filterExp
+   * @return
+   */
+  override def prune(
+      filterExp: FilterResolverIntf,
+      segmentProperties: SegmentProperties,
+      partitions: java.util.List[String]): java.util.List[Blocklet] = {
+    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
+    val expression = filterExp.getFilterExpression
+    getEqualToExpression(expression, buffer)
+    val value = buffer.map { f =>
+      f.getChildren.get(1).evaluate(null).getString
+    }
+    val meta = findMeta(value(0).getBytes)
+    meta.map { f=>
+      new Blocklet(f._1, f._2+"")
+    }.asJava
+  }
+
+
+  private def findMeta(value: Array[Byte]) = {
+    val tuples = maxMin.filter { f =>
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 &&
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0
+    }
+    tuples
+  }
+
+  private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
+    if (expression.getChildren != null) {
+      expression.getChildren.asScala.map { f =>
+        if (f.isInstanceOf[EqualToExpression]) {
+          buffer += f
+        }
+        getEqualToExpression(f, buffer)
+      }
+    }
+  }
+
+  /**
+   * Clear complete index table and release memory.
+   */
+  override def clear() = {
+    ???
+  }
+
+  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+}
+
+class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
+    segmentId: String,
+    dataWritePath: String,
+    dataMapName: String)
+  extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
+
+  var currentBlockId: String = null
+  val cgwritepath = dataWritePath + "/" +
+                    dataMapName + System.nanoTime() + ".datamap"
+  lazy val stream: DataOutputStream = FileFactory
+    .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
+  val blockletList = new ArrayBuffer[Array[Byte]]()
+  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]()
+  val compressor = new SnappyCompressor
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  override def onBlockStart(blockId: String): Unit = {
+    currentBlockId = blockId
+  }
+
+  /**
+   * End of block notification
+   */
+  override def onBlockEnd(blockId: String): Unit = {
+
+  }
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletStart(blockletId: Int): Unit = {
+
+  }
+
+  /**
+   * End of blocklet notification
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletEnd(blockletId: Int): Unit = {
+    val sorted = blockletList
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
+    maxMin +=
+    ((currentBlockId+"", blockletId, (sorted.last, sorted.head)))
+    blockletList.clear()
+  }
+
+  /**
+   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
+   * DataMapMeta returned in DataMapFactory.
+   *
+   * Implementation should copy the content of `pages` as needed, because `pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   */
+  override def onPageAdded(blockletId: Int,
+      pageId: Int,
+      pages: Array[ColumnPage]): Unit = {
+    val size = pages(0).getPageSize
+    val list = new ArrayBuffer[Array[Byte]]()
+    var i = 0
+    while (i < size) {
+      val bytes = pages(0).getBytes(i)
+      val newBytes = new Array[Byte](bytes.length - 2)
+      System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
+      list += newBytes
+      i = i + 1
+    }
+    // Sort based on the column data in order to create index.
+    val sorted = list
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
+    blockletList += sorted.head
+    blockletList += sorted.last
+  }
+
+
+  /**
+   * This is called during closing of writer.So after this call no more data will be sent to this
+   * class.
+   */
+  override def finish(): Unit = {
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(maxMin)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    stream.writeInt(bytes.length)
+    stream.close()
+    commitFile(cgwritepath)
+  }
+
+
+}
+
+class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/compaction/fil2.csv"
+  override protected def beforeAll(): Unit = {
+    //n should be about 5000000 of reset if size is default 1024
+    val n = 150000
+    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+  }
+
+  test("test cg datamap") {
+    sql("DROP TABLE IF EXISTS datamap_test_cg")
+    sql(
+      """
+        | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
+    // register datamap writer
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      table.getAbsoluteTableIdentifier,
+      classOf[CGDataMapFactory].getName, "cgdatamap")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
+      sql("select * from normal_test where name='n502670'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test_cg")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 6f4e9cd..090371f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -20,25 +20,30 @@ package org.apache.carbondata.spark.testsuite.datamap
 import java.util
 
 import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.schema.FilterType
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
 
-class C2DataMapFactory() extends DataMapFactory {
+class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory {
+
+  var identifier: AbsoluteTableIdentifier = _
 
   override def init(identifier: AbsoluteTableIdentifier,
-      dataMapName: String): Unit = {}
+      dataMapName: String): Unit = {
+    this.identifier = identifier
+  }
 
   override def fireEvent(event: Event): Unit = ???
 
@@ -46,13 +51,14 @@ class C2DataMapFactory() extends DataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): java.util.List[DataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def getDataMaps(segmentId: String): util.List[DataMap] = ???
+  override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def createWriter(segmentId: String): DataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock
+  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter =
+    DataMapWriterSuite.dataMapWriterC2Mock(identifier, segmentId, dataWritePath)
 
-  override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, FilterType.EQUALTO)
+  override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
 
   /**
    * Get all distributable objects of a segmentid
@@ -62,6 +68,7 @@ class C2DataMapFactory() extends DataMapFactory {
   override def toDistributable(segmentId: String): util.List[DataMapDistributable] = {
     ???
   }
+
 }
 
 class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
@@ -166,9 +173,12 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
 }
 
 object DataMapWriterSuite {
+
   var callbackSeq: Seq[String] = Seq[String]()
 
-  val dataMapWriterC2Mock = new DataMapWriter {
+  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String,
+      dataWritePath: String) =
+    new AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
 
     override def onPageAdded(
         blockletId: Int,
@@ -193,9 +203,21 @@ object DataMapWriterSuite {
       callbackSeq :+= s"blocklet start $blockletId"
     }
 
-    override def onBlockStart(blockId: String, blockPath: String): Unit = {
+    /**
+     * Start of new block notification.
+     *
+     * @param blockId file name of the carbondata file
+     */
+    override def onBlockStart(blockId: String) = {
       callbackSeq :+= s"block start $blockId"
     }
 
+    /**
+     * This is called during closing of writer.So after this call no more data will be sent to this
+     * class.
+     */
+    override def finish() = {
+
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
new file mode 100644
index 0000000..8ef320c
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainDataMap, AbstractFineGrainDataMapFactory}
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datastore.FileHolder
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
+
+class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
+  var identifier: AbsoluteTableIdentifier = _
+  var dataMapName: String = _
+
+  /**
+   * Initialization of Datamap factory with the identifier and datamap name
+   */
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapName: String): Unit = {
+    this.identifier = identifier
+    this.dataMapName = dataMapName
+  }
+
+  /**
+   * Return a new write for this datamap
+   */
+  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
+    new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+  }
+
+  /**
+   * Get the datamap for segmentid
+   */
+  override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainDataMap] = {
+    val file = FileFactory
+      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val dataMap: AbstractFineGrainDataMap = new FGDataMap()
+      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap
+    }.toList.asJava
+  }
+
+  /**
+   * Get datamap for distributable object.
+   */
+  override def getDataMaps(
+      distributable: DataMapDistributable): java.util.List[AbstractFineGrainDataMap]= {
+    val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
+    val dataMap: AbstractFineGrainDataMap = new FGDataMap()
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    Seq(dataMap).asJava
+  }
+
+  /**
+   * Get all distributable objects of a segmentid
+   *
+   * @return
+   */
+  override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
+    val file = FileFactory
+      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
+      d
+    }.toList.asJava
+  }
+
+
+  /**
+   *
+   * @param event
+   */
+  override def fireEvent(event: Event):Unit = {
+    ???
+  }
+
+  /**
+   * Clears datamap of the segment
+   */
+  override def clear(segmentId: String): Unit = {
+  }
+
+  /**
+   * Clear all datamaps from memory
+   */
+  override def clear(): Unit = {
+  }
+
+  /**
+   * Return metadata of this datamap
+   */
+  override def getMeta: DataMapMeta = {
+    new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava)
+  }
+}
+
+class FGDataMap extends AbstractFineGrainDataMap {
+
+  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _
+  var fileHolder: FileHolder = _
+  var filePath: String = _
+  val compressor = new SnappyCompressor
+
+  /**
+   * It is called to load the data map to memory or to initialize it.
+   */
+  override def init(dataMapModel: DataMapModel): Unit = {
+    this.filePath = dataMapModel.getFilePath
+    val size = FileFactory.getCarbonFile(filePath).getSize
+    fileHolder = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+    val footerLen = fileHolder.readInt(filePath, size - 4)
+    val bytes = fileHolder.readByteArray(filePath, size - footerLen - 4, footerLen)
+    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(in)
+    maxMin = obj.readObject()
+      .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]]
+  }
+
+  /**
+   * Prune the datamap with filter expression. It returns the list of
+   * blocklets where these filters can exist.
+   *
+   * @param filterExp
+   * @return
+   */
+  override def prune(
+      filterExp: FilterResolverIntf,
+      segmentProperties: SegmentProperties,
+      partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = {
+    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
+    val expression = filterExp.getFilterExpression
+    getEqualToExpression(expression, buffer)
+    val value = buffer.map { f =>
+      f.getChildren.get(1).evaluate(null).getString
+    }
+    val meta = findMeta(value(0).getBytes)
+    meta.map { f =>
+      readAndFindData(f, value(0).getBytes())
+    }.filter(_.isDefined).map(_.get).asJava
+  }
+
+  private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
+      value: Array[Byte]): Option[FineGrainBlocklet] = {
+    val bytes = fileHolder.readByteArray(filePath, meta._4, meta._5)
+    val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(outputStream)
+    val blockletsData = obj.readObject()
+      .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]]
+
+    import scala.collection.Searching._
+    val searching = blockletsData
+      .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), Seq(0)))(new Ordering[
+      (Array[Byte], Seq[Seq[Int]], Seq[Int])] {
+      override def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]),
+          y: (Array[Byte], Seq[Seq[Int]], Seq[Int])) = {
+        ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1)
+      }
+    })
+    if (searching.insertionPoint >= 0) {
+      val f = blockletsData(searching.insertionPoint)
+      val pages = f._3.zipWithIndex.map { p =>
+        val pg = new FineGrainBlocklet.Page
+        pg.setPageId(p._1)
+        pg.setRowId(f._2(p._2).toArray)
+        pg
+      }
+      pages
+      Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
+    } else {
+      None
+    }
+
+  }
+
+  private def findMeta(value: Array[Byte]) = {
+    val tuples = maxMin.filter { f =>
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 &&
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0
+    }
+    tuples
+  }
+
+  def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
+    if (expression.getChildren != null) {
+      expression.getChildren.asScala.map { f =>
+        if (f.isInstanceOf[EqualToExpression]) {
+          buffer += f
+        }
+        getEqualToExpression(f, buffer)
+      }
+    }
+  }
+
+  /**
+   * Clear complete index table and release memory.
+   */
+  override def clear():Unit = {
+    ???
+  }
+
+  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+}
+
+class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
+    segmentId: String, dataWriterPath: String, dataMapName: String)
+  extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) {
+
+  var currentBlockId: String = null
+  val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap"
+  val stream: DataOutputStream = FileFactory
+    .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
+  val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
+  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]()
+  var position: Long = 0
+  val compressor = new SnappyCompressor
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  override def onBlockStart(blockId: String): Unit = {
+    currentBlockId = blockId
+  }
+
+  /**
+   * End of block notification
+   */
+  override def onBlockEnd(blockId: String): Unit = {
+
+  }
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletStart(blockletId: Int): Unit = {
+
+  }
+
+  /**
+   * End of blocklet notification
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletEnd(blockletId: Int): Unit = {
+    val sorted = blockletList
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
+    var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null
+    var addedLast: Boolean = false
+    val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]()
+    // Merge all same column values to single row.
+    sorted.foreach { f =>
+      if (oldValue != null) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
+          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ f._3)
+          addedLast = false
+        } else {
+          blockletListUpdated += oldValue
+          oldValue = (f._1, Seq(f._2), f._3)
+          addedLast = true
+        }
+      } else {
+        oldValue = (f._1, Seq(f._2), f._3)
+        addedLast = false
+      }
+    }
+    if (!addedLast && oldValue != null) {
+      blockletListUpdated += oldValue
+    }
+
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(blockletListUpdated)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    maxMin +=
+    ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last
+      ._1), position, bytes.length))
+    position += bytes.length
+    blockletList.clear()
+  }
+
+  /**
+   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
+   * DataMapMeta returned in DataMapFactory.
+   *
+   * Implementation should copy the content of `pages` as needed, because `pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   */
+  override def onPageAdded(blockletId: Int,
+      pageId: Int,
+      pages: Array[ColumnPage]): Unit = {
+    val size = pages(0).getPageSize
+    val list = new ArrayBuffer[(Array[Byte], Int)]()
+    var i = 0
+    while (i < size) {
+      val bytes = pages(0).getBytes(i)
+      val newBytes = new Array[Byte](bytes.length - 2)
+      System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
+      list += ((newBytes, i))
+      i = i + 1
+    }
+    // Sort based on the column data in order to create index.
+    val sorted = list
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
+    var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null
+    var addedLast: Boolean = false
+    // Merge all same column values to single row.
+    sorted.foreach { f =>
+      if (oldValue != null) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
+          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3)
+          addedLast = false
+        } else {
+          blockletList += oldValue
+          oldValue = (f._1, Seq(f._2), Seq(pageId))
+          addedLast = true
+        }
+      } else {
+        oldValue = (f._1, Seq(f._2), Seq(pageId))
+        addedLast = false
+      }
+    }
+    if (!addedLast && oldValue != null) {
+      blockletList += oldValue
+    }
+  }
+
+
+  /**
+   * This is called during closing of writer.So after this call no more data will be sent to this
+   * class.
+   */
+  override def finish(): Unit = {
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(maxMin)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    stream.writeInt(bytes.length)
+    stream.close()
+    commitFile(fgwritepath)
+  }
+
+
+}
+
+class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/compaction/fil2.csv"
+
+  override protected def beforeAll(): Unit = {
+    //n should be about 5000000 of reset if size is default 1024
+    val n = 150000
+    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+  }
+
+  test("test fg datamap") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      table.getAbsoluteTableIdentifier,
+      classOf[FGDataMapFactory].getName, "fgdatamap")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670'"),
+      sql("select * from normal_test where name='n502670'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 09dbd71..6551aec 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -427,9 +427,10 @@ class CarbonScanRDD(
     CarbonTableInputFormat.setQuerySegment(conf, identifier)
     CarbonTableInputFormat.setFilterPredicates(conf, filterExpression)
     CarbonTableInputFormat.setColumnProjection(conf, columnProjection)
-    if (CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-        CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
+    CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+    if (CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
       CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 12a8b8b..d35c52d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -738,6 +738,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       handoffSize = 1024L * 200
     )
     sql("alter table streaming.stream_table_finish finish streaming")
+    Thread.sleep(1000)
     sql("show segments for table streaming.stream_table_finish").show(100, false)
 
     val segments = sql("show segments for table streaming.stream_table_finish").collect()
@@ -850,8 +851,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
             .start()
           qry.awaitTermination()
         } catch {
-          case ex =>
-            throw new Exception(ex.getMessage)
+          case ex: Throwable =>
+            LOGGER.error(ex.getMessage)
+            throw new Exception(ex.getMessage, ex)
         } finally {
           if (null != qry) {
             qry.stop()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 8e350d9..31a6701 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.processing.datamap;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -28,8 +29,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.processing.store.TablePage;
@@ -43,25 +44,26 @@ public class DataMapWriterListener {
       DataMapWriterListener.class.getCanonicalName());
 
   // list indexed column name -> list of data map writer
-  private Map<List<String>, List<DataMapWriter>> registry = new ConcurrentHashMap<>();
+  private Map<List<String>, List<AbstractDataMapWriter>> registry = new ConcurrentHashMap<>();
 
   /**
    * register all datamap writer for specified table and segment
    */
-  public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId) {
+  public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId,
+      String dataWritePath) {
     List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(identifier);
     if (tableDataMaps != null) {
       for (TableDataMap tableDataMap : tableDataMaps) {
         DataMapFactory factory = tableDataMap.getDataMapFactory();
-        register(factory, segmentId);
+        register(factory, segmentId, dataWritePath);
       }
     }
   }
 
   /**
-   * Register a DataMapWriter
+   * Register a AbstractDataMapWriter
    */
-  private void register(DataMapFactory factory, String segmentId) {
+  private void register(DataMapFactory factory, String segmentId, String dataWritePath) {
     assert (factory != null);
     assert (segmentId != null);
     DataMapMeta meta = factory.getMeta();
@@ -70,8 +72,8 @@ public class DataMapWriterListener {
       return;
     }
     List<String> columns = factory.getMeta().getIndexedColumns();
-    List<DataMapWriter> writers = registry.get(columns);
-    DataMapWriter writer = factory.createWriter(segmentId);
+    List<AbstractDataMapWriter> writers = registry.get(columns);
+    AbstractDataMapWriter writer = factory.createWriter(segmentId, dataWritePath);
     if (writers != null) {
       writers.add(writer);
     } else {
@@ -79,36 +81,36 @@ public class DataMapWriterListener {
       writers.add(writer);
       registry.put(columns, writers);
     }
-    LOG.info("DataMapWriter " + writer + " added");
+    LOG.info("AbstractDataMapWriter " + writer + " added");
   }
 
   public void onBlockStart(String blockId, String blockPath) {
-    for (List<DataMapWriter> writers : registry.values()) {
-      for (DataMapWriter writer : writers) {
-        writer.onBlockStart(blockId, blockPath);
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
+        writer.onBlockStart(blockId);
       }
     }
   }
 
   public void onBlockEnd(String blockId) {
-    for (List<DataMapWriter> writers : registry.values()) {
-      for (DataMapWriter writer : writers) {
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
         writer.onBlockEnd(blockId);
       }
     }
   }
 
   public void onBlockletStart(int blockletId) {
-    for (List<DataMapWriter> writers : registry.values()) {
-      for (DataMapWriter writer : writers) {
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
         writer.onBlockletStart(blockletId);
       }
     }
   }
 
   public void onBlockletEnd(int blockletId) {
-    for (List<DataMapWriter> writers : registry.values()) {
-      for (DataMapWriter writer : writers) {
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
         writer.onBlockletEnd(blockletId);
       }
     }
@@ -121,18 +123,29 @@ public class DataMapWriterListener {
    * @param tablePage  page data
    */
   public void onPageAdded(int blockletId, int pageId, TablePage tablePage) {
-    Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet();
-    for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) {
+    Set<Map.Entry<List<String>, List<AbstractDataMapWriter>>> entries = registry.entrySet();
+    for (Map.Entry<List<String>, List<AbstractDataMapWriter>> entry : entries) {
       List<String> indexedColumns = entry.getKey();
       ColumnPage[] pages = new ColumnPage[indexedColumns.size()];
       for (int i = 0; i < indexedColumns.size(); i++) {
         pages[i] = tablePage.getColumnPage(indexedColumns.get(i));
       }
-      List<DataMapWriter> writers = entry.getValue();
-      for (DataMapWriter writer : writers) {
+      List<AbstractDataMapWriter> writers = entry.getValue();
+      for (AbstractDataMapWriter writer : writers) {
         writer.onPageAdded(blockletId, pageId, pages);
       }
     }
   }
 
+  /**
+   * Finish all datamap writers
+   */
+  public void finish() throws IOException {
+    for (List<AbstractDataMapWriter> writers : registry.values()) {
+      for (AbstractDataMapWriter writer : writers) {
+        writer.finish();
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index d15152c..497989a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -260,7 +261,8 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
 
     DataMapWriterListener listener = new DataMapWriterListener();
-    listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId());
+    listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId(),
+        storeLocation[new Random().nextInt(storeLocation.length)]);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
 
@@ -325,6 +327,12 @@ public class CarbonFactDataHandlerModel {
         segmentProperties.getDimensions(),
         segmentProperties.getMeasures());
 
+    DataMapWriterListener listener = new DataMapWriterListener();
+    listener.registerAllWriter(
+        loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(),
+        loadModel.getSegmentId(),
+        tempStoreLocation[new Random().nextInt(tempStoreLocation.length)]);
+    carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     return carbonFactDataHandlerModel;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index b00ec70..04e304f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.processing.store.writer;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
@@ -41,14 +39,11 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonMergerUtil;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -62,7 +57,6 @@ import org.apache.carbondata.format.IndexHeader;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.io.IOUtils;
 
 public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
 
@@ -70,12 +64,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
       LogServiceFactory.getLogService(AbstractFactDataWriter.class.getName());
 
   /**
-   * dfs.bytes-per-checksum
-   * HDFS checksum length, block size for a file should be exactly divisible
-   * by this value
-   */
-  private static final int HDFS_CHECKSUM_LENGTH = 512;
-  /**
    * file channel
    */
   protected FileChannel fileChannel;
@@ -207,35 +195,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   }
 
   /**
-   * This method will return max of block size and file size
-   *
-   * @param blockSize
-   * @param fileSize
-   * @return
-   */
-  private static long getMaxOfBlockAndFileSize(long blockSize, long fileSize) {
-    long maxSize = blockSize;
-    if (fileSize > blockSize) {
-      maxSize = fileSize;
-    }
-    // block size should be exactly divisible by 512 which is  maintained by HDFS as bytes
-    // per checksum, dfs.bytes-per-checksum=512 must divide block size
-    long remainder = maxSize % HDFS_CHECKSUM_LENGTH;
-    if (remainder > 0) {
-      maxSize = maxSize + HDFS_CHECKSUM_LENGTH - remainder;
-    }
-    // convert to make block size more readable.
-    String readableBlockSize = ByteUtil.convertByteToReadable(blockSize);
-    String readableFileSize = ByteUtil.convertByteToReadable(fileSize);
-    String readableMaxSize = ByteUtil.convertByteToReadable(maxSize);
-    LOGGER.info(
-        "The configured block size is " + readableBlockSize + ", the actual carbon file size is "
-            + readableFileSize + ", choose the max value " + readableMaxSize
-            + " as the block size on HDFS");
-    return maxSize;
-  }
-
-  /**
    * This method will be used to update the file channel with new file if exceeding block size
    * threshold, new file will be created once existing file reached the file size limit This
    * method will first check whether existing file size is exceeded the file
@@ -296,7 +255,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     String fileName = this.carbonDataFileTempPath.substring(0,
         this.carbonDataFileTempPath.lastIndexOf('.'));
     if (copyInCurrentThread) {
-      copyCarbonDataFileToCarbonStorePath(fileName);
+      CarbonUtil.copyCarbonDataFileToCarbonStorePath(
+          fileName, dataWriterVo.getCarbonDataDirectoryPath(),
+          fileSizeInBytes);
     } else {
       executorServiceSubmitList.add(executorService.submit(new CopyThread(fileName)));
     }
@@ -449,7 +410,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     }
     writer.close();
     // copy from temp to actual store location
-    copyCarbonDataFileToCarbonStorePath(fileName);
+    CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName,
+            dataWriterVo.getCarbonDataDirectoryPath(),
+            fileSizeInBytes);
   }
 
   /**
@@ -459,20 +422,16 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    * @throws CarbonDataWriterException
    */
   protected void closeExecutorService() throws CarbonDataWriterException {
-    executorService.shutdown();
     try {
+      listener.finish();
+      executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.HOURS);
-    } catch (InterruptedException e) {
-      throw new CarbonDataWriterException(e.getMessage());
-    }
-    for (int i = 0; i < executorServiceSubmitList.size(); i++) {
-      try {
+      for (int i = 0; i < executorServiceSubmitList.size(); i++) {
         executorServiceSubmitList.get(i).get();
-      } catch (InterruptedException e) {
-        throw new CarbonDataWriterException(e.getMessage());
-      } catch (ExecutionException e) {
-        throw new CarbonDataWriterException(e.getMessage());
       }
+    } catch (InterruptedException | ExecutionException | IOException e) {
+      LOGGER.error(e, "Error while finishing writer");
+      throw new CarbonDataWriterException(e.getMessage());
     }
   }
 
@@ -490,62 +449,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     }
   }
 
-  /**
-   * This method will copy the given file to carbon store location
-   *
-   * @param localFileName local file name with full path
-   * @throws CarbonDataWriterException
-   */
-  protected void copyCarbonDataFileToCarbonStorePath(String localFileName)
-      throws CarbonDataWriterException {
-    long copyStartTime = System.currentTimeMillis();
-    LOGGER.info("Copying " + localFileName + " --> " + dataWriterVo.getCarbonDataDirectoryPath());
-    try {
-      CarbonFile localCarbonFile =
-          FileFactory.getCarbonFile(localFileName, FileFactory.getFileType(localFileName));
-      String carbonFilePath = dataWriterVo.getCarbonDataDirectoryPath() + localFileName
-          .substring(localFileName.lastIndexOf(File.separator));
-      copyLocalFileToCarbonStore(carbonFilePath, localFileName,
-          CarbonCommonConstants.BYTEBUFFER_SIZE,
-          getMaxOfBlockAndFileSize(fileSizeInBytes, localCarbonFile.getSize()));
-    } catch (IOException e) {
-      throw new CarbonDataWriterException(
-          "Problem while copying file from local store to carbon store", e);
-    }
-    LOGGER.info(
-        "Total copy time (ms) to copy file " + localFileName + " is " + (System.currentTimeMillis()
-            - copyStartTime));
-  }
-
-  /**
-   * This method will read the local carbon data file and write to carbon data file in HDFS
-   *
-   * @param carbonStoreFilePath
-   * @param localFilePath
-   * @param bufferSize
-   * @param blockSize
-   * @throws IOException
-   */
-  private void copyLocalFileToCarbonStore(String carbonStoreFilePath, String localFilePath,
-      int bufferSize, long blockSize) throws IOException {
-    DataOutputStream dataOutputStream = null;
-    DataInputStream dataInputStream = null;
-    try {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("HDFS file block size for file: " + carbonStoreFilePath + " is " + blockSize
-            + " (bytes");
-      }
-      dataOutputStream = FileFactory
-          .getDataOutputStream(carbonStoreFilePath, FileFactory.getFileType(carbonStoreFilePath),
-              bufferSize, blockSize);
-      dataInputStream = FileFactory
-          .getDataInputStream(localFilePath, FileFactory.getFileType(localFilePath), bufferSize);
-      IOUtils.copyBytes(dataInputStream, dataOutputStream, bufferSize);
-    } finally {
-      CarbonUtil.closeStream(dataInputStream);
-      CarbonUtil.closeStream(dataOutputStream);
-    }
-  }
 
   /**
    * This method will copy the carbon data file from local store location to
@@ -570,7 +473,10 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
      * @throws Exception if unable to compute a result
      */
     @Override public Void call() throws Exception {
-      copyCarbonDataFileToCarbonStorePath(fileName);
+      CarbonUtil.copyCarbonDataFileToCarbonStorePath(
+          fileName,
+          dataWriterVo.getCarbonDataDirectoryPath(),
+          fileSizeInBytes);
       return null;
     }
 


[2/2] carbondata git commit: [CARBONDATA-1544][Datamap] Datamap FineGrain implementation

Posted by ja...@apache.org.
[CARBONDATA-1544][Datamap] Datamap FineGrain implementation

Implemented interfaces for FG datamap and integrated to filterscanner to use the pruned bitset from FG datamap.
FG Query flow as follows.
1.The user can add FG datamap to any table and implement there interfaces.
2. Any filter query which hits the table with datamap will call prune method of FGdatamap.
3. The prune method of FGDatamap return list FineGrainBlocklet , these blocklets contain the information of block, blocklet, page and rowids information as well.
4. The pruned blocklets are internally wriitten to file and returns only the block , blocklet and filepath information as part of Splits.
5. Based on the splits scanrdd schedule the tasks.
6. In filterscanner we check the datamapwriterpath from split and reNoteads the bitset if exists. And pass this bitset as input to it.

This closes #1471


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e9e85a02
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e9e85a02
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e9e85a02

Branch: refs/heads/fgdatamap
Commit: e9e85a02af02c7b57e7a24f844039145406f5983
Parents: a8114b9
Author: ravipesala <ra...@gmail.com>
Authored: Wed Nov 15 19:48:40 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sun Dec 24 21:26:26 2017 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/DataMapMeta.java    |   8 +-
 .../core/datamap/DataMapStoreManager.java       |  30 +-
 .../carbondata/core/datamap/DataMapType.java    |  21 +
 .../carbondata/core/datamap/TableDataMap.java   |  30 +-
 .../core/datamap/dev/AbstractDataMapWriter.java | 110 +++++
 .../core/datamap/dev/BlockletSerializer.java    |  57 +++
 .../carbondata/core/datamap/dev/DataMap.java    |   4 +-
 .../core/datamap/dev/DataMapFactory.java        |  14 +-
 .../core/datamap/dev/DataMapWriter.java         |  57 ---
 .../cgdatamap/AbstractCoarseGrainDataMap.java   |  24 +
 .../AbstractCoarseGrainDataMapFactory.java      |  34 ++
 .../dev/fgdatamap/AbstractFineGrainDataMap.java |  24 +
 .../AbstractFineGrainDataMapFactory.java        |  38 ++
 .../carbondata/core/datastore/DataRefNode.java  |   7 +
 .../core/datastore/block/TableBlockInfo.java    |  10 +
 .../impl/btree/AbstractBTreeLeafNode.java       |   5 +
 .../datastore/impl/btree/BTreeNonLeafNode.java  |   5 +
 .../carbondata/core/indexstore/Blocklet.java    |  30 +-
 .../core/indexstore/BlockletDetailsFetcher.java |   8 +
 .../core/indexstore/ExtendedBlocklet.java       |  19 +-
 .../core/indexstore/FineGrainBlocklet.java      | 120 +++++
 .../blockletindex/BlockletDataMap.java          |  15 +-
 .../blockletindex/BlockletDataMapFactory.java   |  62 ++-
 .../BlockletDataRefNodeWrapper.java             |  30 +-
 .../indexstore/blockletindex/IndexWrapper.java  |  18 +
 .../core/indexstore/schema/FilterType.java      |  24 -
 .../executer/ExcludeFilterExecuterImpl.java     |   3 +
 .../executer/IncludeFilterExecuterImpl.java     |   3 +
 .../core/scan/processor/BlocksChunkHolder.java  |   5 -
 .../core/scan/scanner/impl/FilterScanner.java   |   2 +
 .../apache/carbondata/core/util/CarbonUtil.java |  97 ++++
 .../datamap/examples/MinMaxDataMap.java         |  32 +-
 .../datamap/examples/MinMaxDataMapFactory.java  |  49 ++-
 .../datamap/examples/MinMaxDataWriter.java      |  36 +-
 .../examples/MinMaxIndexBlockDetails.java       |  13 -
 .../carbondata/hadoop/CarbonInputFormat.java    |   2 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |  20 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  20 +-
 .../testsuite/datamap/CGDataMapTestCase.scala   | 361 +++++++++++++++
 .../testsuite/datamap/DataMapWriterSuite.scala  |  46 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   | 440 +++++++++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   7 +-
 .../TestStreamingTableOperation.scala           |   6 +-
 .../datamap/DataMapWriterListener.java          |  57 ++-
 .../store/CarbonFactDataHandlerModel.java       |  10 +-
 .../store/writer/AbstractFactDataWriter.java    | 126 +-----
 46 files changed, 1755 insertions(+), 384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
index 7746acf..dd15ccb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
@@ -19,15 +19,15 @@ package org.apache.carbondata.core.datamap;
 
 import java.util.List;
 
-import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 
 public class DataMapMeta {
 
   private List<String> indexedColumns;
 
-  private FilterType optimizedOperation;
+  private List<ExpressionType> optimizedOperation;
 
-  public DataMapMeta(List<String> indexedColumns, FilterType optimizedOperation) {
+  public DataMapMeta(List<String> indexedColumns, List<ExpressionType> optimizedOperation) {
     this.indexedColumns = indexedColumns;
     this.optimizedOperation = optimizedOperation;
   }
@@ -36,7 +36,7 @@ public class DataMapMeta {
     return indexedColumns;
   }
 
-  public FilterType getOptimizedOperation() {
+  public List<ExpressionType> getOptimizedOperation() {
     return optimizedOperation;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 90e5fff..8d80b4d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -56,7 +56,22 @@ public final class DataMapStoreManager {
   }
 
   public List<TableDataMap> getAllDataMap(AbsoluteTableIdentifier identifier) {
-    return allDataMaps.get(identifier.uniqueName());
+    return allDataMaps.get(identifier.getCarbonTableIdentifier().getTableUniqueName());
+  }
+
+  // TODO its a temporary method till chooser is implemented
+  public TableDataMap chooseDataMap(AbsoluteTableIdentifier identifier) {
+    List<TableDataMap> tableDataMaps = getAllDataMap(identifier);
+    if (tableDataMaps != null && tableDataMaps.size() > 0) {
+      for (TableDataMap dataMap: tableDataMaps) {
+        if (!dataMap.getDataMapName().equalsIgnoreCase(BlockletDataMap.NAME)) {
+          return dataMap;
+        }
+      }
+      return tableDataMaps.get(0);
+    } else {
+      return getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
+    }
   }
 
   /**
@@ -68,7 +83,7 @@ public final class DataMapStoreManager {
    */
   public TableDataMap getDataMap(AbsoluteTableIdentifier identifier,
       String dataMapName, String factoryClass) {
-    String table = identifier.uniqueName();
+    String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
     List<TableDataMap> tableDataMaps = allDataMaps.get(table);
     TableDataMap dataMap;
     if (tableDataMaps == null) {
@@ -96,7 +111,7 @@ public final class DataMapStoreManager {
    */
   public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
       String factoryClassName, String dataMapName) {
-    String table = identifier.uniqueName();
+    String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
     // Just update the segmentRefreshMap with the table if not added.
     getTableSegmentRefresher(identifier);
     List<TableDataMap> tableDataMaps = allDataMaps.get(table);
@@ -149,7 +164,9 @@ public final class DataMapStoreManager {
    * @param identifier Table identifier
    */
   public void clearDataMaps(AbsoluteTableIdentifier identifier) {
-    List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName());
+    String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
+    List<TableDataMap> tableDataMaps =
+        allDataMaps.get(tableUniqueName);
     segmentRefreshMap.remove(identifier.uniqueName());
     if (tableDataMaps != null) {
       for (TableDataMap tableDataMap: tableDataMaps) {
@@ -158,7 +175,7 @@ public final class DataMapStoreManager {
           break;
         }
       }
-      allDataMaps.remove(identifier.uniqueName());
+      allDataMaps.remove(tableUniqueName);
     }
   }
 
@@ -167,7 +184,8 @@ public final class DataMapStoreManager {
    * @param identifier Table identifier
    */
   public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
-    List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName());
+    List<TableDataMap> tableDataMaps =
+        allDataMaps.get(identifier.getCarbonTableIdentifier().getTableUniqueName());
     if (tableDataMaps != null) {
       int i = 0;
       for (TableDataMap tableDataMap: tableDataMaps) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java
new file mode 100644
index 0000000..bf812b3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapType.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+public enum DataMapType {
+  CG,FG;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index e678920..b3bae94 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -21,12 +21,15 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.dev.BlockletSerializer;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -76,10 +79,15 @@ public final class TableDataMap implements OperationEventListener {
     SegmentProperties segmentProperties;
     for (String segmentId : segmentIds) {
       List<Blocklet> pruneBlocklets = new ArrayList<>();
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
-      segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segmentId);
-      for (DataMap dataMap : dataMaps) {
-        pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
+      // if filter is not passed then return all the blocklets
+      if (filterExp == null) {
+        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segmentId);
+      } else {
+        List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+        segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segmentId);
+        for (DataMap dataMap : dataMaps) {
+          pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
+        }
       }
       blocklets.addAll(addSegmentId(blockletDetailsFetcher
           .getExtendedBlocklets(pruneBlocklets, segmentId), segmentId));
@@ -137,9 +145,21 @@ public final class TableDataMap implements OperationEventListener {
               segmentPropertiesFetcher.getSegmentProperties(distributable.getSegmentId()),
               partitions));
     }
-    for (Blocklet blocklet: blocklets) {
+    BlockletSerializer serializer = new BlockletSerializer();
+    String writePath =
+        identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + dataMapName;
+    if (dataMapFactory.getDataMapType() == DataMapType.FG) {
+      FileFactory.mkdirs(writePath, FileFactory.getFileType(writePath));
+    }
+    for (Blocklet blocklet : blocklets) {
       ExtendedBlocklet detailedBlocklet =
           blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegmentId());
+      if (dataMapFactory.getDataMapType() == DataMapType.FG) {
+        String blockletwritePath =
+            writePath + CarbonCommonConstants.FILE_SEPARATOR + System.nanoTime();
+        detailedBlocklet.setDataMapWriterPath(blockletwritePath);
+        serializer.serializeBlocklet((FineGrainBlocklet) blocklet, blockletwritePath);
+      }
       detailedBlocklet.setSegmentId(distributable.getSegmentId());
       detailedBlocklets.add(detailedBlocklet);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
new file mode 100644
index 0000000..bcc9bad
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * Data Map writer
+ */
+public abstract class AbstractDataMapWriter {
+
+  protected AbsoluteTableIdentifier identifier;
+
+  protected String segmentId;
+
+  protected String writeDirectoryPath;
+
+  public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId,
+      String writeDirectoryPath) {
+    this.identifier = identifier;
+    this.segmentId = segmentId;
+    this.writeDirectoryPath = writeDirectoryPath;
+  }
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  public abstract void onBlockStart(String blockId);
+
+  /**
+   * End of block notification
+   */
+  public abstract void onBlockEnd(String blockId);
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  public abstract void onBlockletStart(int blockletId);
+
+  /**
+   * End of blocklet notification
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  public abstract void onBlockletEnd(int blockletId);
+
+  /**
+   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
+   * DataMapMeta returned in DataMapFactory.
+   * Implementation should copy the content of `pages` as needed, because `pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   */
+  public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] pages);
+
+  /**
+   * This is called during closing of writer.So after this call no more data will be sent to this
+   * class.
+   */
+  public abstract void finish() throws IOException;
+
+  /**
+   * It copies the file from temp folder to actual folder
+   *
+   * @param dataMapFile
+   * @throws IOException
+   */
+  protected void commitFile(String dataMapFile) throws IOException {
+    if (!dataMapFile.startsWith(writeDirectoryPath)) {
+      throw new UnsupportedOperationException(
+          "Datamap file " + dataMapFile + " is not written in provided directory path "
+              + writeDirectoryPath);
+    }
+    String dataMapFileName =
+        dataMapFile.substring(writeDirectoryPath.length(), dataMapFile.length());
+    String carbonFilePath = dataMapFileName.substring(0, dataMapFileName.lastIndexOf("/"));
+    String segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
+    if (carbonFilePath.length() > 0) {
+      carbonFilePath = segmentPath + carbonFilePath;
+      FileFactory.mkdirs(carbonFilePath, FileFactory.getFileType(carbonFilePath));
+    } else {
+      carbonFilePath = segmentPath;
+    }
+    CarbonUtil.copyCarbonDataFileToCarbonStorePath(dataMapFile, carbonFilePath, 0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java
new file mode 100644
index 0000000..3d4c717
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/BlockletSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
+
+public class BlockletSerializer {
+
+  /**
+   * Serialize and write blocklet to the file.
+   * @param grainBlocklet
+   * @param writePath
+   * @throws IOException
+   */
+  public void serializeBlocklet(FineGrainBlocklet grainBlocklet, String writePath)
+      throws IOException {
+    DataOutputStream dataOutputStream =
+        FileFactory.getDataOutputStream(writePath, FileFactory.getFileType(writePath));
+    grainBlocklet.write(dataOutputStream);
+    dataOutputStream.close();
+  }
+
+  /**
+   * Read data from filepath and deserialize blocklet.
+   * @param writePath
+   * @return
+   * @throws IOException
+   */
+  public FineGrainBlocklet deserializeBlocklet(String writePath) throws IOException {
+    DataInputStream inputStream =
+        FileFactory.getDataInputStream(writePath, FileFactory.getFileType(writePath));
+    FineGrainBlocklet blocklet = new FineGrainBlocklet();
+    blocklet.readFields(inputStream);
+    inputStream.close();
+    return blocklet;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index dfe97e3..3fa6d75 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 /**
  * Datamap is an entity which can store and retrieve index data.
  */
-public interface DataMap {
+public interface DataMap<T extends Blocklet> {
 
   /**
    * It is called to load the data map to memory or to initialize it.
@@ -41,7 +41,7 @@ public interface DataMap {
    * @param filterExp
    * @return
    */
-  List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+  List<T> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
       List<String> partitions);
 
   // TODO Move this method to Abstract class

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index f5a7404..e900f8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -21,13 +21,14 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.DataMapType;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.events.Event;
 
 /**
  * Interface for datamap factory, it is responsible for creating the datamap.
  */
-public interface DataMapFactory {
+public interface DataMapFactory<T extends DataMap> {
 
   /**
    * Initialization of Datamap factory with the identifier and datamap name
@@ -37,17 +38,17 @@ public interface DataMapFactory {
   /**
    * Return a new write for this datamap
    */
-  DataMapWriter createWriter(String segmentId);
+  AbstractDataMapWriter createWriter(String segmentId, String writeDirectoryPath);
 
   /**
    * Get the datamap for segmentid
    */
-  List<DataMap> getDataMaps(String segmentId) throws IOException;
+  List<T> getDataMaps(String segmentId) throws IOException;
 
   /**
    * Get datamaps for distributable object.
    */
-  List<DataMap> getDataMaps(DataMapDistributable distributable) throws IOException;
+  List<T> getDataMaps(DataMapDistributable distributable) throws IOException;
 
   /**
    * Get all distributable objects of a segmentid
@@ -75,4 +76,9 @@ public interface DataMapFactory {
    * Return metadata of this datamap
    */
   DataMapMeta getMeta();
+
+  /**
+   *  Type of datamap whether it is FG or CG
+   */
+  DataMapType getDataMapType();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
deleted file mode 100644
index 413eaa5..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.datamap.dev;
-
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-
-/**
- * Data Map writer
- */
-public interface DataMapWriter {
-
-  /**
-   *  Start of new block notification.
-   *  @param blockId file name of the carbondata file
-   */
-  void onBlockStart(String blockId, String blockPath);
-
-  /**
-   * End of block notification
-   */
-  void onBlockEnd(String blockId);
-
-  /**
-   * Start of new blocklet notification.
-   * @param blockletId sequence number of blocklet in the block
-   */
-  void onBlockletStart(int blockletId);
-
-  /**
-   * End of blocklet notification
-   * @param blockletId sequence number of blocklet in the block
-   */
-  void onBlockletEnd(int blockletId);
-  /**
-   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
-   * DataMapMeta returned in DataMapFactory.
-   *
-   * Implementation should copy the content of `pages` as needed, because `pages` memory
-   * may be freed after this method returns, if using unsafe column page.
-   */
-  void onPageAdded(int blockletId, int pageId, ColumnPage[] pages);
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java
new file mode 100644
index 0000000..d79d0c6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMap.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.cgdatamap;
+
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.Blocklet;
+
+public abstract class AbstractCoarseGrainDataMap implements DataMap<Blocklet> {
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java
new file mode 100644
index 0000000..9789992
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/AbstractCoarseGrainDataMapFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.cgdatamap;
+
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+
+/**
+ *  1. Any filter query which hits the table with datamap will call prune method of CGdatamap.
+ *  2. The prune method of CGDatamap return list Blocklet , these blocklets contain the
+ *     information of block and blocklet.
+ *  3. Based on the splits scanrdd schedule the tasks.
+ */
+public abstract class AbstractCoarseGrainDataMapFactory
+    implements DataMapFactory<AbstractCoarseGrainDataMap> {
+
+  @Override public DataMapType getDataMapType() {
+    return DataMapType.CG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java
new file mode 100644
index 0000000..310fb3b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMap.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.fgdatamap;
+
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
+
+public abstract class AbstractFineGrainDataMap implements DataMap<FineGrainBlocklet> {
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java
new file mode 100644
index 0000000..1ca7fc3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/AbstractFineGrainDataMapFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.fgdatamap;
+
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+
+/**
+ *  1. Any filter query which hits the table with datamap will call prune method of FGdatamap.
+ *  2. The prune method of FGDatamap return list FineGrainBlocklet , these blocklets contain the
+ *     information of block, blocklet, page and rowids information as well.
+ *  3. The pruned blocklets are internally wriitten to file and returns only the block ,
+ *    blocklet and filepath information as part of Splits.
+ *  4. Based on the splits scanrdd schedule the tasks.
+ *  5. In filterscanner we check the datamapwriterpath from split and reNoteads the
+ *     bitset if exists. And pass this bitset as input to it.
+ */
+public abstract class AbstractFineGrainDataMapFactory
+    implements DataMapFactory<AbstractFineGrainDataMap> {
+
+  @Override public DataMapType getDataMapType() {
+    return DataMapType.FG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
index 37acb02..f258a88 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
  * Interface data block reference
@@ -124,6 +125,12 @@ public interface DataRefNode {
   BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache();
 
   /**
+   * Return the indexed data if it has any from disk which was stored by FG datamap.
+   * @return
+   */
+  BitSetGroup getIndexedData();
+
+  /**
    * number of pages in blocklet
    * @return
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 910c9bb..a1ef45f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -85,6 +85,8 @@ public class TableBlockInfo implements Distributable, Serializable {
 
   private BlockletDetailInfo detailInfo;
 
+  private String dataMapWriterPath;
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
       long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
@@ -356,4 +358,12 @@ public class TableBlockInfo implements Distributable, Serializable {
   public void setDetailInfo(BlockletDetailInfo detailInfo) {
     this.detailInfo = detailInfo;
   }
+
+  public String getDataMapWriterPath() {
+    return dataMapWriterPath;
+  }
+
+  public void setDataMapWriterPath(String dataMapWriterPath) {
+    this.dataMapWriterPath = dataMapWriterPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
index 19b1f1c..952b7b3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
  * Non leaf node abstract class
@@ -244,4 +245,8 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode {
   public int getPageRowCount(int pageNumber) {
     throw new UnsupportedOperationException("Unsupported operation");
   }
+
+  @Override public BitSetGroup getIndexedData() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
index 62fcabf..ec5eca6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
  * No leaf node of a b+tree class which will keep the matadata(start key) of the
@@ -245,6 +246,10 @@ public class BTreeNonLeafNode implements BTreeNode {
     return deleteDeltaDataCache;
   }
 
+  public BitSetGroup getIndexedData() {
+    return null;
+  }
+
   /**
    * number of pages in blocklet
    * @return

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index d84f3f6..c731e07 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -16,28 +16,46 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
 /**
  * Blocklet
  */
-public class Blocklet implements Serializable {
+public class Blocklet implements Writable,Serializable {
 
-  private String path;
+  private String blockId;
 
   private String blockletId;
 
-  public Blocklet(String path, String blockletId) {
-    this.path = path;
+  public Blocklet(String blockId, String blockletId) {
+    this.blockId = blockId;
     this.blockletId = blockletId;
   }
 
-  public String getPath() {
-    return path;
+  // For serialization purpose
+  public Blocklet() {
   }
 
   public String getBlockletId() {
     return blockletId;
   }
 
+  public String getBlockId() {
+    return blockId;
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    out.writeUTF(blockId);
+    out.writeUTF(blockletId);
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    blockId = in.readUTF();
+    blockletId = in.readUTF();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index 21ecba1..a493c06 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -44,4 +44,12 @@ public interface BlockletDetailsFetcher {
    * @throws IOException
    */
   ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId) throws IOException;
+
+  /**
+   * Get all the blocklets in a segment
+   *
+   * @param segmentId
+   * @return
+   */
+  List<Blocklet> getAllBlocklets(String segmentId) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index e0cfefb..081da53 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -38,8 +38,13 @@ public class ExtendedBlocklet extends Blocklet {
 
   private String[] location;
 
+  private String path;
+
+  private String dataMapWriterPath;
+
   public ExtendedBlocklet(String path, String blockletId) {
     super(path, blockletId);
+    this.path = path;
   }
 
   public BlockletDetailInfo getDetailInfo() {
@@ -56,7 +61,7 @@ public class ExtendedBlocklet extends Blocklet {
    * @throws IOException
    */
   public void updateLocations() throws IOException {
-    Path path = new Path(getPath());
+    Path path = new Path(this.path);
     FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
     RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
     LocatedFileStatus fileStatus = iter.next();
@@ -79,4 +84,16 @@ public class ExtendedBlocklet extends Blocklet {
   public void setSegmentId(String segmentId) {
     this.segmentId = segmentId;
   }
+
+  public String getPath() {
+    return path;
+  }
+
+  public String getDataMapWriterPath() {
+    return dataMapWriterPath;
+  }
+
+  public void setDataMapWriterPath(String dataMapWriterPath) {
+    this.dataMapWriterPath = dataMapWriterPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
new file mode 100644
index 0000000..266120e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+import org.apache.carbondata.core.util.BitSetGroup;
+
+/**
+ * FineGrainBlocklet
+ */
+public class FineGrainBlocklet extends Blocklet implements Serializable {
+
+  private List<Page> pages;
+
+  public FineGrainBlocklet(String blockId, String blockletId, List<Page> pages) {
+    super(blockId, blockletId);
+    this.pages = pages;
+  }
+
+  // For serialization purpose
+  public FineGrainBlocklet() {
+
+  }
+
+  public List<Page> getPages() {
+    return pages;
+  }
+
+  public static class Page implements Writable,Serializable {
+
+    private int pageId;
+
+    private int[] rowId;
+
+    public BitSet getBitSet() {
+      BitSet bitSet =
+          new BitSet(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+      for (int row : rowId) {
+        bitSet.set(row);
+      }
+      return bitSet;
+    }
+
+    @Override public void write(DataOutput out) throws IOException {
+      out.writeInt(pageId);
+      out.writeInt(rowId.length);
+      for (int i = 0; i < rowId.length; i++) {
+        out.writeInt(rowId[i]);
+      }
+    }
+
+    @Override public void readFields(DataInput in) throws IOException {
+      pageId = in.readInt();
+      int length = in.readInt();
+      rowId = new int[length];
+      for (int i = 0; i < length; i++) {
+        rowId[i] = in.readInt();
+      }
+    }
+
+    public void setPageId(int pageId) {
+      this.pageId = pageId;
+    }
+
+    public void setRowId(int[] rowId) {
+      this.rowId = rowId;
+    }
+  }
+
+  public BitSetGroup getBitSetGroup(int numberOfPages) {
+    BitSetGroup bitSetGroup = new BitSetGroup(numberOfPages);
+    for (int i = 0; i < pages.size(); i++) {
+      bitSetGroup.setBitSet(pages.get(i).getBitSet(), pages.get(i).pageId);
+    }
+    return bitSetGroup;
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    super.write(out);
+    int size = pages.size();
+    out.writeInt(size);
+    for (Page page : pages) {
+      page.write(out);
+    }
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    int size = in.readInt();
+    pages = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      Page page = new Page();
+      page.readFields(in);
+      pages.add(page);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 5cb11c2..24e9103 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -34,8 +34,8 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cacheable;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -68,7 +68,7 @@ import org.apache.carbondata.core.util.DataTypeUtil;
 /**
  * Datamap implementation for blocklet.
  */
-public class BlockletDataMap implements DataMap, Cacheable {
+public class BlockletDataMap extends AbstractCoarseGrainDataMap implements Cacheable {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMap.class.getName());
@@ -451,9 +451,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
         FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
     for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) {
       DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i);
-      boolean isScanRequired = FilterExpressionProcessor
-          .isScanRequired(filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
-              getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
+      boolean isScanRequired = FilterExpressionProcessor.isScanRequired(
+          filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
+          getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
       if (isScanRequired) {
         return true;
       }
@@ -523,7 +523,6 @@ public class BlockletDataMap implements DataMap, Cacheable {
         startIndex++;
       }
     }
-
     return blocklets;
   }
 
@@ -774,4 +773,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return memoryUsed;
   }
 
+  public SegmentProperties getSegmentProperties() {
+    return segmentProperties;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 61e5ceb..49ed5e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -27,9 +27,10 @@ import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datamap.dev.DataMap;
-import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMapFactory;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -39,9 +40,6 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -53,7 +51,8 @@ import org.apache.hadoop.fs.RemoteIterator;
 /**
  * Table map for blocklet
  */
-public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher,
+public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
+    implements BlockletDetailsFetcher,
     SegmentPropertiesFetcher {
 
   private AbsoluteTableIdentifier identifier;
@@ -61,10 +60,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   // segmentId -> list of index file
   private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
 
-  // segmentId -> SegmentProperties.
-  private Map<String, SegmentProperties> segmentPropertiesMap = new HashMap<>();
-
-  private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+  private Cache<TableBlockIndexUniqueIdentifier, AbstractCoarseGrainDataMap> cache;
 
   @Override
   public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
@@ -74,12 +70,12 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   }
 
   @Override
-  public DataMapWriter createWriter(String segmentId) {
+  public AbstractDataMapWriter createWriter(String segmentId, String dataWriterPath) {
     throw new UnsupportedOperationException("not implemented");
   }
 
   @Override
-  public List<DataMap> getDataMaps(String segmentId) throws IOException {
+  public List<AbstractCoarseGrainDataMap> getDataMaps(String segmentId) throws IOException {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         getTableBlockIndexUniqueIdentifiers(segmentId);
     return cache.getAll(tableBlockIndexUniqueIdentifiers);
@@ -140,17 +136,18 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
 
   private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
       Blocklet blocklet) throws IOException {
-    String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getPath());
+    String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId());
     for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
       if (identifier.getCarbonIndexFileName().equals(carbonIndexFileName)) {
         DataMap dataMap = cache.get(identifier);
         return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
       }
     }
-    throw new IOException("Blocklet with blockid " + blocklet.getPath() + " not found ");
+    throw new IOException("Blocklet with blockid " + blocklet.getBlockletId() + " not found ");
   }
 
 
+
   @Override
   public List<DataMapDistributable> toDistributable(String segmentId) {
     CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentId);
@@ -179,7 +176,6 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
 
   @Override
   public void clear(String segmentId) {
-    segmentPropertiesMap.remove(segmentId);
     List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
     if (blockIndexes != null) {
       for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
@@ -200,7 +196,8 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   }
 
   @Override
-  public List<DataMap> getDataMaps(DataMapDistributable distributable) throws IOException {
+  public List<AbstractCoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+      throws IOException {
     BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
     List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
     if (mapDistributable.getFilePath().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
@@ -217,7 +214,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
                 indexFile));
       }
     }
-    List<DataMap> dataMaps;
+    List<AbstractCoarseGrainDataMap> dataMaps;
     try {
       dataMaps = cache.getAll(identifiers);
     } catch (IOException e) {
@@ -233,23 +230,20 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   }
 
   @Override public SegmentProperties getSegmentProperties(String segmentId) throws IOException {
-    SegmentProperties segmentProperties = segmentPropertiesMap.get(segmentId);
-    if (segmentProperties == null) {
-      int[] columnCardinality;
-      List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-          getTableBlockIndexUniqueIdentifiers(segmentId);
-      DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
-      List<DataFileFooter> indexInfo =
-          fileFooterConverter.getIndexInfo(tableBlockIndexUniqueIdentifiers.get(0).getFilePath());
-      for (DataFileFooter fileFooter : indexInfo) {
-        List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
-        if (segmentProperties == null) {
-          columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
-          segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
-        }
-      }
-      segmentPropertiesMap.put(segmentId, segmentProperties);
+    List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segmentId);
+    assert (dataMaps.size() > 0);
+    AbstractCoarseGrainDataMap coarseGrainDataMap = dataMaps.get(0);
+    assert (coarseGrainDataMap instanceof BlockletDataMap);
+    BlockletDataMap dataMap = (BlockletDataMap) coarseGrainDataMap;
+    return dataMap.getSegmentProperties();
+  }
+
+  @Override public List<Blocklet> getAllBlocklets(String segmentId) throws IOException {
+    List<Blocklet> blocklets = new ArrayList<>();
+    List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segmentId);
+    for (AbstractCoarseGrainDataMap dataMap : dataMaps) {
+      blocklets.addAll(dataMap.prune(null, getSegmentProperties(segmentId), null));
     }
-    return segmentProperties;
+    return blocklets;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
index 42505ad..516c460 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datamap.dev.BlockletSerializer;
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -30,7 +31,9 @@ import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory
 import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
  * wrapper for blocklet data map data
@@ -45,6 +48,8 @@ public class BlockletDataRefNodeWrapper implements DataRefNode {
 
   private BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache;
 
+  private BlockletSerializer blockletSerializer;
+
   public BlockletDataRefNodeWrapper(List<TableBlockInfo> blockInfos, int index,
       int[] dimensionLens) {
     this.blockInfos = blockInfos;
@@ -69,6 +74,7 @@ public class BlockletDataRefNodeWrapper implements DataRefNode {
     }
     this.index = index;
     this.dimensionLens = dimensionLens;
+    this.blockletSerializer = new BlockletSerializer();
   }
 
   @Override public DataRefNode getNextDataRefNode() {
@@ -144,15 +150,33 @@ public class BlockletDataRefNodeWrapper implements DataRefNode {
     this.deleteDeltaDataCache = deleteDeltaDataCache;
   }
 
-  @Override public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() {
+  @Override
+  public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() {
     return deleteDeltaDataCache;
   }
 
-  @Override public int numberOfPages() {
+  @Override
+  public int numberOfPages() {
     return blockInfos.get(index).getDetailInfo().getPagesCount();
   }
 
-  @Override public int getPageRowCount(int pageNumber) {
+  @Override
+  public BitSetGroup getIndexedData() {
+    String dataMapWriterPath = blockInfos.get(index).getDataMapWriterPath();
+    if (dataMapWriterPath != null) {
+      try {
+        FineGrainBlocklet blocklet = blockletSerializer.deserializeBlocklet(dataMapWriterPath);
+        return blocklet.getBitSetGroup(numberOfPages());
+      } catch (IOException e) {
+        return null;
+      }
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public int getPageRowCount(int pageNumber) {
     return blockInfos.get(index).getDetailInfo().getBlockletInfo()
         .getNumberOfRowsPerPage()[pageNumber];
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
index b8cffc6..2720700 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
@@ -22,6 +22,8 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -31,7 +33,10 @@ import org.apache.carbondata.core.util.CarbonUtil;
  */
 public class IndexWrapper extends AbstractIndex {
 
+  private List<TableBlockInfo> blockInfos;
+
   public IndexWrapper(List<TableBlockInfo> blockInfos) {
+    this.blockInfos = blockInfos;
     DataFileFooter fileFooter = null;
     try {
       fileFooter = CarbonUtil.readMetadatFile(blockInfos.get(0));
@@ -46,4 +51,17 @@ public class IndexWrapper extends AbstractIndex {
 
   @Override public void buildIndex(List<DataFileFooter> footerList) {
   }
+
+  @Override public void clear() {
+    super.clear();
+    if (blockInfos != null) {
+      for (TableBlockInfo blockInfo : blockInfos) {
+        String dataMapWriterPath = blockInfo.getDataMapWriterPath();
+        if (dataMapWriterPath != null) {
+          CarbonFile file = FileFactory.getCarbonFile(dataMapWriterPath);
+          FileFactory.deleteAllCarbonFilesOfDir(file);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
deleted file mode 100644
index 9d77010..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore.schema;
-
-/**
- * Types of filters of select query
- */
-public enum FilterType {
-  EQUALTO, GREATER_THAN, LESS_THAN, GREATER_THAN_EQUAL, LESS_THAN_EQUAL, LIKE
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index f680579..886b12b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -313,6 +313,9 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
   private BitSet getFilteredIndexesUisngPrvBitset(DimensionColumnDataChunk dimensionColumnDataChunk,
       BitSetGroup prvBitSetGroup, int pageNumber, int numberOfRows) {
     BitSet prvPageBitSet = prvBitSetGroup.getBitSet(pageNumber);
+    if (prvPageBitSet == null || prvPageBitSet.isEmpty()) {
+      return prvPageBitSet;
+    }
     BitSet bitSet = new BitSet();
     bitSet.or(prvPageBitSet);
     byte[][] filterKeys = dimColumnExecuterInfo.getExcludeFilterKeys();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index fe1421c..0385c73 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -332,6 +332,9 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
   private BitSet getFilteredIndexesUisngPrvBitset(DimensionColumnDataChunk dimensionColumnDataChunk,
       BitSetGroup prvBitSetGroup, int pageNumber, int numberOfRows) {
     BitSet prvPageBitSet = prvBitSetGroup.getBitSet(pageNumber);
+    if (prvPageBitSet == null || prvPageBitSet.isEmpty()) {
+      return prvPageBitSet;
+    }
     BitSet bitSet = new BitSet(numberOfRows);
     byte[][] filterKeys = dimColumnExecuterInfo.getFilterKeys();
     int compareResult = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
index 60090d0..0ed9137 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
@@ -50,11 +50,6 @@ public class BlocksChunkHolder {
 
   private BitSetGroup bitSetGroup;
 
-  public BlocksChunkHolder(int numberOfDimensionBlock, int numberOfMeasureBlock) {
-    dimensionRawDataChunk = new DimensionRawColumnChunk[numberOfDimensionBlock];
-    measureRawDataChunk = new MeasureRawColumnChunk[numberOfMeasureBlock];
-  }
-
   public BlocksChunkHolder(int numberOfDimensionBlock, int numberOfMeasureBlock,
       FileHolder fileReader) {
     dimensionRawDataChunk = new DimensionRawColumnChunk[numberOfDimensionBlock];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
index abfc5f4..6e8076e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
@@ -153,6 +153,8 @@ public class FilterScanner extends AbstractBlockletScanner {
         .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
     totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
         totalBlockletStatistic.getCount() + 1);
+    // set the indexed data if it has any during fgdatamap pruning.
+    blocksChunkHolder.setBitSetGroup(blocksChunkHolder.getDataBlock().getIndexedData());
     // apply filter on actual data
     BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(blocksChunkHolder, useBitSetPipeLine);
     // if indexes is empty then return with empty result

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index bd01772..68efdb8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -53,6 +54,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
@@ -96,6 +98,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
@@ -128,6 +131,13 @@ public final class CarbonUtil {
 
   private static final Configuration conf = new Configuration(true);
 
+  /**
+   * dfs.bytes-per-checksum
+   * HDFS checksum length, block size for a file should be exactly divisible
+   * by this value
+   */
+  private static final int HDFS_CHECKSUM_LENGTH = 512;
+
   private CarbonUtil() {
 
   }
@@ -2314,5 +2324,92 @@ public final class CarbonUtil {
     return false;
   }
 
+
+  /**
+   * This method will copy the given file to carbon store location
+   *
+   * @param localFilePath local file name with full path
+   * @throws CarbonDataWriterException
+   */
+  public static void copyCarbonDataFileToCarbonStorePath(String localFilePath,
+      String carbonDataDirectoryPath, long fileSizeInBytes)
+      throws CarbonDataWriterException {
+    long copyStartTime = System.currentTimeMillis();
+    LOGGER.info("Copying " + localFilePath + " --> " + carbonDataDirectoryPath);
+    try {
+      CarbonFile localCarbonFile =
+          FileFactory.getCarbonFile(localFilePath, FileFactory.getFileType(localFilePath));
+      String carbonFilePath = carbonDataDirectoryPath + localFilePath
+          .substring(localFilePath.lastIndexOf(File.separator));
+      copyLocalFileToCarbonStore(carbonFilePath, localFilePath,
+          CarbonCommonConstants.BYTEBUFFER_SIZE,
+          getMaxOfBlockAndFileSize(fileSizeInBytes, localCarbonFile.getSize()));
+    } catch (IOException e) {
+      throw new CarbonDataWriterException(
+          "Problem while copying file from local store to carbon store", e);
+    }
+    LOGGER.info(
+        "Total copy time (ms) to copy file " + localFilePath + " is " + (System.currentTimeMillis()
+            - copyStartTime));
+  }
+
+  /**
+   * This method will read the local carbon data file and write to carbon data file in HDFS
+   *
+   * @param carbonStoreFilePath
+   * @param localFilePath
+   * @param bufferSize
+   * @param blockSize
+   * @throws IOException
+   */
+  private static void copyLocalFileToCarbonStore(String carbonStoreFilePath, String localFilePath,
+      int bufferSize, long blockSize) throws IOException {
+    DataOutputStream dataOutputStream = null;
+    DataInputStream dataInputStream = null;
+    try {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("HDFS file block size for file: " + carbonStoreFilePath + " is " + blockSize
+            + " (bytes");
+      }
+      dataOutputStream = FileFactory
+          .getDataOutputStream(carbonStoreFilePath, FileFactory.getFileType(carbonStoreFilePath),
+              bufferSize, blockSize);
+      dataInputStream = FileFactory
+          .getDataInputStream(localFilePath, FileFactory.getFileType(localFilePath), bufferSize);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, bufferSize);
+    } finally {
+      CarbonUtil.closeStream(dataInputStream);
+      CarbonUtil.closeStream(dataOutputStream);
+    }
+  }
+
+  /**
+   * This method will return max of block size and file size
+   *
+   * @param blockSize
+   * @param fileSize
+   * @return
+   */
+  private static long getMaxOfBlockAndFileSize(long blockSize, long fileSize) {
+    long maxSize = blockSize;
+    if (fileSize > blockSize) {
+      maxSize = fileSize;
+    }
+    // block size should be exactly divisible by 512 which is  maintained by HDFS as bytes
+    // per checksum, dfs.bytes-per-checksum=512 must divide block size
+    long remainder = maxSize % HDFS_CHECKSUM_LENGTH;
+    if (remainder > 0) {
+      maxSize = maxSize + HDFS_CHECKSUM_LENGTH - remainder;
+    }
+    // convert to make block size more readable.
+    String readableBlockSize = ByteUtil.convertByteToReadable(blockSize);
+    String readableFileSize = ByteUtil.convertByteToReadable(fileSize);
+    String readableMaxSize = ByteUtil.convertByteToReadable(maxSize);
+    LOGGER.info(
+        "The configured block size is " + readableBlockSize + ", the actual carbon file size is "
+            + readableFileSize + ", choose the max value " + readableMaxSize
+            + " as the block size on HDFS");
+    return maxSize;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
index 2ad6327..8002e57 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
@@ -28,7 +28,8 @@ import java.util.List;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -36,7 +37,9 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -47,7 +50,7 @@ import com.google.gson.Gson;
 /**
  * Datamap implementation for min max blocklet.
  */
-public class MinMaxDataMap implements DataMap {
+public class MinMaxDataMap extends AbstractCoarseGrainDataMap {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(MinMaxDataMap.class.getName());
@@ -58,8 +61,9 @@ public class MinMaxDataMap implements DataMap {
 
   private MinMaxIndexBlockDetails[] readMinMaxDataMap;
 
-  @Override public void init(String filePath) throws MemoryException, IOException {
-    this.filePath = filePath;
+  @Override
+  public void init(DataMapModel model) throws MemoryException, IOException {
+    this.filePath = model.getFilePath();
     CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0");
     for (int i = 0; i < listFiles.length; i++) {
       readMinMaxDataMap = readJson(listFiles[i].getPath());
@@ -76,7 +80,7 @@ public class MinMaxDataMap implements DataMap {
     });
   }
 
-  public MinMaxIndexBlockDetails[] readJson(String filePath) throws IOException {
+  private MinMaxIndexBlockDetails[] readJson(String filePath) {
     Gson gsonObjectToRead = new Gson();
     DataInputStream dataInputStream = null;
     BufferedReader buffReader = null;
@@ -90,8 +94,7 @@ public class MinMaxDataMap implements DataMap {
         return null;
       }
       dataInputStream = fileOperation.openForRead();
-      inStream = new InputStreamReader(dataInputStream,
-          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+      inStream = new InputStreamReader(dataInputStream, "UTF-8");
       buffReader = new BufferedReader(inStream);
       readMinMax = gsonObjectToRead.fromJson(buffReader, MinMaxIndexBlockDetails[].class);
     } catch (IOException e) {
@@ -109,14 +112,14 @@ public class MinMaxDataMap implements DataMap {
    * @param segmentProperties
    * @return
    */
-  @Override public List<Blocklet> prune(FilterResolverIntf filterExp,
-      SegmentProperties segmentProperties) {
+  @Override
+  public List<Blocklet> prune(FilterResolverIntf filterExp,
+      SegmentProperties segmentProperties, List<String> partitions) {
     List<Blocklet> blocklets = new ArrayList<>();
 
     if (filterExp == null) {
       for (int i = 0; i < readMinMaxDataMap.length; i++) {
-        blocklets.add(new Blocklet(readMinMaxDataMap[i].getFilePath(),
-            String.valueOf(readMinMaxDataMap[i].getBlockletId())));
+        blocklets.add(new Blocklet(filePath, String.valueOf(readMinMaxDataMap[i].getBlockletId())));
       }
     } else {
       FilterExecuter filterExecuter =
@@ -126,7 +129,7 @@ public class MinMaxDataMap implements DataMap {
         BitSet bitSet = filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(),
             readMinMaxDataMap[startIndex].getMinValues());
         if (!bitSet.isEmpty()) {
-          blocklets.add(new Blocklet(readMinMaxDataMap[startIndex].getFilePath(),
+          blocklets.add(new Blocklet(filePath,
               String.valueOf(readMinMaxDataMap[startIndex].getBlockletId())));
         }
         startIndex++;
@@ -136,6 +139,11 @@ public class MinMaxDataMap implements DataMap {
   }
 
   @Override
+  public boolean isScanRequired(FilterResolverIntf filterExp) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void clear() {
     readMinMaxDataMap = null;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9e85a02/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
index b196d0d..5203cb3 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
@@ -25,49 +25,51 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.dev.DataMap;
-import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.datamap.dev.DataMapWriter;
-import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMapFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.events.Event;
 
 /**
  * Min Max DataMap Factory
  */
-public class MinMaxDataMapFactory implements DataMapFactory {
+public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory {
 
   private AbsoluteTableIdentifier identifier;
 
-  @Override
-  public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+  @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
     this.identifier = identifier;
   }
 
   /**
    * createWriter will return the MinMaxDataWriter.
+   *
    * @param segmentId
    * @return
    */
-  @Override
-  public DataMapWriter createWriter(String segmentId) {
-    return new MinMaxDataWriter();
+  @Override public AbstractDataMapWriter createWriter(String segmentId, String dataWritePath) {
+    return new MinMaxDataWriter(identifier, segmentId, dataWritePath);
   }
 
   /**
    * getDataMaps Factory method Initializes the Min Max Data Map and returns.
+   *
    * @param segmentId
    * @return
    * @throws IOException
    */
-  @Override public List<DataMap> getDataMaps(String segmentId) throws IOException {
-    List<DataMap> dataMapList = new ArrayList<>();
+  @Override public List<AbstractCoarseGrainDataMap> getDataMaps(String segmentId)
+      throws IOException {
+    List<AbstractCoarseGrainDataMap> dataMapList = new ArrayList<>();
     // Form a dataMap of Type MinMaxDataMap.
     MinMaxDataMap dataMap = new MinMaxDataMap();
     try {
-      dataMap.init(identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + File.separator);
+      dataMap.init(new DataMapModel(
+          identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + File.separator));
     } catch (MemoryException ex) {
 
     }
@@ -76,7 +78,6 @@ public class MinMaxDataMapFactory implements DataMapFactory {
   }
 
   /**
-   *
    * @param segmentId
    * @return
    */
@@ -86,6 +87,7 @@ public class MinMaxDataMapFactory implements DataMapFactory {
 
   /**
    * Clear the DataMap.
+   *
    * @param segmentId
    */
   @Override public void clear(String segmentId) {
@@ -94,21 +96,20 @@ public class MinMaxDataMapFactory implements DataMapFactory {
   /**
    * Clearing the data map.
    */
-  @Override
-  public void clear() {
+  @Override public void clear() {
   }
 
-  @Override public DataMap getDataMap(DataMapDistributable distributable) {
+  @Override public List<AbstractCoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+      throws IOException {
     return null;
   }
 
-  @Override
-  public void fireEvent(ChangeEvent event) {
+  @Override public void fireEvent(Event event) {
 
   }
 
-  @Override
-  public DataMapMeta getMeta() {
-    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")), FilterType.EQUALTO);
+  @Override public DataMapMeta getMeta() {
+    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")),
+        new ArrayList<ExpressionType>());
   }
 }
\ No newline at end of file