Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/27 12:42:06 UTC

[6/7] carbondata git commit: [CARBONDATA-1232] Datamap implementation for Blocklet

[CARBONDATA-1232] Datamap implementation for Blocklet

This closes #1099


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b6812449
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b6812449
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b6812449

Branch: refs/heads/master
Commit: b6812449a4040dc5d3454cd0d6dd38f07be2854c
Parents: 2dbfab6
Author: ravipesala <ra...@gmail.com>
Authored: Sat Jun 17 22:53:57 2017 +0530
Committer: Raghunandan S <ca...@gmail.com>
Committed: Thu Jul 27 18:47:52 2017 +0800

----------------------------------------------------------------------
 .../carbondata/core/cache/CacheProvider.java    |   3 +
 .../apache/carbondata/core/cache/CacheType.java |   6 +
 .../core/datastore/block/TableBlockInfo.java    |  19 +
 .../core/datastore/block/TaskBlockInfo.java     |   4 +
 .../carbondata/core/indexstore/Blocklet.java    |  55 +-
 .../indexstore/BlockletDataMapIndexStore.java   | 180 ++++++
 .../core/indexstore/BlockletDetailInfo.java     | 117 ++++
 .../carbondata/core/indexstore/DataMap.java     |   8 +-
 .../core/indexstore/DataMapFactory.java         |  87 +++
 .../core/indexstore/DataMapStoreManager.java    |  90 ++-
 .../carbondata/core/indexstore/DataMapType.java |  14 +-
 .../TableBlockIndexUniqueIdentifier.java        | 103 ++++
 .../core/indexstore/TableDataMap.java           |  97 +++-
 .../core/indexstore/UnsafeMemoryDMStore.java    | 207 +++++++
 .../blockletindex/BlockletDMComparator.java     | 134 +++++
 .../blockletindex/BlockletDataMap.java          | 445 +++++++++++++++
 .../blockletindex/BlockletDataMapFactory.java   | 115 ++++
 .../BlockletDataRefNodeWrapper.java             | 137 +++++
 .../indexstore/blockletindex/IndexWrapper.java  |  49 ++
 .../core/indexstore/row/DataMapRow.java         |  89 +++
 .../core/indexstore/row/DataMapRowImpl.java     | 106 ++++
 .../core/indexstore/row/UnsafeDataMapRow.java   | 133 +++++
 .../core/indexstore/schema/DataMapSchema.java   | 124 ++++
 .../core/indexstore/schema/FilterType.java      |  24 +
 .../core/metadata/blocklet/BlockletInfo.java    |  53 +-
 .../core/metadata/index/BlockIndexInfo.java     |  27 +
 .../executor/impl/AbstractQueryExecutor.java    |  52 +-
 .../executer/IncludeFilterExecuterImpl.java     |   2 +-
 .../executer/RangeValueFilterExecuterImpl.java  |   2 +-
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |   2 +-
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |   2 +-
 ...velRangeLessThanEqualFilterExecuterImpl.java |   2 +-
 .../RowLevelRangeLessThanFiterExecuterImpl.java |   2 +-
 .../processor/AbstractDataBlockIterator.java    |   3 +
 .../AbstractDetailQueryResultIterator.java      |  34 +-
 .../util/AbstractDataFileFooterConverter.java   |  53 ++
 .../apache/carbondata/core/util/CarbonUtil.java |  40 +-
 .../core/util/DataFileFooterConverter.java      |   4 +
 .../core/util/DataFileFooterConverter2.java     |   3 +
 .../core/util/DataFileFooterConverterV3.java    |  11 +
 format/src/main/thrift/carbondata_index.thrift  |   1 +
 .../carbondata/hadoop/CarbonInputFormat.java    |  14 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |  39 +-
 .../hadoop/api/CarbonTableInputFormat.java      | 562 ++++++++++++++++---
 .../hadoop/util/CarbonInputFormatUtil.java      |   7 +-
 .../presto/impl/CarbonTableReader.java          |  56 +-
 .../spark/rdd/CarbonIUDMergerRDD.scala          |   5 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   9 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  22 +-
 .../carbondata/spark/util/QueryPlanUtil.scala   |  10 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |  14 +-
 .../sql/execution/command/IUDCommands.scala     |   7 -
 .../carbondata/spark/util/QueryPlanUtil.scala   |  10 +-
 .../processing/merger/CarbonCompactionUtil.java |  32 ++
 54 files changed, 3167 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
index 25a8976..5c4b265 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.BlockIndexStore;
 import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexStore;
 import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
@@ -126,6 +127,8 @@ public class CacheProvider {
     } else if (cacheType.equals(cacheType.DRIVER_BTREE)) {
       cacheObject =
           new SegmentTaskIndexStore(carbonStorePath, carbonLRUCache);
+    } else if (cacheType.equals(CacheType.DRIVER_BLOCKLET_DATAMAP)) {
+      cacheObject = new BlockletDataMapIndexStore(carbonStorePath, carbonLRUCache);
     }
     cacheTypeToCacheMap.put(cacheType, cacheObject);
   }
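
With this change, requesting the new cache type from the provider yields a
BlockletDataMapIndexStore. A minimal usage sketch, assuming the pre-existing
CacheProvider.getInstance().createCache(CacheType, String) entry point is
unchanged (carbonStorePath is a placeholder):

    Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap> cache =
        CacheProvider.getInstance().createCache(
            CacheType.DRIVER_BLOCKLET_DATAMAP, carbonStorePath);
    // BlockletDataMapIndexStore implements
    // Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap>, so lookups and
    // invalidations go through the common Cache interface.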

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
index 2d6570d..ab51ff2 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
@@ -56,6 +56,12 @@ public class CacheType<K, V> {
       DRIVER_BTREE = new CacheType("driver_btree");
 
   /**
+   * Driver blocklet datamap cache which maintains the blocklet datamap metadata of segments
+   */
+  public static final CacheType<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
+      DRIVER_BLOCKLET_DATAMAP = new CacheType("driver_blocklet_datamap");
+
+  /**
    * cacheName which is unique name for a cache
    */
   private String cacheName;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 1da6699..316e202 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -78,6 +79,8 @@ public class TableBlockInfo implements Distributable, Serializable {
    */
   private String[] deletedDeltaFilePath;
 
+  private BlockletDetailInfo detailInfo;
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
       long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
@@ -89,6 +92,10 @@ public class TableBlockInfo implements Distributable, Serializable {
     this.deletedDeltaFilePath = deletedDeltaFilePath;
   }
 
+  public TableBlockInfo() {
+
+  }
+
   /**
   * constructor to initialize the TableBlockInfo with BlockletInfos
    *
@@ -322,4 +329,16 @@ public class TableBlockInfo implements Distributable, Serializable {
   public String[] getDeletedDeltaFilePath() {
     return deletedDeltaFilePath;
   }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  public BlockletDetailInfo getDetailInfo() {
+    return detailInfo;
+  }
+
+  public void setDetailInfo(BlockletDetailInfo detailInfo) {
+    this.detailInfo = detailInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
index eb707c2..4fcec87 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.datastore.block;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,6 +46,9 @@ public class TaskBlockInfo {
     return taskBlockInfoMapping.keySet();
   }
 
+  public Collection<List<TableBlockInfo>> getAllTableBlockInfoList() {
+    return taskBlockInfoMapping.values();
+  }
 
   /**
    * returns TableBlockInfoList of given task

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index 597c46c..66da4d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -16,27 +16,76 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
 /**
  * Blocklet
  */
 public class Blocklet implements Serializable {
 
-  private String path;
+  private Path path;
+
+  private String segmentId;
 
   private String blockletId;
 
+  private BlockletDetailInfo detailInfo;
+
+  private long length;
+
+  private String[] location;
+
   public Blocklet(String path, String blockletId) {
-    this.path = path;
+    this.path = new Path(path);
     this.blockletId = blockletId;
   }
 
-  public String getPath() {
+  public Path getPath() {
     return path;
   }
 
   public String getBlockletId() {
     return blockletId;
   }
+
+  public BlockletDetailInfo getDetailInfo() {
+    return detailInfo;
+  }
+
+  public void setDetailInfo(BlockletDetailInfo detailInfo) {
+    this.detailInfo = detailInfo;
+  }
+
+  public void updateLocations() throws IOException {
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+    LocatedFileStatus fileStatus = iter.next();
+    location = fileStatus.getBlockLocations()[0].getHosts();
+    length = fileStatus.getLen();
+  }
+
+  public String[] getLocations() throws IOException {
+    return location;
+  }
+
+  public long getLength() throws IOException {
+    return length;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
new file mode 100644
index 0000000..fc8c273
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+
+/**
+ * Class to handle loading, unloading, clearing and storing of the blocklet
+ * datamaps of table blocks
+ */
+public class BlockletDataMapIndexStore
+    implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap> {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName());
+  /**
+   * carbon store path
+   */
+  protected String carbonStorePath;
+  /**
+   * CarbonLRU cache
+   */
+  protected CarbonLRUCache lruCache;
+
+  /**
+   * map of segment identifier to lock object. It is filled while a datamap is
+   * being loaded and ensures only a segment-level lock is taken, so datamaps of
+   * other segments can be loaded concurrently
+   */
+  private Map<String, Object> segmentLockMap;
+
+  /**
+   * constructor to initialize the BlockletDataMapIndexStore
+   *
+   * @param carbonStorePath
+   * @param lruCache
+   */
+  public BlockletDataMapIndexStore(String carbonStorePath, CarbonLRUCache lruCache) {
+    this.carbonStorePath = carbonStorePath;
+    this.lruCache = lruCache;
+    segmentLockMap = new ConcurrentHashMap<String, Object>();
+  }
+
+  @Override public BlockletDataMap get(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier)
+      throws IOException {
+    String lruCacheKey = tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
+    BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
+    if (dataMap == null) {
+      try {
+        dataMap = loadAndGetDataMap(tableSegmentUniqueIdentifier);
+      } catch (IndexBuilderException e) {
+        throw new IOException(e.getMessage(), e);
+      } catch (Throwable e) {
+        throw new IOException("Problem in loading segment block.", e);
+      }
+    }
+    return dataMap;
+  }
+
+  @Override public List<BlockletDataMap> getAll(
+      List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
+    List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size());
+    try {
+      for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
+        blockletDataMaps.add(get(identifier));
+      }
+    } catch (Throwable e) {
+      for (BlockletDataMap dataMap : blockletDataMaps) {
+        dataMap.clear();
+      }
+      throw new IOException("Problem in loading segment blocks.", e);
+    }
+    return blockletDataMaps;
+  }
+
+  /**
+   * returns the BlockletDataMap from the cache if it is already loaded, otherwise null
+   *
+   * @param tableSegmentUniqueIdentifier
+   * @return
+   */
+  @Override public BlockletDataMap getIfPresent(
+      TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
+    BlockletDataMap dataMap = (BlockletDataMap) lruCache
+        .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+    return dataMap;
+  }
+
+  /**
+   * method invalidate the segment cache for segment
+   *
+   * @param tableSegmentUniqueIdentifier
+   */
+  @Override public void invalidate(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
+    lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+  }
+
+  /**
+   * Below method will be used to load the blocklet datamap of a single
+   * carbonindex file. A segment-level lock is taken so that the same index
+   * file is not loaded twice concurrently.
+   *
+   * @return loaded BlockletDataMap
+   * @throws IOException
+   */
+  private BlockletDataMap loadAndGetDataMap(
+      TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException {
+    String uniqueTableSegmentIdentifier =
+        tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
+    Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
+    if (lock == null) {
+      lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
+    }
+    BlockletDataMap dataMap = null;
+    synchronized (lock) {
+      dataMap = new BlockletDataMap();
+      dataMap.init(tableSegmentUniqueIdentifier.getFilePath());
+      lruCache.put(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier(), dataMap,
+          dataMap.getMemorySize());
+    }
+    return dataMap;
+  }
+
+  /**
+   * Below method will be used to get the segment level lock object
+   *
+   * @param uniqueIdentifier
+   * @return lock object
+   */
+  private synchronized Object addAndGetSegmentLock(String uniqueIdentifier) {
+    // get the segment lock object if it is present then return
+    // otherwise add the new lock and return
+    Object segmentLoaderLockObject = segmentLockMap.get(uniqueIdentifier);
+    if (null == segmentLoaderLockObject) {
+      segmentLoaderLockObject = new Object();
+      segmentLockMap.put(uniqueIdentifier, segmentLoaderLockObject);
+    }
+    return segmentLoaderLockObject;
+  }
+
+  /**
+   * The method clears the access count of table segments
+   *
+   * @param tableSegmentUniqueIdentifiers
+   */
+  @Override public void clearAccessCount(
+      List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) {
+    for (TableBlockIndexUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) {
+      BlockletDataMap cacheable =
+          (BlockletDataMap) lruCache.get(segmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+      cacheable.clear();
+    }
+  }
+}
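
A sketch of the intended lookup flow; CacheProvider normally performs this
wiring, and the CarbonLRUCache instance and carbonindex file name here are
assumptions:

    BlockletDataMapIndexStore store =
        new BlockletDataMapIndexStore(storePath, lruCache);
    TableBlockIndexUniqueIdentifier identifier = new TableBlockIndexUniqueIdentifier(
        absoluteTableIdentifier, "0", "0.carbonindex");
    // Serves from the LRU cache when present; on a miss it loads the datamap
    // under a per-identifier lock so the same index file is read only once.
    BlockletDataMap dataMap = store.get(identifier);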

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
new file mode 100644
index 0000000..68dedd8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Blocklet detail information to be sent to each executor
+ */
+public class BlockletDetailInfo implements Serializable, Writable {
+
+  private int rowCount;
+
+  private short pagesCount;
+
+  private short versionNumber;
+
+  private int[] dimLens;
+
+  private long schemaUpdatedTimeStamp;
+
+  private BlockletInfo blockletInfo;
+
+  public int getRowCount() {
+    return rowCount;
+  }
+
+  public void setRowCount(int rowCount) {
+    this.rowCount = rowCount;
+  }
+
+  public int getPagesCount() {
+    return pagesCount;
+  }
+
+  public void setPagesCount(short pagesCount) {
+    this.pagesCount = pagesCount;
+  }
+
+  public short getVersionNumber() {
+    return versionNumber;
+  }
+
+  public void setVersionNumber(short versionNumber) {
+    this.versionNumber = versionNumber;
+  }
+
+  public BlockletInfo getBlockletInfo() {
+    return blockletInfo;
+  }
+
+  public void setBlockletInfo(BlockletInfo blockletInfo) {
+    this.blockletInfo = blockletInfo;
+  }
+
+  public int[] getDimLens() {
+    return dimLens;
+  }
+
+  public void setDimLens(int[] dimLens) {
+    this.dimLens = dimLens;
+  }
+
+  public long getSchemaUpdatedTimeStamp() {
+    return schemaUpdatedTimeStamp;
+  }
+
+  public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
+    this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    out.writeInt(rowCount);
+    out.writeShort(pagesCount);
+    out.writeShort(versionNumber);
+    out.writeShort(dimLens.length);
+    for (int i = 0; i < dimLens.length; i++) {
+      out.writeInt(dimLens[i]);
+    }
+    out.writeLong(schemaUpdatedTimeStamp);
+    blockletInfo.write(out);
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    rowCount = in.readInt();
+    pagesCount = in.readShort();
+    versionNumber = in.readShort();
+    dimLens = new int[in.readShort()];
+    for (int i = 0; i < dimLens.length; i++) {
+      dimLens[i] = in.readInt();
+    }
+    schemaUpdatedTimeStamp = in.readLong();
+    blockletInfo = new BlockletInfo();
+    blockletInfo.readFields(in);
+  }
+}
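
Since the class is both Serializable and Writable, shipping it to executors can
use either path. A round-trip sketch (detailInfo is assumed fully populated,
including blockletInfo and dimLens, before write is called):

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    detailInfo.write(new DataOutputStream(bytes));
    BlockletDetailInfo copy = new BlockletDetailInfo();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));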

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
index 2651f15..1276494 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
@@ -21,7 +21,7 @@ import java.util.List;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 /**
- * Interface for adding and retrieving index data.
+ * Datamap is an entity which can store and retrieve index data.
  */
 public interface DataMap {
 
@@ -47,6 +47,12 @@ public interface DataMap {
   List<Blocklet> prune(FilterResolverIntf filterExp);
 
   /**
+   * Convert datamap to distributable object
+   * @return
+   */
+  DataMapDistributable toDistributable();
+
+  /**
    * Clear complete index table and release memory.
    */
   void clear();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
new file mode 100644
index 0000000..72f714f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.util.List;
+
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * Interface for datamap factory, it is responsible for creating the datamap.
+ */
+public interface DataMapFactory {
+
+  /**
+   * Initialization of Datamap factory
+   * @param identifier
+   * @param dataMapName
+   */
+  void init(AbsoluteTableIdentifier identifier, String dataMapName);
+  /**
+   * Get the datamap writer for each segmentid.
+   *
+   * @param identifier
+   * @param segmentId
+   * @return
+   */
+  DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier,
+      String segmentId);
+
+  /**
+   * Get the datamap for segmentid
+   *
+   * @param segmentId
+   * @return
+   */
+  List<DataMap> getDataMaps(String segmentId);
+
+  /**
+   * Get datamap for distributable object.
+   *
+   * @param distributable
+   * @return
+   */
+  DataMap getDataMap(DataMapDistributable distributable);
+
+  /**
+   * This method checks whether the given columns and type of filter
+   * are supported by this datamap or not
+   *
+   * @param filterType
+   * @return
+   */
+  boolean isFiltersSupported(FilterType filterType);
+
+  /**
+   * Notifies the datamap of a table change event
+   * @param event
+   */
+  void fireEvent(ChangeEvent event);
+
+  /**
+   * Clears datamap of the segment
+   */
+  void clear(String segmentId);
+
+  /**
+   * Clear all datamaps from memory
+   */
+  void clear();
+
+}
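
A skeletal no-op implementation, only to illustrate the surface of the new
contract (the class itself is hypothetical):

    import java.util.Collections;
    import java.util.List;

    import org.apache.carbondata.core.events.ChangeEvent;
    import org.apache.carbondata.core.indexstore.*;
    import org.apache.carbondata.core.indexstore.schema.FilterType;
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

    public class NoOpDataMapFactory implements DataMapFactory {
      @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) { }
      @Override public DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier,
          String segmentId) { return null; }
      @Override public List<DataMap> getDataMaps(String segmentId) {
        return Collections.emptyList();
      }
      @Override public DataMap getDataMap(DataMapDistributable distributable) { return null; }
      @Override public boolean isFiltersSupported(FilterType filterType) { return false; }
      @Override public void fireEvent(ChangeEvent event) { }
      @Override public void clear(String segmentId) { }
      @Override public void clear() { }
    }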

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
index 64c6e20..1a36187 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
@@ -16,7 +16,9 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -24,13 +26,16 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 
 /**
- * It maintains all the index tables in it.
+ * It maintains all the DataMaps in it.
  */
-public class DataMapStoreManager {
+public final class DataMapStoreManager {
 
   private static DataMapStoreManager instance = new DataMapStoreManager();
 
-  private Map<DataMapType, Map<String, TableDataMap>> dataMapMappping = new HashMap<>();
+  /**
+   * Contains the list of datamaps for each table.
+   */
+  private Map<AbsoluteTableIdentifier, List<TableDataMap>> dataMapMapping = new HashMap<>();
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
@@ -48,56 +53,85 @@ public class DataMapStoreManager {
    */
   public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
       DataMapType mapType) {
-    Map<String, TableDataMap> map = dataMapMappping.get(mapType);
-    TableDataMap dataMap = null;
-    if (map == null) {
+    List<TableDataMap> tableDataMaps = dataMapMapping.get(identifier);
+    TableDataMap dataMap;
+    if (tableDataMaps == null) {
+      createTableDataMap(identifier, mapType, dataMapName);
+      tableDataMaps = dataMapMapping.get(identifier);
+    }
+    dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+    if (dataMap == null) {
       throw new RuntimeException("Datamap does not exist");
-    } else {
-      dataMap = map.get(dataMapName);
-      if (dataMap == null) {
-        throw new RuntimeException("Datamap does not exist");
-      }
     }
-    // Initialize datamap
-    dataMap.init(identifier, dataMapName);
     return dataMap;
   }
 
   /**
-   * Create new datamap instance using datamap type and path
+   * Create new datamap instance using datamap name, datamap type and table identifier
    *
    * @param mapType
    * @return
    */
-  public TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier, DataMapType mapType,
-      String dataMapName) {
-    Map<String, TableDataMap> map = dataMapMappping.get(mapType);
-    if (map == null) {
-      map = new HashMap<>();
-      dataMapMappping.put(mapType, map);
+  private TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier,
+      DataMapType mapType, String dataMapName) {
+    List<TableDataMap> tableDataMaps = dataMapMapping.get(identifier);
+    if (tableDataMaps == null) {
+      tableDataMaps = new ArrayList<>();
+      dataMapMapping.put(identifier, tableDataMaps);
     }
-    TableDataMap dataMap = map.get(dataMapName);
+    TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
     if (dataMap != null) {
       throw new RuntimeException("Already datamap exists in that path with type " + mapType);
     }
 
     try {
-      //TODO create datamap using @mapType.getClassName())
+      DataMapFactory dataMapFactory = mapType.getClassObject().newInstance();
+      dataMapFactory.init(identifier, dataMapName);
+      dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
     } catch (Exception e) {
       LOGGER.error(e);
+      throw new RuntimeException(e);
+    }
+    tableDataMaps.add(dataMap);
+    return dataMap;
+  }
+
+  private TableDataMap getAbstractTableDataMap(String dataMapName,
+      List<TableDataMap> tableDataMaps) {
+    TableDataMap dataMap = null;
+    for (TableDataMap tableDataMap: tableDataMaps) {
+      if (tableDataMap.getDataMapName().equals(dataMapName)) {
+        dataMap = tableDataMap;
+        break;
+      }
     }
-    // TODO: Initialize a data map by calling init method on the data map
-    map.put(dataMapName, dataMap);
     return dataMap;
   }
 
-  public void clearDataMap(String dataMapName, DataMapType mapType) {
-    Map<String, TableDataMap> map = dataMapMappping.get(mapType);
-    if (map != null && map.get(dataMapName) != null) {
-      map.remove(dataMapName).clear();
+  /**
+   * Clear the datamap with the given name for the given table from memory
+   * @param identifier
+   * @param dataMapName
+   */
+  public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
+    List<TableDataMap> tableDataMaps = dataMapMapping.get(identifier);
+    if (tableDataMaps != null) {
+      int i = 0;
+      for (TableDataMap tableDataMap: tableDataMaps) {
+        if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) {
+          tableDataMap.clear(new ArrayList<String>());
+          tableDataMaps.remove(i);
+          break;
+        }
+        i++;
+      }
     }
   }
 
+  /**
+   * Returns the singleton instance
+   * @return
+   */
   public static DataMapStoreManager getInstance() {
     return instance;
   }
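
Driver-side usage now reduces to a sketch like the following; the datamap name
"blocklet" and the identifier/segment/filter variables are placeholders, and the
manager creates and registers the TableDataMap on first access:

    TableDataMap blockletMap = DataMapStoreManager.getInstance()
        .getDataMap(absoluteTableIdentifier, "blocklet", DataMapType.BLOCKLET);
    List<Blocklet> prunedBlocklets = blockletMap.prune(validSegmentIds, filterResolver);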

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
index b6a0f5b..0059b29 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
@@ -16,19 +16,21 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+
 /**
  * Datamap type
  */
 public enum DataMapType {
-  BLOCKLET("org.apache.carbondata.datamap.BlockletDataMap");
+  BLOCKLET(BlockletDataMapFactory.class);
 
-  private String className;
+  private Class<? extends DataMapFactory> classObject;
 
-  DataMapType(String className) {
-    this.className = className;
+  DataMapType(Class<? extends DataMapFactory> classObject) {
+    this.classObject = classObject;
   }
 
-  public String getClassName() {
-    return className;
+  public Class<? extends DataMapFactory> getClassObject() {
+    return classObject;
   }
 }
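
Holding the Class object lets the store manager instantiate the factory
reflectively, roughly as below (exception handling elided, as in
DataMapStoreManager.createTableDataMap):

    DataMapFactory factory = DataMapType.BLOCKLET.getClassObject().newInstance();
    factory.init(identifier, dataMapName);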

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
new file mode 100644
index 0000000..7e2bc0e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+
+/**
+ * Class holds the absoluteTableIdentifier, segmentId and carbonIndexFileName to
+ * uniquely identify the blocklet datamap of one index file in a segment
+ */
+public class TableBlockIndexUniqueIdentifier {
+  /**
+   * table fully qualified identifier
+   */
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  private String segmentId;
+
+  private String carbonIndexFileName;
+
+  /**
+   * Constructor to initialize the class instance
+   *
+   * @param absoluteTableIdentifier
+   * @param segmentId
+   */
+  public TableBlockIndexUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier,
+      String segmentId, String carbonIndexFileName) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+    this.segmentId = segmentId;
+    this.carbonIndexFileName = carbonIndexFileName;
+  }
+
+  /**
+   * returns AbsoluteTableIdentifier
+   *
+   * @return
+   */
+  public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
+    return absoluteTableIdentifier;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  /**
+   * method returns the id to uniquely identify a key
+   *
+   * @return
+   */
+  public String getUniqueTableSegmentIdentifier() {
+    CarbonTableIdentifier carbonTableIdentifier =
+        absoluteTableIdentifier.getCarbonTableIdentifier();
+    return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
+        + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE
+        + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR + segmentId
+        + CarbonCommonConstants.FILE_SEPARATOR + carbonIndexFileName;
+  }
+
+  public String getFilePath() {
+    return absoluteTableIdentifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + "/"
+        + carbonIndexFileName;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TableBlockIndexUniqueIdentifier that = (TableBlockIndexUniqueIdentifier) o;
+
+    if (!absoluteTableIdentifier.equals(that.absoluteTableIdentifier)) {
+      return false;
+    }
+    if (!segmentId.equals(that.segmentId)) {
+      return false;
+    }
+    return carbonIndexFileName.equals(that.carbonIndexFileName);
+  }
+
+  @Override public int hashCode() {
+    int result = absoluteTableIdentifier.hashCode();
+    result = 31 * result + segmentId.hashCode();
+    result = 31 * result + carbonIndexFileName.hashCode();
+    return result;
+  }
+}
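
Worked example of the key format (values hypothetical, assuming
CarbonCommonConstants.FILE_SEPARATOR is "/" and UNDERSCORE is "_"): for database
"default", table "t1" with table id "100", segment "0" and index file
"0.carbonindex", getUniqueTableSegmentIdentifier() yields
"default/t1_100/0/0.carbonindex" and getFilePath() yields
"<tablePath>/Fact/Part0/Segment_0/0.carbonindex".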

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
index e1532c8..39ca4c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
@@ -16,38 +16,34 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.events.ChangeEvent;
 import org.apache.carbondata.core.events.EventListener;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-
 /**
  * DataMap at the table level; a user can add any number of datamaps for one table. Depending
  * on the filter condition it can prune the blocklets.
  */
-public interface TableDataMap extends EventListener {
+public final class TableDataMap implements EventListener {
 
-  /**
-   * It is called to initialize and load the required table datamap metadata.
-   */
-  void init(AbsoluteTableIdentifier identifier, String dataMapName);
+  private AbsoluteTableIdentifier identifier;
 
-  /**
-   * Gives the writer to write the metadata information of this datamap at table level.
-   *
-   * @return
-   */
-  DataMapWriter getWriter();
+  private String dataMapName;
+
+  private DataMapFactory dataMapFactory;
 
   /**
-   * Create the datamap using the segmentid  and name.
-   *
-   * @param identifier
-   * @param segmentId
-   * @return
+   * It is called to initialize and load the required table datamap metadata.
    */
-  DataMap createDataMap(AbsoluteTableIdentifier identifier, String segmentId);
+  public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+      DataMapFactory dataMapFactory) {
+    this.identifier = identifier;
+    this.dataMapName = dataMapName;
+    this.dataMapFactory = dataMapFactory;
+  }
 
   /**
    * Pass the valid segments and prune the datamap using filter expression
@@ -56,7 +52,24 @@ public interface TableDataMap extends EventListener {
    * @param filterExp
    * @return
    */
-  List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp);
+  public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp) {
+    List<Blocklet> blocklets = new ArrayList<>();
+    for (String segmentId : segmentIds) {
+      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+      for (DataMap dataMap : dataMaps) {
+        List<Blocklet> pruneBlocklets = dataMap.prune(filterExp);
+        blocklets.addAll(addSegmentId(pruneBlocklets, segmentId));
+      }
+    }
+    return blocklets;
+  }
+
+  private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) {
+    for (Blocklet blocklet : pruneBlocklets) {
+      blocklet.setSegmentId(segmentId);
+    }
+    return pruneBlocklets;
+  }
 
   /**
    * This is used for making the datamap distributable.
@@ -65,7 +78,16 @@ public interface TableDataMap extends EventListener {
    *
    * @return
    */
-  List<DataMapDistributable> toDistributable(List<String> segmentIds);
+  public List<DataMapDistributable> toDistributable(List<String> segmentIds) {
+    List<DataMapDistributable> distributables = new ArrayList<>();
+    for (String segmentId : segmentIds) {
+      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+      for (DataMap dataMap : dataMaps) {
+        distributables.add(dataMap.toDistributable());
+      }
+    }
+    return distributables;
+  }
 
   /**
    * This method is used from any machine after it is distributed. It takes the distributable object
@@ -75,20 +97,37 @@ public interface TableDataMap extends EventListener {
    * @param filterExp
    * @return
    */
-  List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp);
+  public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) {
+    return dataMapFactory.getDataMap(distributable).prune(filterExp);
+  }
+
+  @Override public void fireEvent(ChangeEvent event) {
+    dataMapFactory.fireEvent(event);
+  }
 
   /**
-   * This method checks whether the columns and the type of filters supported
-   * for this datamap or not
-   *
-   * @param filterExp
-   * @return
+   * Clear only the datamaps of the segments
+   * @param segmentIds
    */
-  boolean isFiltersSupported(FilterResolverIntf filterExp);
+  public void clear(List<String> segmentIds) {
+    for (String segmentId: segmentIds) {
+      dataMapFactory.clear(segmentId);
+    }
+  }
 
   /**
-   * Clears table level datamap
+   * Clears all datamaps of the table
+   */
+  public void clear() {
+    dataMapFactory.clear();
+  }
+
+  /**
+   * Get the unique name of datamap
+   *
+   * @return
    */
-  void clear();
+  public String getDataMapName() {
+    return dataMapName;
+  }
 
 }
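
The distributed pruning path composes the two halves as in this sketch
(segmentIds and filterResolver are placeholders):

    // Driver: one distributable per datamap per segment.
    List<DataMapDistributable> splits = tableDataMap.toDistributable(segmentIds);
    // Executor: the factory rebuilds the datamap from the distributable,
    // then prunes locally with the same filter.
    List<Blocklet> blocklets = tableDataMap.prune(splits.get(0), filterResolver);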

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
new file mode 100644
index 0000000..8246f99
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.memory.MemoryAllocator;
+import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
+import org.apache.carbondata.core.memory.MemoryBlock;
+
+import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
+import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
+
+/**
+ * Stores the datamap rows ({@link DataMapRow}) in unsafe (off-heap) memory.
+ */
+public class UnsafeMemoryDMStore {
+
+  private MemoryBlock memoryBlock;
+
+  private static int capacity = 8 * 1024 * 1024;
+
+  private int allocatedSize;
+
+  private int runningLength;
+
+  private MemoryAllocator memoryAllocator;
+
+  private boolean isMemoryFreed;
+
+  private DataMapSchema[] schema;
+
+  private int[] pointers;
+
+  private int rowCount;
+
+  public UnsafeMemoryDMStore(DataMapSchema[] schema) {
+    this.schema = schema;
+    this.memoryAllocator = MemoryAllocatorFactory.INSATANCE.getMemoryAllocator();
+    this.allocatedSize = capacity;
+    this.memoryBlock = memoryAllocator.allocate(allocatedSize);
+    this.pointers = new int[1000];
+  }
+
+  /**
+   * Check whether the memory is sufficient or not; if not, allocate more memory
+   * and copy the old data to the new block.
+   *
+   * @param rowSize
+   */
+  private void ensureSize(int rowSize) {
+    if (runningLength + rowSize >= allocatedSize) {
+      MemoryBlock allocate =
+          MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(allocatedSize + capacity);
+      unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
+          allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
+      memoryAllocator.free(memoryBlock);
+      allocatedSize = allocatedSize + capacity;
+      memoryBlock = allocate;
+    }
+    if (this.pointers.length <= rowCount + 1) {
+      int[] newPointer = new int[pointers.length + 1000];
+      System.arraycopy(pointers, 0, newPointer, 0, pointers.length);
+      this.pointers = newPointer;
+    }
+  }
+
+  /**
+   * Add the index row to unsafe.
+   *
+   * @param indexRow
+   */
+  public void addIndexRowToUnsafe(DataMapRow indexRow) {
+    // First calculate the required memory to keep the row in unsafe
+    int rowSize = indexRow.getTotalSizeInBytes();
+    // Check whether allocated memory is sufficient or not.
+    ensureSize(rowSize);
+    int pointer = runningLength;
+
+    for (int i = 0; i < schema.length; i++) {
+      addToUnsafe(schema[i], indexRow, i);
+    }
+    pointers[rowCount++] = pointer;
+  }
+
+  private void addToUnsafe(DataMapSchema schema, DataMapRow row, int index) {
+    switch (schema.getSchemaType()) {
+      case FIXED:
+        switch (schema.getDataType()) {
+          case BYTE:
+            unsafe.putByte(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                row.getByte(index));
+            runningLength += row.getSizeInBytes(index);
+            break;
+          case SHORT:
+            unsafe
+                .putShort(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                    row.getShort(index));
+            runningLength += row.getSizeInBytes(index);
+            break;
+          case INT:
+            unsafe.putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                row.getInt(index));
+            runningLength += row.getSizeInBytes(index);
+            break;
+          case LONG:
+            unsafe.putLong(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                row.getLong(index));
+            runningLength += row.getSizeInBytes(index);
+            break;
+          case FLOAT:
+            unsafe
+                .putFloat(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                    row.getFloat(index));
+            runningLength += row.getSizeInBytes(index);
+            break;
+          case DOUBLE:
+            unsafe
+                .putDouble(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                    row.getDouble(index));
+            runningLength += row.getSizeInBytes(index);
+            break;
+          case BYTE_ARRAY:
+            byte[] data = row.getByteArray(index);
+            unsafe.copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+                memoryBlock.getBaseOffset() + runningLength, data.length);
+            runningLength += row.getSizeInBytes(index);
+            break;
+        }
+        break;
+      case VARIABLE:
+        byte[] data = row.getByteArray(index);
+        unsafe.putShort(memoryBlock.getBaseObject(),
+            memoryBlock.getBaseOffset() + runningLength, (short) data.length);
+        runningLength += 2;
+        unsafe.copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+            memoryBlock.getBaseOffset() + runningLength, data.length);
+        runningLength += data.length;
+        break;
+      case STRUCT:
+        DataMapSchema[] childSchemas =
+            ((DataMapSchema.StructDataMapSchema) schema).getChildSchemas();
+        DataMapRow struct = row.getRow(index);
+        for (int i = 0; i < childSchemas.length; i++) {
+          addToUnsafe(childSchemas[i], struct, i);
+        }
+        break;
+    }
+  }
+
+  public DataMapRow getUnsafeRow(int index) {
+    assert (index < rowCount);
+    return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
+  }
+
+  public void finishWriting() {
+    if (runningLength < allocatedSize) {
+      MemoryBlock allocate =
+          MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(runningLength);
+      unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
+          allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
+      memoryAllocator.free(memoryBlock);
+      memoryBlock = allocate;
+    }
+    // Compact pointers.
+    if (rowCount < pointers.length) {
+      int[] newPointer = new int[rowCount];
+      System.arraycopy(pointers, 0, newPointer, 0, rowCount);
+      this.pointers = newPointer;
+    }
+  }
+
+  public void freeMemory() {
+    if (!isMemoryFreed) {
+      memoryAllocator.free(memoryBlock);
+      isMemoryFreed = true;
+    }
+  }
+
+  public int getMemoryUsed() {
+    return runningLength;
+  }
+
+  public DataMapSchema[] getSchema() {
+    return schema;
+  }
+
+  public int getRowCount() {
+    return rowCount;
+  }
+
+}
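
The intended write-then-read lifecycle, as a sketch (schema and rows are
placeholders):

    UnsafeMemoryDMStore store = new UnsafeMemoryDMStore(schema);
    for (DataMapRow row : rows) {
      store.addIndexRowToUnsafe(row);  // grows the off-heap block when needed
    }
    store.finishWriting();             // shrink the block and pointer array to fit
    DataMapRow first = store.getUnsafeRow(0);
    // ... use the rows ...
    store.freeMemory();                // release the off-heap block when done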

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
new file mode 100644
index 0000000..9a50600
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Data map comparator
+ */
+public class BlockletDMComparator implements Comparator<DataMapRow> {
+
+  /**
+   * no dictionary columns are of variable length, so their entry in
+   * eachColumnValueSize is -1
+   */
+  private static final int NO_DICTIONARY_COLUMN_VALUE = -1;
+
+  /**
+   * size of the short value in bytes
+   */
+  private static final short SHORT_SIZE_IN_BYTES = 2;
+
+  private int[] eachColumnValueSize;
+
+  /**
+   * the number of no dictionary columns in SORT_COLUMNS
+   */
+  private int numberOfNoDictSortColumns;
+
+  /**
+   * the number of columns in SORT_COLUMNS
+   */
+  private int numberOfSortColumns;
+
+  public BlockletDMComparator(int[] eachColumnValueSize, int numberOfSortColumns,
+      int numberOfNoDictSortColumns) {
+    this.eachColumnValueSize = eachColumnValueSize;
+    this.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
+    this.numberOfSortColumns = numberOfSortColumns;
+  }
+
+  @Override public int compare(DataMapRow first, DataMapRow second) {
+    int dictionaryKeyOffset = 0;
+    int nonDictionaryKeyOffset = 0;
+    int compareResult = 0;
+    int processedNoDictionaryColumn = numberOfNoDictSortColumns;
+    byte[][] firstBytes = splitKey(first.getByteArray(0));
+    byte[][] secondBytes = splitKey(second.getByteArray(0));
+    byte[] firstNoDictionaryKeys = firstBytes[1];
+    ByteBuffer firstNoDictionaryKeyBuffer = ByteBuffer.wrap(firstNoDictionaryKeys);
+    byte[] secondNoDictionaryKeys = secondBytes[1];
+    ByteBuffer secondNoDictionaryKeyBuffer = ByteBuffer.wrap(secondNoDictionaryKeys);
+    int actualOffset = 0;
+    int actualOffset1 = 0;
+    int firstNoDictionaryLength = 0;
+    int secondNoDictionaryLength = 0;
+
+    for (int i = 0; i < numberOfSortColumns; i++) {
+
+      if (eachColumnValueSize[i] != NO_DICTIONARY_COLUMN_VALUE) {
+        byte[] firstDictionaryKeys = firstBytes[0];
+        byte[] secondDictionaryKeys = secondBytes[0];
+        compareResult = ByteUtil.UnsafeComparer.INSTANCE
+            .compareTo(firstDictionaryKeys, dictionaryKeyOffset, eachColumnValueSize[i],
+                secondDictionaryKeys, dictionaryKeyOffset, eachColumnValueSize[i]);
+        dictionaryKeyOffset += eachColumnValueSize[i];
+      } else {
+        if (processedNoDictionaryColumn > 1) {
+          actualOffset = firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+          firstNoDictionaryLength =
+              firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset + SHORT_SIZE_IN_BYTES)
+                  - actualOffset;
+          actualOffset1 = secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+          secondNoDictionaryLength =
+              secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset + SHORT_SIZE_IN_BYTES)
+                  - actualOffset1;
+          compareResult = ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(firstNoDictionaryKeys, actualOffset, firstNoDictionaryLength,
+                  secondNoDictionaryKeys, actualOffset1, secondNoDictionaryLength);
+          nonDictionaryKeyOffset += SHORT_SIZE_IN_BYTES;
+          processedNoDictionaryColumn--;
+        } else {
+          actualOffset = firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+          actualOffset1 = secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+          firstNoDictionaryLength = firstNoDictionaryKeys.length - actualOffset;
+          secondNoDictionaryLength = secondNoDictionaryKeys.length - actualOffset1;
+          compareResult = ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(firstNoDictionaryKeys, actualOffset, firstNoDcitionaryLength,
+                  secondNoDictionaryKeys, actualOffset1, secondNodeDictionaryLength);
+        }
+      }
+      if (compareResult != 0) {
+        return compareResult;
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Splits the composite index key into its dictionary and no dictionary parts.
+   *
+   * @param startKey serialized composite key
+   * @return a two element array: dictionary key bytes and no dictionary key bytes
+   */
+  private byte[][] splitKey(byte[] startKey) {
+    ByteBuffer buffer = ByteBuffer.wrap(startKey);
+    buffer.rewind();
+    int dictionaryKeySize = buffer.getInt();
+    int nonDictionaryKeySize = buffer.getInt();
+    byte[] dictionaryKey = new byte[dictionaryKeySize];
+    buffer.get(dictionaryKey);
+    byte[] nonDictionaryKey = new byte[nonDictionaryKeySize];
+    buffer.get(nonDictionaryKey);
+    return new byte[][] {dictionaryKey, nonDictionaryKey};
+  }
+}
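
For reference, the composite key parsed by splitKey() is laid out as
[int dictionary length][int no dictionary length][dictionary bytes][no dictionary bytes],
matching what BlockletDataMap.convertToRow() (further below) writes. A minimal,
illustrative sketch of building such a key (class name and values are hypothetical):

    import java.nio.ByteBuffer;

    public class CompositeKeyLayoutExample {
      public static void main(String[] args) {
        byte[] dictionaryKey = new byte[] {1, 2, 3};
        byte[] noDictionaryKey = new byte[] {4, 5};
        // 8 bytes for the two int length headers, then the two key parts
        ByteBuffer buffer =
            ByteBuffer.allocate(8 + dictionaryKey.length + noDictionaryKey.length);
        buffer.putInt(dictionaryKey.length);
        buffer.putInt(noDictionaryKey.length);
        buffer.put(dictionaryKey);
        buffer.put(noDictionaryKey);
        byte[] compositeKey = buffer.array();
        // splitKey(compositeKey) would return {dictionaryKey, noDictionaryKey}
        System.out.println("composite key length: " + compositeKey.length);
      }
    }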

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
new file mode 100644
index 0000000..79aa091
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datastore.IndexKey;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.DataMap;
+import org.apache.carbondata.core.indexstore.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.DataMapWriter;
+import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+
+/**
+ * Datamap implementation for blocklet.
+ */
+public class BlockletDataMap implements DataMap, Cacheable {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockletDataMap.class.getName());
+
+  private static final int KEY_INDEX = 0;
+
+  private static final int MIN_VALUES_INDEX = 1;
+
+  private static final int MAX_VALUES_INDEX = 2;
+
+  private static final int ROW_COUNT_INDEX = 3;
+
+  private static final int FILE_PATH_INDEX = 4;
+
+  private static final int PAGE_COUNT_INDEX = 5;
+
+  private static final int VERSION_INDEX = 6;
+
+  private static final int SCHEMA_UPDATED_TIME_INDEX = 7;
+
+  private static final int BLOCK_INFO_INDEX = 8;
+
+  private UnsafeMemoryDMStore unsafeMemoryDMStore;
+
+  private SegmentProperties segmentProperties;
+
+  private int[] columnCardinality;
+
+  @Override public DataMapWriter getWriter() {
+    return null;
+  }
+
+  @Override public void init(String path) {
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    try {
+      List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path);
+      for (DataFileFooter fileFooter : indexInfo) {
+        List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
+        if (segmentProperties == null) {
+          columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
+          segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
+          createSchema(segmentProperties);
+        }
+        TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
+        fileFooter = CarbonUtil.readMetadatFile(blockInfo);
+
+        loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath());
+      }
+      if (unsafeMemoryDMStore != null) {
+        unsafeMemoryDMStore.finishWriting();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void loadToUnsafe(DataFileFooter fileFooter, SegmentProperties segmentProperties,
+      String filePath) {
+    int[] minMaxLen = segmentProperties.getEachDimColumnValueSize();
+    List<BlockletInfo> blockletList = fileFooter.getBlockletList();
+    DataMapSchema[] schema = unsafeMemoryDMStore.getSchema();
+    for (int index = 0; index < blockletList.size(); index++) {
+      DataMapRow row = new DataMapRowImpl(schema);
+      int ordinal = 0;
+      BlockletInfo blockletInfo = blockletList.get(index);
+
+      // add start key as index key
+      row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++);
+
+      BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
+      row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMinValues()), ordinal);
+      ordinal++;
+      row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMaxValues()), ordinal);
+      ordinal++;
+
+      row.setInt(blockletInfo.getNumberOfRows(), ordinal++);
+
+      // add file path
+      byte[] filePathBytes = filePath.getBytes();
+      row.setByteArray(filePathBytes, ordinal++);
+
+      // add pages
+      row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++);
+
+      // add version number
+      row.setShort(fileFooter.getVersionId().number(), ordinal++);
+
+      // add schema updated time
+      row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
+
+      // add blocklet info
+      byte[] serializedData;
+      try {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        DataOutput dataOutput = new DataOutputStream(stream);
+        blockletInfo.write(dataOutput);
+        serializedData = stream.toByteArray();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      row.setByteArray(serializedData, ordinal);
+      unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+    }
+  }
+
+  private DataMapRow addMinMax(int[] minMaxLen, DataMapSchema dataMapSchema, byte[][] values) {
+    DataMapSchema[] minSchemas =
+        ((DataMapSchema.StructDataMapSchema) dataMapSchema).getChildSchemas();
+    DataMapRow minRow = new DataMapRowImpl(minSchemas);
+    int minOrdinal = 0;
+    // the same routine fills both the min and the max struct rows
+    for (int i = 0; i < minMaxLen.length; i++) {
+      minRow.setByteArray(values[i], minOrdinal++);
+    }
+    return minRow;
+  }
+
+  private void createSchema(SegmentProperties segmentProperties) {
+    List<DataMapSchema> indexSchemas = new ArrayList<>();
+
+    // Index key
+    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+    int[] minMaxLen = segmentProperties.getEachDimColumnValueSize();
+    // do it 2 times, one for min and one for max.
+    for (int k = 0; k < 2; k++) {
+      DataMapSchema[] mapSchemas = new DataMapSchema[minMaxLen.length];
+      for (int i = 0; i < minMaxLen.length; i++) {
+        if (minMaxLen[i] <= 0) {
+          mapSchemas[i] = new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY);
+        } else {
+          mapSchemas[i] = new DataMapSchema.FixedDataMapSchema(DataType.BYTE_ARRAY, minMaxLen[i]);
+        }
+      }
+      DataMapSchema mapSchema = new DataMapSchema.StructDataMapSchema(DataType.STRUCT, mapSchemas);
+      indexSchemas.add(mapSchema);
+    }
+
+    // for number of rows.
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.INT));
+
+    // for table block path
+    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+
+    // for number of pages.
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT));
+
+    // for version number.
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT));
+
+    // for schema updated time.
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.LONG));
+
+    //for blocklet info
+    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+
+    unsafeMemoryDMStore =
+        new UnsafeMemoryDMStore(indexSchemas.toArray(new DataMapSchema[indexSchemas.size()]));
+  }
+
+  @Override public List<Blocklet> prune(FilterResolverIntf filterExp) {
+
+    // getting the start and end index key based on filter for hitting the
+    // selected block reference nodes based on filter resolver tree.
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("preparing the start and end key for finding"
+          + "start and end block as per filter resolver");
+    }
+    List<Blocklet> blocklets = new ArrayList<>();
+    Comparator<DataMapRow> comparator =
+        new BlockletDMComparator(segmentProperties.getEachDimColumnValueSize(),
+            segmentProperties.getNumberOfSortColumns(),
+            segmentProperties.getNumberOfNoDictSortColumns());
+    List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2);
+    FilterUtil
+        .traverseResolverTreeAndGetStartAndEndKey(segmentProperties, filterExp, listOfStartEndKeys);
+    // reading the first value from list which has start key
+    IndexKey searchStartKey = listOfStartEndKeys.get(0);
+    // reading the last value from list which has end key
+    IndexKey searchEndKey = listOfStartEndKeys.get(1);
+    if (null == searchStartKey && null == searchEndKey) {
+      try {
+        // TODO need to handle for no dictionary dimensions
+        searchStartKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+        // TODO need to handle for no dictionary dimensions
+        searchEndKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+      } catch (KeyGenException e) {
+        return null;
+      }
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Successfully retrieved the start and end key" + "Dictionary Start Key: " + searchStartKey
+              .getDictionaryKeys() + "No Dictionary Start Key " + searchStartKey
+              .getNoDictionaryKeys() + "Dictionary End Key: " + searchEndKey.getDictionaryKeys()
+              + "No Dictionary End Key " + searchEndKey.getNoDictionaryKeys());
+    }
+    if (filterExp == null) {
+      int rowCount = unsafeMemoryDMStore.getRowCount();
+      for (int i = 0; i < rowCount; i++) {
+        DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(i);
+        blocklets.add(createBlocklet(unsafeRow, i));
+      }
+    } else {
+      int startIndex = findStartIndex(convertToRow(searchStartKey), comparator);
+      int endIndex = findEndIndex(convertToRow(searchEndKey), comparator);
+      FilterExecuter filterExecuter =
+          FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+      while (startIndex <= endIndex) {
+        DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex);
+        BitSet bitSet = filterExecuter.isScanRequired(getMinMaxValue(unsafeRow, MAX_VALUES_INDEX),
+            getMinMaxValue(unsafeRow, MIN_VALUES_INDEX));
+        if (!bitSet.isEmpty()) {
+          blocklets.add(createBlocklet(unsafeRow, startIndex));
+        }
+        startIndex++;
+      }
+    }
+
+    return blocklets;
+  }
+
+  private byte[][] getMinMaxValue(DataMapRow row, int index) {
+    DataMapRow minMaxRow = row.getRow(index);
+    byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
+    for (int i = 0; i < minMax.length; i++) {
+      minMax[i] = minMaxRow.getByteArray(i);
+    }
+    return minMax;
+  }
+
+  private Blocklet createBlocklet(DataMapRow row, int blockletId) {
+    Blocklet blocklet =
+        new Blocklet(new String(row.getByteArray(FILE_PATH_INDEX)), blockletId + "");
+    BlockletDetailInfo detailInfo = new BlockletDetailInfo();
+    detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX));
+    detailInfo.setPagesCount(row.getShort(PAGE_COUNT_INDEX));
+    detailInfo.setVersionNumber(row.getShort(VERSION_INDEX));
+    detailInfo.setDimLens(columnCardinality);
+    detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPDATED_TIME_INDEX));
+    BlockletInfo blockletInfo = new BlockletInfo();
+    try {
+      byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
+      ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
+      DataInputStream inputStream = new DataInputStream(stream);
+      blockletInfo.readFields(inputStream);
+      inputStream.close();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    detailInfo.setBlockletInfo(blockletInfo);
+    blocklet.setDetailInfo(detailInfo);
+    return blocklet;
+  }
+
+  /**
+   * Binary search used to get the first tentative index row matching the
+   * search key
+   *
+   * @param key search key
+   * @return index of the first tentative row
+   */
+  private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+    int low = 0;
+    int high = unsafeMemoryDMStore.getRowCount() - 1;
+    int mid = 0;
+    int compareRes = -1;
+    while (low <= high) {
+      mid = (low + high) >>> 1;
+      // compare the entries
+      compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+      if (compareRes < 0) {
+        high = mid - 1;
+      } else if (compareRes > 0) {
+        low = mid + 1;
+      } else {
+        // if key is matched then get the first entry
+        int currentPos = mid;
+        while (currentPos - 1 >= 0
+            && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) {
+          currentPos--;
+        }
+        mid = currentPos;
+        break;
+      }
+    }
+    // if the key was not found (compare result less than zero) and mid is
+    // greater than 0, step back one position because duplicate records
+    // can be present in the previous block
+    if (compareRes < 0 && mid > 0) {
+      mid--;
+    }
+    return mid;
+  }
+
+  /**
+   * Binary search used to get the last tentative index row matching the
+   * search key
+   *
+   * @param key search key
+   * @return index of the last tentative row
+   */
+  private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+    int low = 0;
+    int high = unsafeMemoryDMStore.getRowCount() - 1;
+    int mid = 0;
+    int compareRes = -1;
+    while (low <= high) {
+      mid = (low + high) >>> 1;
+      // compare the entries
+      compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+      if (compareRes < 0) {
+        high = mid - 1;
+      } else if (compareRes > 0) {
+        low = mid + 1;
+      } else {
+        int currentPos = mid;
+        // if key is matched then get the last matching entry
+        while (currentPos + 1 < unsafeMemoryDMStore.getRowCount()
+            && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) {
+          currentPos++;
+        }
+        mid = currentPos;
+        break;
+      }
+    }
+    // if the key was not found (compare result less than zero) and mid is
+    // greater than 0, step back one position because duplicate records
+    // can be present in the previous block
+    if (compareRes < 0 && mid > 0) {
+      mid--;
+    }
+    return mid;
+  }
+
+  private DataMapRow convertToRow(IndexKey key) {
+    ByteBuffer buffer =
+        ByteBuffer.allocate(key.getDictionaryKeys().length + key.getNoDictionaryKeys().length + 8);
+    buffer.putInt(key.getDictionaryKeys().length);
+    buffer.putInt(key.getNoDictionaryKeys().length);
+    buffer.put(key.getDictionaryKeys());
+    buffer.put(key.getNoDictionaryKeys());
+    DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema());
+    dataMapRow.setByteArray(buffer.array(), 0);
+    return dataMapRow;
+  }
+
+  @Override public void clear() {
+    unsafeMemoryDMStore.freeMemory();
+    unsafeMemoryDMStore = null;
+    segmentProperties = null;
+  }
+
+  @Override public long getFileTimeStamp() {
+    return 0;
+  }
+
+  @Override public int getAccessCount() {
+    return 0;
+  }
+
+  @Override public long getMemorySize() {
+    return unsafeMemoryDMStore.getMemoryUsed();
+  }
+
+  @Override public DataMapDistributable toDistributable() {
+    // TODO
+    return null;
+  }
+}
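
A minimal usage sketch of the data map above; the segment path is hypothetical,
and prune(null) relies on the null-filter branch that returns every blocklet:

    import java.util.List;

    import org.apache.carbondata.core.indexstore.Blocklet;
    import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;

    public class BlockletDataMapUsageSketch {
      public static void main(String[] args) {
        BlockletDataMap dataMap = new BlockletDataMap();
        // hypothetical carbonindex file path inside a segment
        dataMap.init("/store/db/table/Fact/Part0/Segment_0/0.carbonindex");
        // a null filter takes the branch that returns every blocklet in the file
        List<Blocklet> blocklets = dataMap.prune(null);
        System.out.println("pruned to " + blocklets.size() + " blocklets");
        dataMap.clear();
      }
    }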

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
new file mode 100644
index 0000000..2fe6643
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.indexstore.DataMap;
+import org.apache.carbondata.core.indexstore.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.DataMapFactory;
+import org.apache.carbondata.core.indexstore.DataMapWriter;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * Factory that creates and caches one blocklet data map per index file of a segment
+ */
+public class BlockletDataMapFactory implements DataMapFactory {
+
+  private AbsoluteTableIdentifier identifier;
+
+  private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
+
+  private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+
+  public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+    this.identifier = identifier;
+    cache = CacheProvider.getInstance()
+        .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP, identifier.getStorePath());
+  }
+
+  public DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId) {
+    return null;
+  }
+
+  public List<DataMap> getDataMaps(String segmentId) {
+    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+        segmentMap.get(segmentId);
+    if (tableBlockIndexUniqueIdentifiers == null) {
+      tableBlockIndexUniqueIdentifiers = new ArrayList<>();
+      String path = identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId;
+      FileFactory.FileType fileType = FileFactory.getFileType(path);
+      CarbonFile carbonFile = FileFactory.getCarbonFile(path, fileType);
+      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          return file.getName().endsWith(".carbonindex");
+        }
+      });
+      for (int i = 0; i < listFiles.length; i++) {
+        tableBlockIndexUniqueIdentifiers.add(
+            new TableBlockIndexUniqueIdentifier(identifier, segmentId, listFiles[i].getName()));
+      }
+      // remember the identifiers so they are not rebuilt on the next call
+      // and so clear(segmentId) can find them
+      segmentMap.put(segmentId, tableBlockIndexUniqueIdentifiers);
+    }
+
+    try {
+      return cache.getAll(tableBlockIndexUniqueIdentifiers);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override public boolean isFiltersSupported(FilterType filterType) {
+    return true;
+  }
+
+  public void clear(String segmentId) {
+    List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
+    if (blockIndexes != null) {
+      for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
+        DataMap dataMap = cache.getIfPresent(blockIndex);
+        // the entry may already have been evicted from the LRU cache
+        if (dataMap != null) {
+          dataMap.clear();
+        }
+        cache.invalidate(blockIndex);
+      }
+    }
+  }
+
+  @Override public void clear() {
+    // iterate over a copy because clear(segmentId) removes entries from segmentMap
+    for (String segmentId : new ArrayList<>(segmentMap.keySet())) {
+      clear(segmentId);
+    }
+  }
+
+  @Override public DataMap getDataMap(DataMapDistributable distributable) {
+    return null;
+  }
+
+  @Override public void fireEvent(ChangeEvent event) {
+
+  }
+}
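
A sketch of the intended call sequence, assuming the AbsoluteTableIdentifier and
CarbonTableIdentifier constructors of this branch; store path, table names and
data map name are illustrative:

    import java.util.List;

    import org.apache.carbondata.core.indexstore.DataMap;
    import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    import org.apache.carbondata.core.metadata.CarbonTableIdentifier;

    public class BlockletDataMapFactorySketch {
      public static void main(String[] args) {
        AbsoluteTableIdentifier identifier = new AbsoluteTableIdentifier("/store",
            new CarbonTableIdentifier("default", "t1", "tableId1"));
        BlockletDataMapFactory factory = new BlockletDataMapFactory();
        factory.init(identifier, "blocklet_datamap");
        // loads (or reads from the LRU cache) one data map per index file in segment 0
        List<DataMap> dataMaps = factory.getDataMaps("0");
        System.out.println("data maps for segment 0: " + dataMaps.size());
        factory.clear("0");
      }
    }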

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
new file mode 100644
index 0000000..5509c75
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory;
+import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
+import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+
+/**
+ * DataRefNode wrapper over blocklet data map output, so pruned blocklets can be
+ * read through the existing block scanning interfaces
+ */
+public class BlockletDataRefNodeWrapper implements DataRefNode {
+
+  private List<TableBlockInfo> blockInfos;
+
+  private int index;
+
+  private int[] dimensionLens;
+
+  private BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache;
+
+  public BlockletDataRefNodeWrapper(List<TableBlockInfo> blockInfos, int index,
+      int[] dimensionLens) {
+    this.blockInfos = blockInfos;
+    this.index = index;
+    this.dimensionLens = dimensionLens;
+  }
+
+  @Override public DataRefNode getNextDataRefNode() {
+    if (index + 1 < blockInfos.size()) {
+      return new BlockletDataRefNodeWrapper(blockInfos, index + 1, dimensionLens);
+    }
+    return null;
+  }
+
+  @Override public int nodeSize() {
+    return blockInfos.get(index).getDetailInfo().getRowCount();
+  }
+
+  @Override public long nodeNumber() {
+    return index;
+  }
+
+  @Override public byte[][] getColumnsMaxValue() {
+    return null;
+  }
+
+  @Override public byte[][] getColumnsMinValue() {
+    return null;
+  }
+
+  @Override
+  public DimensionRawColumnChunk[] getDimensionChunks(FileHolder fileReader, int[][] blockIndexes)
+      throws IOException {
+    DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader();
+    return dimensionChunksReader.readRawDimensionChunks(fileReader, blockIndexes);
+  }
+
+  @Override
+  public DimensionRawColumnChunk getDimensionChunk(FileHolder fileReader, int blockIndexes)
+      throws IOException {
+    DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader();
+    return dimensionChunksReader.readRawDimensionChunk(fileReader, blockIndexes);
+  }
+
+  @Override
+  public MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
+      throws IOException {
+    MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader();
+    return measureColumnChunkReader.readRawMeasureChunks(fileReader, blockIndexes);
+  }
+
+  @Override public MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, int blockIndex)
+      throws IOException {
+    MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader();
+    return measureColumnChunkReader.readRawMeasureChunk(fileReader, blockIndex);
+  }
+
+  private DimensionColumnChunkReader getDimensionColumnChunkReader() throws IOException {
+    ColumnarFormatVersion version =
+        ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber());
+    DimensionColumnChunkReader dimensionColumnChunkReader = CarbonDataReaderFactory.getInstance()
+        .getDimensionColumnChunkReader(version,
+            blockInfos.get(index).getDetailInfo().getBlockletInfo(), dimensionLens,
+            blockInfos.get(index).getFilePath());
+    return dimensionColumnChunkReader;
+  }
+
+  private MeasureColumnChunkReader getMeasureColumnChunkReader() throws IOException {
+    ColumnarFormatVersion version =
+        ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber());
+    return CarbonDataReaderFactory.getInstance().getMeasureColumnChunkReader(version,
+        blockInfos.get(index).getDetailInfo().getBlockletInfo(),
+        blockInfos.get(index).getFilePath());
+  }
+
+  @Override
+  public void setDeleteDeltaDataCache(BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache) {
+    this.deleteDeltaDataCache = deleteDeltaDataCache;
+  }
+
+  @Override public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() {
+    return deleteDeltaDataCache;
+  }
+
+  @Override public int numberOfPages() {
+    return blockInfos.get(index).getDetailInfo().getPagesCount();
+  }
+
+  public int numberOfNodes() {
+    return blockInfos.size();
+  }
+}
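
A short traversal sketch over the wrapper chain, relying on getNextDataRefNode()
returning the next wrapper; blockInfos and dimensionLens are assumed inputs
produced by the pruning step:

    import java.util.List;

    import org.apache.carbondata.core.datastore.DataRefNode;
    import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNodeWrapper;

    public class WrapperTraversalSketch {
      // assumes each TableBlockInfo carries the BlockletDetailInfo set by the data map
      static void printNodes(List<TableBlockInfo> blockInfos, int[] dimensionLens) {
        DataRefNode node = new BlockletDataRefNodeWrapper(blockInfos, 0, dimensionLens);
        while (node != null) {
          System.out.println("node " + node.nodeNumber() + " holds " + node.nodeSize() + " rows");
          node = node.getNextDataRefNode();
        }
      }
    }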

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
new file mode 100644
index 0000000..b8cffc6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Wrapper of abstract index.
+ * TODO: it could be removed after refactoring
+ */
+public class IndexWrapper extends AbstractIndex {
+
+  public IndexWrapper(List<TableBlockInfo> blockInfos) {
+    DataFileFooter fileFooter = null;
+    try {
+      fileFooter = CarbonUtil.readMetadatFile(blockInfos.get(0));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    segmentProperties = new SegmentProperties(fileFooter.getColumnInTable(),
+        fileFooter.getSegmentInfo().getColumnCardinality());
+    dataRefNode = new BlockletDataRefNodeWrapper(blockInfos, 0,
+        segmentProperties.getDimensionColumnsValueSize());
+  }
+
+  @Override public void buildIndex(List<DataFileFooter> footerList) {
+  }
+}