You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/04/26 10:40:18 UTC

carbondata git commit: [CARBONDATA-2362] Changing the Cacheable object from DataMap to Wrapper

Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 bb35c255e -> 0b0d8e653


[CARBONDATA-2362] Changing the Cacheable object from DataMap to Wrapper

1. The Cacheable object is changed from the dataMap to a wrapper
2. Blocklet info is stored in binary form in the dataMap

This closes #2187


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0b0d8e65
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0b0d8e65
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0b0d8e65

Branch: refs/heads/branch-1.3
Commit: 0b0d8e6536ff1d657e5201affd671c2e21c03ad9
Parents: bb35c25
Author: dhatchayani <dh...@gmail.com>
Authored: Wed Apr 25 11:32:26 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Thu Apr 26 16:12:52 2018 +0530

----------------------------------------------------------------------
 .../dictionary/AbstractDictionaryCache.java     |   4 +-
 .../carbondata/core/datamap/TableDataMap.java   |  10 +-
 .../core/datamap/dev/CacheableDataMap.java      |  10 +-
 .../core/datastore/BlockIndexStore.java         |   3 +-
 .../core/datastore/SegmentTaskIndexStore.java   |   4 +-
 .../indexstore/BlockletDataMapIndexStore.java   | 114 ++++++++++++-----
 .../indexstore/BlockletDataMapIndexWrapper.java |  52 ++++++++
 .../core/indexstore/BlockletDetailInfo.java     |  60 +++++++--
 .../core/indexstore/BlockletDetailsFetcher.java |  10 --
 .../core/indexstore/SafeMemoryDMStore.java      |  11 ++
 .../blockletindex/BlockletDataMap.java          |  73 +++++------
 .../BlockletDataMapDistributable.java           |  18 +--
 .../blockletindex/BlockletDataMapFactory.java   | 128 +++++++++----------
 .../blockletindex/SegmentIndexFileStore.java    |  21 ++-
 .../core/indexstore/schema/CarbonRowSchema.java |   4 +
 .../core/metadata/SegmentFileStore.java         |  29 +++++
 .../core/metadata/blocklet/BlockletInfo.java    |   8 +-
 .../core/metadata/blocklet/DataFileFooter.java  |   2 +-
 .../core/metadata/schema/table/TableInfo.java   |  23 ++++
 .../core/util/BlockletDataMapUtil.java          | 120 ++++++++++++-----
 .../TestBlockletDataMapFactory.java             |  27 ++--
 .../apache/carbondata/hadoop/CacheClient.java   |  26 ++--
 .../hadoop/api/AbstractDataMapJob.java          |   7 +-
 .../carbondata/hadoop/api/DataMapJob.java       |   5 +-
 24 files changed, 503 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
index 9ed9007..78feb4f 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
@@ -59,7 +59,9 @@ public abstract class AbstractDictionaryCache<K extends DictionaryColumnUniqueId
     initThreadPoolSize();
   }
 
-  @Override public void put(DictionaryColumnUniqueIdentifier key, Dictionary value) {
+  @Override
+  public void put(DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier,
+      Dictionary value) {
     throw new UnsupportedOperationException("Operation not supported");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 020d6c9..23e7695 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -126,11 +126,11 @@ public final class TableDataMap extends OperationEventListener {
     for (DataMap dataMap : dataMaps) {
       blocklets.addAll(dataMap.prune(filterExp, partitions));
     }
-    for (Blocklet blocklet: blocklets) {
-      ExtendedBlocklet detailedBlocklet =
-          blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegment());
-      detailedBlocklet.setSegmentId(distributable.getSegment().getSegmentNo());
-      detailedBlocklets.add(detailedBlocklet);
+    List<ExtendedBlocklet> detailedBlockletList =
+        blockletDetailsFetcher.getExtendedBlocklets(blocklets, distributable.getSegment());
+    for (ExtendedBlocklet blocklet : detailedBlockletList) {
+      blocklet.setSegmentId(distributable.getSegment().getSegmentNo());
+      detailedBlocklets.add(blocklet);
     }
     return detailedBlocklets;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
index 885b21f..19ec94c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.memory.MemoryException;
 
 /**
@@ -29,11 +31,13 @@ import org.apache.carbondata.core.memory.MemoryException;
 public interface CacheableDataMap {
 
   /**
-   * Add the dataMap to cache
+   * Add the blockletDataMapIndexWrapper to cache for key tableBlockIndexUniqueIdentifier
    *
-   * @param dataMap
+   * @param tableBlockIndexUniqueIdentifier
+   * @param blockletDataMapIndexWrapper
    */
-  void cache(DataMap dataMap) throws IOException, MemoryException;
+  void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+      BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException;
 
   /**
    * Get all the uncached distributables from the list.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
index 609bf1c..8055ffd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
@@ -229,7 +229,8 @@ public class BlockIndexStore<K, V> extends AbstractBlockIndexStoreCache<K, V> {
         .remove(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
   }
 
-  @Override public void put(TableBlockUniqueIdentifier key, AbstractIndex value) {
+  @Override
+  public void put(TableBlockUniqueIdentifier tableBlockUniqueIdentifier, AbstractIndex value) {
     throw new UnsupportedOperationException("Operation not supported");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
index 744cc93..2d6e77c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -140,7 +140,9 @@ public class SegmentTaskIndexStore
     lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
   }
 
-  @Override public void put(TableSegmentUniqueIdentifier key, SegmentTaskIndexWrapper value) {
+  @Override
+  public void put(TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier,
+      SegmentTaskIndexWrapper value) {
     throw new UnsupportedOperationException("Operation not supported");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index ce0fe8b..1f31bd9 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CarbonLRUCache;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
@@ -41,7 +42,7 @@ import org.apache.carbondata.core.util.BlockletDataMapUtil;
  * blocks
  */
 public class BlockletDataMapIndexStore
-    implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap> {
+    implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName());
   /**
@@ -68,54 +69,90 @@ public class BlockletDataMapIndexStore
   }
 
   @Override
-  public BlockletDataMap get(TableBlockIndexUniqueIdentifier identifier)
+  public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifier identifier)
       throws IOException {
     String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
-    BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
-    if (dataMap == null) {
+    BlockletDataMapIndexWrapper blockletDataMapIndexWrapper =
+        (BlockletDataMapIndexWrapper) lruCache.get(lruCacheKey);
+    List<DataMap> dataMaps = new ArrayList<>();
+    if (blockletDataMapIndexWrapper == null) {
       try {
         SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
         Set<String> filesRead = new HashSet<>();
-        Map<String, BlockMetaInfo> blockMetaInfoMap =
-            BlockletDataMapUtil.getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
-        dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
-      } catch (MemoryException e) {
+        long memorySize = 0L;
+        String segmentFilePath = identifier.getIndexFilePath();
+        Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping = BlockletDataMapUtil
+            .createCarbonDataFileBlockMetaInfoMapping(segmentFilePath);
+        // if the identifier is not a merge file we can directly load the datamaps
+        if (identifier.getMergeIndexFileName() == null) {
+          Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
+              .getBlockMetaInfoMap(identifier, indexFileStore, filesRead,
+                  carbonDataFileBlockMetaInfoMapping);
+          BlockletDataMap blockletDataMap =
+              loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
+          memorySize += blockletDataMap.getMemorySize();
+          dataMaps.add(blockletDataMap);
+          blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
+        } else {
+          // if the identifier is a merge file then collect the index files and load the datamaps
+          List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+              BlockletDataMapUtil.getIndexFileIdentifiersFromMergeFile(identifier, indexFileStore);
+          for (TableBlockIndexUniqueIdentifier blockIndexUniqueIdentifier :
+              tableBlockIndexUniqueIdentifiers) {
+            Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
+                .getBlockMetaInfoMap(blockIndexUniqueIdentifier, indexFileStore, filesRead,
+                    carbonDataFileBlockMetaInfoMapping);
+            BlockletDataMap blockletDataMap =
+                loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap);
+            memorySize += blockletDataMap.getMemorySize();
+            dataMaps.add(blockletDataMap);
+          }
+          blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
+        }
+        lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper,
+            memorySize);
+      } catch (Throwable e) {
+        // clear all the memory used by datamaps loaded
+        for (DataMap dataMap : dataMaps) {
+          dataMap.clear();
+        }
         LOGGER.error("memory exception when loading datamap: " + e.getMessage());
         throw new RuntimeException(e.getMessage(), e);
       }
     }
-    return dataMap;
+    return blockletDataMapIndexWrapper;
   }
 
   @Override
-  public List<BlockletDataMap> getAll(
+  public List<BlockletDataMapIndexWrapper> getAll(
       List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
-    List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size());
+    List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
+        new ArrayList<>(tableSegmentUniqueIdentifiers.size());
     List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>();
     ExecutorService service = null;
+    BlockletDataMapIndexWrapper blockletDataMapIndexWrapper = null;
     // Get the datamaps for each indexfile from cache.
     try {
       for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
-        BlockletDataMap ifPresent = getIfPresent(identifier);
-        if (ifPresent != null) {
-          blockletDataMaps.add(ifPresent);
+        BlockletDataMapIndexWrapper dataMapIndexWrapper = getIfPresent(identifier);
+        if (dataMapIndexWrapper != null) {
+          blockletDataMapIndexWrappers.add(dataMapIndexWrapper);
         } else {
           missedIdentifiers.add(identifier);
         }
       }
       if (missedIdentifiers.size() > 0) {
-        SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
-        Set<String> filesRead = new HashSet<>();
-        for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
-          Map<String, BlockMetaInfo> blockMetaInfoMap =
-              BlockletDataMapUtil.getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
-          blockletDataMaps.add(
-              loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap));
+        for (TableBlockIndexUniqueIdentifier identifier : missedIdentifiers) {
+          blockletDataMapIndexWrapper = get(identifier);
+          blockletDataMapIndexWrappers.add(blockletDataMapIndexWrapper);
         }
       }
     } catch (Throwable e) {
-      for (BlockletDataMap dataMap : blockletDataMaps) {
-        dataMap.clear();
+      if (null != blockletDataMapIndexWrapper) {
+        List<DataMap> dataMaps = blockletDataMapIndexWrapper.getDataMaps();
+        for (DataMap dataMap : dataMaps) {
+          dataMap.clear();
+        }
       }
       throw new IOException("Problem in loading segment blocks.", e);
     } finally {
@@ -123,7 +160,7 @@ public class BlockletDataMapIndexStore
         service.shutdownNow();
       }
     }
-    return blockletDataMaps;
+    return blockletDataMapIndexWrappers;
   }
 
   /**
@@ -133,10 +170,10 @@ public class BlockletDataMapIndexStore
    * @return
    */
   @Override
-  public BlockletDataMap getIfPresent(
+  public BlockletDataMapIndexWrapper getIfPresent(
       TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
-    return (BlockletDataMap) lruCache.get(
-        tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+    return (BlockletDataMapIndexWrapper) lruCache
+        .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
   }
 
   /**
@@ -149,14 +186,16 @@ public class BlockletDataMapIndexStore
     lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
   }
 
-  @Override public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
-      BlockletDataMap blockletDataMap) throws IOException, MemoryException {
+  @Override
+  public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+      BlockletDataMapIndexWrapper wrapper) throws IOException, MemoryException {
     String uniqueTableSegmentIdentifier =
         tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier();
     Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
     if (lock == null) {
       lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
     }
+    long memorySize = 0L;
     // As dataMap will use unsafe memory, it is not recommended to overwrite an existing entry
     // as in that case clearing unsafe memory need to be taken card. If at all datamap entry
     // in the cache need to be overwritten then use the invalidate interface
@@ -164,13 +203,20 @@ public class BlockletDataMapIndexStore
     if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
       synchronized (lock) {
         if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+          List<DataMap> dataMaps = wrapper.getDataMaps();
           try {
-            blockletDataMap.convertToUnsafeDMStore();
-            lruCache.put(tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(),
-                blockletDataMap, blockletDataMap.getMemorySize());
+            for (DataMap dataMap: dataMaps) {
+              BlockletDataMap blockletDataMap = (BlockletDataMap) dataMap;
+              blockletDataMap.convertToUnsafeDMStore();
+              memorySize += blockletDataMap.getMemorySize();
+            }
+            lruCache.put(tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(), wrapper,
+                memorySize);
           } catch (Throwable e) {
             // clear all the memory acquired by data map in case of any failure
-            blockletDataMap.clear();
+            for (DataMap blockletDataMap : dataMaps) {
+              blockletDataMap.clear();
+            }
             throw new IOException("Problem in adding datamap to cache.", e);
           }
         }
@@ -205,8 +251,6 @@ public class BlockletDataMapIndexStore
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
               .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
           blockMetaInfoMap, identifier.getSegmentId()));
-      lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap,
-          dataMap.getMemorySize());
     }
     return dataMap;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
new file mode 100644
index 0000000..1f0c089
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+
+/**
+ * A cacheable wrapper of datamaps
+ */
+public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
+
+  private List<DataMap> dataMaps;
+
+  public BlockletDataMapIndexWrapper(List<DataMap> dataMaps) {
+    this.dataMaps = dataMaps;
+  }
+
+  @Override public long getFileTimeStamp() {
+    return 0;
+  }
+
+  @Override public int getAccessCount() {
+    return 0;
+  }
+
+  @Override public long getMemorySize() {
+    return 0;
+  }
+
+  public List<DataMap> getDataMaps() {
+    return dataMaps;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
index ce05fe2..cfdb3f0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -22,20 +22,29 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 import org.apache.hadoop.io.Writable;
-import org.xerial.snappy.Snappy;
 
 /**
  * Blocklet detail information to be sent to each executor
  */
 public class BlockletDetailInfo implements Serializable, Writable {
 
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockletDetailInfo.class.getName());
+
+  private static final long serialVersionUID = -4160667488700814493L;
+
   private int rowCount;
 
   private short pagesCount;
@@ -48,7 +57,9 @@ public class BlockletDetailInfo implements Serializable, Writable {
 
   private long schemaUpdatedTimeStamp;
 
-  private BlockletInfo blockletInfo;
+  private transient BlockletInfo blockletInfo;
+
+  private byte[] blockletInfoBinary;
 
   private long blockFooterOffset;
 
@@ -90,6 +101,26 @@ public class BlockletDetailInfo implements Serializable, Writable {
     this.blockletInfo = blockletInfo;
   }
 
+  private void setBlockletInfoFromBinary() throws IOException {
+    if (null == this.blockletInfo && null != blockletInfoBinary && blockletInfoBinary.length > 0) {
+      blockletInfo = new BlockletInfo();
+      ByteArrayInputStream stream = new ByteArrayInputStream(blockletInfoBinary);
+      DataInputStream inputStream = new DataInputStream(stream);
+      try {
+        blockletInfo.readFields(inputStream);
+      } catch (IOException e) {
+        LOGGER.error("Problem in reading blocklet info");
+        throw new IOException("Problem in reading blocklet info." + e.getMessage());
+      } finally {
+        try {
+          inputStream.close();
+        } catch (IOException e) {
+          LOGGER.error(e, "Problem in closing input stream of reading blocklet info.");
+        }
+      }
+    }
+  }
+
   public int[] getDimLens() {
     return dimLens;
   }
@@ -131,6 +162,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
     out.writeLong(blockFooterOffset);
     out.writeInt(columnSchemaBinary.length);
     out.write(columnSchemaBinary);
+    out.writeInt(blockletInfoBinary.length);
+    out.write(blockletInfoBinary);
     out.writeLong(blockSize);
   }
 
@@ -153,6 +186,10 @@ public class BlockletDetailInfo implements Serializable, Writable {
     byte[] schemaArray = new byte[bytesSize];
     in.readFully(schemaArray);
     readColumnSchema(schemaArray);
+    int byteSize = in.readInt();
+    blockletInfoBinary = new byte[byteSize];
+    in.readFully(blockletInfoBinary);
+    setBlockletInfoFromBinary();
     blockSize = in.readLong();
   }
 
@@ -162,17 +199,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
    * @throws IOException
    */
   public void readColumnSchema(byte[] schemaArray) throws IOException {
-    // uncompress it.
-    schemaArray = Snappy.uncompress(schemaArray);
-    ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
-    DataInput schemaInput = new DataInputStream(schemaStream);
-    columnSchemas = new ArrayList<>();
-    int size = schemaInput.readShort();
-    for (int i = 0; i < size; i++) {
-      ColumnSchema columnSchema = new ColumnSchema();
-      columnSchema.readFields(schemaInput);
-      columnSchemas.add(columnSchema);
-    }
+    BlockletDataMap blockletDataMap = new BlockletDataMap();
+    columnSchemas = blockletDataMap.readColumnSchema(schemaArray);
   }
 
   /**
@@ -224,4 +252,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
   public byte[] getColumnSchemaBinary() {
     return columnSchemaBinary;
   }
+
+  public void setBlockletInfoBinary(byte[] blockletInfoBinary) {
+    this.blockletInfoBinary = blockletInfoBinary;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index b4d6db2..a2f49c1 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -36,14 +36,4 @@ public interface BlockletDetailsFetcher {
    */
   List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
       throws IOException;
-
-  /**
-   * Get the blocklet detail information based on blockletid, blockid and segmentid.
-   *
-   * @param blocklet
-   * @param segment
-   * @return
-   * @throws IOException
-   */
-  ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
index d51f28d..84dd91e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * Store the data map row @{@link DataMapRow} data to memory.
@@ -83,6 +84,7 @@ public class SafeMemoryDMStore extends AbstractMemoryDMStore {
 
   @Override
   public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException {
+    setSchemaDataType();
     UnsafeMemoryDMStore unsafeMemoryDMStore = new UnsafeMemoryDMStore(schema);
     for (DataMapRow dataMapRow : dataMapRows) {
       unsafeMemoryDMStore.addIndexRow(dataMapRow);
@@ -91,4 +93,13 @@ public class SafeMemoryDMStore extends AbstractMemoryDMStore {
     return unsafeMemoryDMStore;
   }
 
+  /**
+   * Set the dataType to the schema. Needed in case of serialization / deserialization
+   */
+  private void setSchemaDataType() {
+    for (CarbonRowSchema carbonRowSchema : schema) {
+      carbonRowSchema.setDataType(DataTypeUtil.valueOf(carbonRowSchema.getDataType(), 0, 0));
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 66fa0aa..058f500 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.indexstore.blockletindex;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
@@ -32,7 +33,6 @@ import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cacheable;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
@@ -46,7 +46,6 @@ import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.SafeMemoryDMStore;
-import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
@@ -78,7 +77,7 @@ import org.xerial.snappy.Snappy;
 /**
  * Datamap implementation for blocklet.
  */
-public class BlockletDataMap implements DataMap, Cacheable {
+public class BlockletDataMap implements DataMap {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMap.class.getName());
@@ -134,7 +133,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private int[] columnCardinality;
 
-  private TableBlockIndexUniqueIdentifier tableBlockUniqueIdentifier;
+  private long blockletSchemaTime;
 
   @Override
   public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
@@ -159,6 +158,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
       if (segmentProperties == null) {
         List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
         schemaBinary = convertSchemaToBinary(columnInTable);
+        blockletSchemaTime = fileFooter.getSchemaUpdatedTimeStamp();
         columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
         segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
         createSchema(segmentProperties,
@@ -207,9 +207,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
           segmentId);
       summaryDMStore.finishWriting();
     }
-    LOGGER.info(
-        "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is " + (
-            System.currentTimeMillis() - startTime));
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is "
+              + (System.currentTimeMillis() - startTime));
+    }
   }
 
   private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter,
@@ -771,23 +773,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
     detailInfo.setBlockletId((short) blockletId);
     detailInfo.setDimLens(columnCardinality);
     detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
-    byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
-    BlockletInfo blockletInfo = null;
+    detailInfo.setBlockletInfoBinary(row.getByteArray(BLOCK_INFO_INDEX));
     try {
-      if (byteArray.length > 0) {
-        blockletInfo = new BlockletInfo();
-        ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
-        DataInputStream inputStream = new DataInputStream(stream);
-        blockletInfo.readFields(inputStream);
-        inputStream.close();
-      }
       blocklet.setLocation(
           new String(row.getByteArray(LOCATIONS), CarbonCommonConstants.DEFAULT_CHARSET)
               .split(","));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    detailInfo.setBlockletInfo(blockletInfo);
     blocklet.setDetailInfo(detailInfo);
     detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET));
     detailInfo.setColumnSchemaBinary(getColumnSchemaBinary());
@@ -919,7 +912,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return dataMapRow;
   }
 
-  private byte[] getColumnSchemaBinary() {
+  public byte[] getColumnSchemaBinary() {
     DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
     return unsafeRow.getByteArray(SCHEMA);
   }
@@ -956,17 +949,6 @@ public class BlockletDataMap implements DataMap, Cacheable {
     }
   }
 
-  @Override
-  public long getFileTimeStamp() {
-    return 0;
-  }
-
-  @Override
-  public int getAccessCount() {
-    return 0;
-  }
-
-  @Override
   public long getMemorySize() {
     long memoryUsed = 0L;
     if (memoryDMStore != null) {
@@ -978,15 +960,6 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return memoryUsed;
   }
 
-  public TableBlockIndexUniqueIdentifier getTableBlockUniqueIdentifier() {
-    return tableBlockUniqueIdentifier;
-  }
-
-  public void setTableBlockUniqueIdentifier(
-      TableBlockIndexUniqueIdentifier tableBlockUniqueIdentifier) {
-    this.tableBlockUniqueIdentifier = tableBlockUniqueIdentifier;
-  }
-
   public void setSegmentProperties(SegmentProperties segmentProperties) {
     this.segmentProperties = segmentProperties;
   }
@@ -995,6 +968,10 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return columnCardinality;
   }
 
+  public long getBlockletSchemaTime() {
+    return blockletSchemaTime;
+  }
+
   private AbstractMemoryDMStore getMemoryDMStore(CarbonRowSchema[] schema, boolean addToUnsafe)
       throws MemoryException {
     AbstractMemoryDMStore memoryDMStore;
@@ -1024,4 +1001,24 @@ public class BlockletDataMap implements DataMap, Cacheable {
     }
   }
 
+  /**
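+   * Reads the list of column schemas from its Snappy-compressed binary form
+   * @param schemaArray Snappy-compressed, serialized column schemas
+   * @throws IOException if the binary cannot be uncompressed or deserialized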
+   */
+  public List<ColumnSchema> readColumnSchema(byte[] schemaArray) throws IOException {
+    // uncompress it.
+    schemaArray = Snappy.uncompress(schemaArray);
+    ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
+    DataInput schemaInput = new DataInputStream(schemaStream);
+    List<ColumnSchema> columnSchemas = new ArrayList<>();
+    int size = schemaInput.readShort();
+    for (int i = 0; i < size; i++) {
+      ColumnSchema columnSchema = new ColumnSchema();
+      columnSchema.readFields(schemaInput);
+      columnSchemas.add(columnSchema);
+    }
+    return columnSchemas;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
index 02ac8d7..08d8fc9 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
@@ -16,8 +16,6 @@
  */
 package org.apache.carbondata.core.indexstore.blockletindex;
 
-import java.util.Set;
-
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 
@@ -34,24 +32,22 @@ public class BlockletDataMapDistributable extends DataMapDistributable {
    */
   private String filePath;
 
-  private Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers;
+  private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
 
-  public BlockletDataMapDistributable(String indexFilePath,
-      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers) {
+  public BlockletDataMapDistributable(String indexFilePath) {
     this.filePath = indexFilePath;
-    this.tableBlockIndexUniqueIdentifiers = tableBlockIndexUniqueIdentifiers;
   }
 
   public String getFilePath() {
     return filePath;
   }
 
-  public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers() {
-    return tableBlockIndexUniqueIdentifiers;
+  public TableBlockIndexUniqueIdentifier getTableBlockIndexUniqueIdentifier() {
+    return tableBlockIndexUniqueIdentifier;
   }
 
-  public void setTableBlockIndexUniqueIdentifiers(
-      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers) {
-    this.tableBlockIndexUniqueIdentifiers = tableBlockIndexUniqueIdentifiers;
+  public void setTableBlockIndexUniqueIdentifier(
+      TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier) {
+    this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 6d8feeb..0cc3ef0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -37,12 +37,12 @@ import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
@@ -63,7 +63,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   // segmentId -> list of index file
   private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
 
-  private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+  private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache;
 
   @Override
   public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
@@ -82,13 +82,18 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
     Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment);
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         new ArrayList<>(identifiers.size());
+    List<DataMap> dataMaps = new ArrayList<>();
     tableBlockIndexUniqueIdentifiers.addAll(identifiers);
-    return cache.getAll(tableBlockIndexUniqueIdentifiers);
+    List<BlockletDataMapIndexWrapper> dataMapsList = cache.getAll(tableBlockIndexUniqueIdentifiers);
+    for (BlockletDataMapIndexWrapper wrapper : dataMapsList) {
+      dataMaps.addAll(wrapper.getDataMaps());
+    }
+    return dataMaps;
   }
 
-  @Override public void cache(DataMap dataMap) throws IOException, MemoryException {
-    BlockletDataMap blockletDataMap = (BlockletDataMap) dataMap;
-    cache.put(blockletDataMap.getTableBlockUniqueIdentifier(), blockletDataMap);
+  @Override public void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+      BlockletDataMapIndexWrapper wrapper) throws IOException, MemoryException {
+    cache.put(tableBlockIndexUniqueIdentifier, wrapper);
   }
 
   @Override
@@ -100,16 +105,13 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
       Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
           getTableBlockIndexUniqueIdentifiers(segment);
       // filter out the tableBlockIndexUniqueIdentifiers based on distributable
-      Set<TableBlockIndexUniqueIdentifier> validIdentifiers = BlockletDataMapUtil
+      TableBlockIndexUniqueIdentifier validIdentifier = BlockletDataMapUtil
           .filterIdentifiersBasedOnDistributable(tableBlockIndexUniqueIdentifiers,
               (BlockletDataMapDistributable) distributable);
-      ((BlockletDataMapDistributable) distributable)
-          .setTableBlockIndexUniqueIdentifiers(validIdentifiers);
-      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : validIdentifiers) {
-        if (null == cache.getIfPresent(tableBlockIndexUniqueIdentifier)) {
-          distributablesToBeLoaded.add(distributable);
-          break;
-        }
+      if (null == cache.getIfPresent(validIdentifier)) {
+        ((BlockletDataMapDistributable) distributable)
+            .setTableBlockIndexUniqueIdentifier(validIdentifier);
+        distributablesToBeLoaded.add(distributable);
       }
     }
     return distributablesToBeLoaded;
@@ -147,71 +149,56 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
         getTableBlockIndexUniqueIdentifiers(segment);
     // Retrieve each blocklets detail information from blocklet datamap
     for (Blocklet blocklet : blocklets) {
-      detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet));
+      detailedBlocklets.addAll(getExtendedBlocklet(identifiers, blocklet));
     }
     return detailedBlocklets;
   }
 
-  @Override
-  public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment)
-      throws IOException {
-    if (blocklet instanceof ExtendedBlocklet) {
-      return (ExtendedBlocklet) blocklet;
-    }
-    Set<TableBlockIndexUniqueIdentifier> identifiers =
-        getTableBlockIndexUniqueIdentifiers(segment);
-    return getExtendedBlocklet(identifiers, blocklet);
-  }
-
-  private ExtendedBlocklet getExtendedBlocklet(Set<TableBlockIndexUniqueIdentifier> identifiers,
-      Blocklet blocklet) throws IOException {
+  private List<ExtendedBlocklet> getExtendedBlocklet(
+      Set<TableBlockIndexUniqueIdentifier> identifiers, Blocklet blocklet) throws IOException {
+    List<ExtendedBlocklet> blocklets = new ArrayList<>();
     String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getPath());
-    for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
-      if (identifier.getIndexFilePath().equals(carbonIndexFileName)) {
-        DataMap dataMap = cache.get(identifier);
-        return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
+    try {
+      for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
+        if (identifier.getIndexFilePath().equals(carbonIndexFileName)) {
+          BlockletDataMapIndexWrapper wrapper = cache.get(identifier);
+          List<DataMap> dataMaps = wrapper.getDataMaps();
+          for (DataMap dataMap : dataMaps) {
+            BlockletDataMap blockletDataMap = (BlockletDataMap) dataMap;
+            blocklets
+                .add(blockletDataMap.getDetailedBlocklet(blocklet.getBlockletId()));
+          }
+        }
       }
+      return blocklets;
+    } catch (Exception e) {
+      throw new IOException("Blocklet with blockid " + blocklet.getPath() + " not found ");
     }
-    throw new IOException("Blocklet with blockid " + blocklet.getPath() + " not found ");
   }
 
   @Override
   public List<DataMapDistributable> toDistributable(Segment segment) {
     List<DataMapDistributable> distributables = new ArrayList<>();
-    Map<String, String> indexFiles = null;
     try {
-      CarbonFile[] carbonIndexFiles;
-      if (segment.getSegmentFileName() == null) {
-        carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(
-            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()));
-      } else {
-        SegmentFileStore fileStore =
-            new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
-        indexFiles = fileStore.getIndexFiles();
-        carbonIndexFiles = new CarbonFile[indexFiles.size()];
-        int i = 0;
-        for (Map.Entry<String, String> entry : indexFiles.entrySet()) {
-          String indexFile = entry.getKey();
-          String mergeFileName = entry.getValue();
-          if (null != mergeFileName) {
-            String mergeIndexPath = indexFile
-                .substring(0, indexFile.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1)
-                + mergeFileName;
-            carbonIndexFiles[i++] = FileFactory.getCarbonFile(mergeIndexPath);
-          } else {
-            carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile);
-          }
-        }
+      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+          getTableBlockIndexUniqueIdentifiers(segment);
+      CarbonFile[] carbonIndexFiles = new CarbonFile[tableBlockIndexUniqueIdentifiers.size()];
+      int identifierCounter = 0;
+      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+          tableBlockIndexUniqueIdentifiers) {
+        String indexFilePath = tableBlockIndexUniqueIdentifier.getIndexFilePath();
+        String fileName = tableBlockIndexUniqueIdentifier.getIndexFileName();
+        carbonIndexFiles[identifierCounter++] = FileFactory
+            .getCarbonFile(indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + fileName);
       }
       for (int i = 0; i < carbonIndexFiles.length; i++) {
         Path path = new Path(carbonIndexFiles[i].getPath());
-
         FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
         RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
         LocatedFileStatus fileStatus = iter.next();
         String[] location = fileStatus.getBlockLocations()[0].getHosts();
         BlockletDataMapDistributable distributable =
-            new BlockletDataMapDistributable(path.toString(), null);
+            new BlockletDataMapDistributable(path.toString());
         distributable.setLocations(location);
         distributables.add(distributable);
 
@@ -232,10 +219,13 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
     Set<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
     if (blockIndexes != null) {
       for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
-        DataMap dataMap = cache.getIfPresent(blockIndex);
-        if (dataMap != null) {
+        BlockletDataMapIndexWrapper wrapper = cache.getIfPresent(blockIndex);
+        if (wrapper != null) {
+          List<DataMap> dataMaps = wrapper.getDataMaps();
           cache.invalidate(blockIndex);
-          dataMap.clear();
+          for (DataMap dataMap : dataMaps) {
+            dataMap.clear();
+          }
         }
       }
     }
@@ -259,19 +249,17 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
       identifiers
           .add(new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentNo));
     } else if (indexPath.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-      SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
       CarbonFile carbonFile = FileFactory.getCarbonFile(indexPath.toString());
       String parentPath = carbonFile.getParentFile().getAbsolutePath();
-      List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(carbonFile.getAbsolutePath());
-      for (String indexFile : indexFiles) {
-        identifiers.add(
-            new TableBlockIndexUniqueIdentifier(parentPath, indexFile, carbonFile.getName(),
-                segmentNo));
-      }
+      identifiers.add(new TableBlockIndexUniqueIdentifier(parentPath, carbonFile.getName(),
+          carbonFile.getName(), segmentNo));
     }
-    List<DataMap> dataMaps;
+    List<DataMap> dataMaps = new ArrayList<>();
     try {
-      dataMaps = cache.getAll(identifiers);
+      List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers = cache.getAll(identifiers);
+      for (BlockletDataMapIndexWrapper wrapper : blockletDataMapIndexWrappers) {
+        dataMaps.addAll(wrapper.getDataMaps());
+      }
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 9364a7a..5dcf885 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -65,9 +65,15 @@ public class SegmentIndexFileStore {
    */
   private Map<String, byte[]> carbonIndexMapWithFullPath;
 
+  /**
+   * Maps each merge file path to the list of index file names stored in that merge file
+   */
+  private Map<String, List<String>> carbonMergeFileToIndexFilesMap;
+
   public SegmentIndexFileStore() {
     carbonIndexMap = new HashMap<>();
     carbonIndexMapWithFullPath = new HashMap<>();
+    carbonMergeFileToIndexFilesMap = new HashMap<>();
   }
 
   /**
@@ -182,12 +188,8 @@ public class SegmentIndexFileStore {
     Map<String, String> indexFiles = new HashMap<>();
     for (int i = 0; i < carbonIndexFiles.length; i++) {
       if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-        List<String> indexFilesFromMergeFile =
-            getIndexFilesFromMergeFile(carbonIndexFiles[i].getCanonicalPath());
-        for (String file: indexFilesFromMergeFile) {
-          indexFiles.put(carbonIndexFiles[i].getParentFile().getAbsolutePath()
-              + CarbonCommonConstants.FILE_SEPARATOR + file, carbonIndexFiles[i].getName());
-        }
+        indexFiles
+            .put(carbonIndexFiles[i].getAbsolutePath(), carbonIndexFiles[i].getAbsolutePath());
       } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
         indexFiles.put(carbonIndexFiles[i].getAbsolutePath(), null);
       }
@@ -216,13 +218,14 @@ public class SegmentIndexFileStore {
    * @param mergeFilePath
    * @throws IOException
    */
-  private void readMergeFile(String mergeFilePath) throws IOException {
+  public void readMergeFile(String mergeFilePath) throws IOException {
     ThriftReader thriftReader = new ThriftReader(mergeFilePath);
     try {
       thriftReader.open();
       MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
       MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
       List<String> file_names = indexHeader.getFile_names();
+      carbonMergeFileToIndexFilesMap.put(mergeFilePath, file_names);
       List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
       CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath);
       assert (file_names.size() == fileData.size());
@@ -423,4 +426,8 @@ public class SegmentIndexFileStore {
             + " is " + (System.currentTimeMillis() - startTime));
     return carbondataFileFooter.getBlockletList();
   }
+
+  public Map<String, List<String>> getCarbonMergeFileToIndexFilesMap() {
+    return carbonMergeFileToIndexFilesMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
index adb8715..8a07ea9 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
@@ -40,6 +40,10 @@ public abstract class CarbonRowSchema implements Serializable {
     return dataType;
   }
 
+  public void setDataType(DataType dataType) {
+    this.dataType = dataType;
+  }
+
   /**
    * Gives length in case of fixed schema other wise returns length
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 82e6eab..e946a17 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -384,6 +384,35 @@ public class SegmentFileStore {
   }
 
   /**
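+   * Gets the index files of this segment; a location with a merge file yields only that file
+   * @return map of index or merge file path to its location's merge file name, if any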
+   */
+  public Map<String, String> getIndexOrMergeFiles() {
+    Map<String, String> indexFiles = new HashMap<>();
+    if (segmentFile != null) {
+      for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
+        String location = entry.getKey();
+        if (entry.getValue().isRelative) {
+          location = tablePath + location;
+        }
+        if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
+          String mergeFileName = entry.getValue().getMergeFileName();
+          if (null != mergeFileName) {
+            indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + mergeFileName,
+                entry.getValue().mergeFileName);
+          } else {
+            for (String indexFile : entry.getValue().getFiles()) {
+              indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
+                  entry.getValue().mergeFileName);
+            }
+          }
+        }
+      }
+    }
+    return indexFiles;
+  }
+
+  /**
    * Gets all carbon index files from this segment
    * @return
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
index f77358f..8ecdba5 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
@@ -24,7 +24,6 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -36,12 +35,7 @@ import org.apache.hadoop.io.Writable;
 /**
  * class to store the information about the blocklet
  */
-public class BlockletInfo implements Serializable, Writable {
-
-  /**
-   * serialization id
-   */
-  private static final long serialVersionUID = 1873135459695635381L;
+public class BlockletInfo implements Writable {
 
   /**
    * Number of rows in this blocklet

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
index 1f45716..0b9ce4b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
@@ -52,7 +52,7 @@ public class DataFileFooter implements Serializable {
   /**
    * Information about leaf nodes of all columns in this file
    */
-  private List<BlockletInfo> blockletList;
+  private transient List<BlockletInfo> blockletList;
 
   /**
    * blocklet index of all blocklets in this file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 4deafd4..e515e35 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -80,6 +80,11 @@ public class TableInfo implements Serializable, Writable {
 
   private List<RelationIdentifier> parentRelationIdentifiers;
 
+  /**
+   * flag to check whether any schema modification operation has happened after creation of table
+   */
+  private boolean isSchemaModified;
+
   public TableInfo() {
     dataMapSchemaList = new ArrayList<>();
   }
@@ -97,6 +102,18 @@ public class TableInfo implements Serializable, Writable {
   public void setFactTable(TableSchema factTable) {
     this.factTable = factTable;
     updateParentRelationIdentifier();
+    updateIsSchemaModified();
+  }
+
+  private void updateIsSchemaModified() {
+    if (null != factTable.getSchemaEvalution()) {
+      // If schema evolution entry list size is > 1 that means an alter operation is performed
+      // which has added the new schema entry in the schema evolution list.
+      // Currently apart from create table schema evolution entries
+      // are getting added only in the alter operations.
+      isSchemaModified =
+          factTable.getSchemaEvalution().getSchemaEvolutionEntryList().size() > 1 ? true : false;
+    }
   }
 
   private void updateParentRelationIdentifier() {
@@ -254,6 +271,7 @@ public class TableInfo implements Serializable, Writable {
         parentRelationIdentifiers.get(i).write(out);
       }
     }
+    out.writeBoolean(isSchemaModified);
   }
 
   @Override public void readFields(DataInput in) throws IOException {
@@ -288,6 +306,7 @@ public class TableInfo implements Serializable, Writable {
         this.parentRelationIdentifiers.add(relationIdentifier);
       }
     }
+    this.isSchemaModified = in.readBoolean();
   }
 
   public AbsoluteTableIdentifier getOrCreateAbsoluteTableIdentifier() {
@@ -316,4 +335,8 @@ public class TableInfo implements Serializable, Writable {
     return parentRelationIdentifiers;
   }
 
+  public boolean isSchemaModified() {
+    return isSchemaModified;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 766650d..48bf179 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -18,11 +18,13 @@
 package org.apache.carbondata.core.util;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
@@ -39,6 +41,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 
 
@@ -46,8 +49,10 @@ public class BlockletDataMapUtil {
 
   public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
       TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
-      Set<String> filesRead) throws IOException {
-    if (identifier.getMergeIndexFileName() != null) {
+      Set<String> filesRead, Map<String, BlockMetaInfo> fileNameToMetaInfoMapping)
+      throws IOException {
+    if (identifier.getMergeIndexFileName() != null
+        && indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
       CarbonFile indexMergeFile = FileFactory.getCarbonFile(
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
               .getMergeIndexFileName());
@@ -68,23 +73,58 @@ public class BlockletDataMapUtil {
             .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
     for (DataFileFooter footer : indexInfo) {
       String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
-      blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath));
+      if (null == blockMetaInfoMap.get(blockPath)) {
+        blockMetaInfoMap.put(blockPath, createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath));
+      }
     }
     return blockMetaInfoMap;
   }
 
-  private static BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException {
-    CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
+  private static BlockMetaInfo createBlockMetaInfo(
+      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, String carbonDataFile)
+      throws IOException {
+    FileFactory.FileType fileType = FileFactory.getFileType(carbonDataFile);
+    switch (fileType) {
+      case LOCAL:
+        CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile, fileType);
+        return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize());
+      default:
+        return fileNameToMetaInfoMapping.get(carbonDataFile);
+    }
+  }
+
+  /**
+   * This method creates the file name to block meta info mapping. It reduces the number of
+   * namenode calls, as a single namenode call made through this method fetches 1000 entries
+   *
+   * @param segmentFilePath
+   * @return
+   * @throws IOException
+   */
+  public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
+      String segmentFilePath) throws IOException {
+    Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath);
     if (carbonFile instanceof AbstractDFSCarbonFile) {
+      Path path = new Path(segmentFilePath);
       RemoteIterator<LocatedFileStatus> iter =
-          ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile));
-      LocatedFileStatus fileStatus = iter.next();
-      String[] location = fileStatus.getBlockLocations()[0].getHosts();
-      long len = fileStatus.getLen();
-      return new BlockMetaInfo(location, len);
-    } else {
-      return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize());
+          ((AbstractDFSCarbonFile) carbonFile).fs.listLocatedStatus(path);
+      PathFilter pathFilter = new PathFilter() {
+        @Override public boolean accept(Path path) {
+          return CarbonTablePath.isCarbonDataFile(path.getName());
+        }
+      };
+      while (iter.hasNext()) {
+        LocatedFileStatus fileStatus = iter.next();
+        if (pathFilter.accept(fileStatus.getPath())) {
+          String[] location = fileStatus.getBlockLocations()[0].getHosts();
+          long len = fileStatus.getLen();
+          BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
+          fileNameToMetaInfoMapping.put(fileStatus.getPath().toString(), blockMetaInfo);
+        }
+      }
     }
+    return fileNameToMetaInfoMapping;
   }
 
   public static Set<TableBlockIndexUniqueIdentifier> getTableBlockUniqueIdentifiers(Segment segment,
@@ -96,7 +136,7 @@ public class BlockletDataMapUtil {
       indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
     } else {
       SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
-      indexFiles = fileStore.getIndexFiles();
+      indexFiles = fileStore.getIndexOrMergeFiles();
     }
     for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
       Path indexFile = new Path(indexFileEntry.getKey());
@@ -108,33 +148,51 @@ public class BlockletDataMapUtil {
   }
 
   /**
-   * This method will filter out the TableBlockIndexUniqueIdentifiers belongs to that distributable
+   * This method will pick the TableBlockIndexUniqueIdentifier that belongs to the distributable
    *
    * @param tableBlockIndexUniqueIdentifiers
    * @param distributable
    * @return
    */
-  public static Set<TableBlockIndexUniqueIdentifier> filterIdentifiersBasedOnDistributable(
+  public static TableBlockIndexUniqueIdentifier filterIdentifiersBasedOnDistributable(
       Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers,
       BlockletDataMapDistributable distributable) {
-    Set<TableBlockIndexUniqueIdentifier> validIdentifiers =
-        new HashSet<>(tableBlockIndexUniqueIdentifiers.size());
-    if (distributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
-          tableBlockIndexUniqueIdentifiers) {
-        if (null != tableBlockIndexUniqueIdentifier.getMergeIndexFileName()) {
-          validIdentifiers.add(tableBlockIndexUniqueIdentifier);
-        }
-      }
-    } else {
-      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
-          tableBlockIndexUniqueIdentifiers) {
-        if (null == tableBlockIndexUniqueIdentifier.getMergeIndexFileName()) {
-          validIdentifiers.add(tableBlockIndexUniqueIdentifier);
-        }
+    TableBlockIndexUniqueIdentifier validIdentifier = null;
+    for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+        tableBlockIndexUniqueIdentifiers) {
+      if (distributable.getFilePath().equals(
+          tableBlockIndexUniqueIdentifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR
+              + tableBlockIndexUniqueIdentifier.getIndexFileName())) {
+        validIdentifier = tableBlockIndexUniqueIdentifier;
+        break;
       }
     }
-    return validIdentifiers;
+    return validIdentifier;
+  }
+
+  /**
+   * Returns the TableBlockIndexUniqueIdentifiers of the index files in a merge index file
+   *
+   * @param identifier
+   * @return
+   * @throws IOException
+   */
+  public static List<TableBlockIndexUniqueIdentifier> getIndexFileIdentifiersFromMergeFile(
+      TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore segmentIndexFileStore)
+      throws IOException {
+    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new ArrayList<>();
+    String mergeFilePath =
+        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+            .getIndexFileName();
+    segmentIndexFileStore.readMergeFile(mergeFilePath);
+    List<String> indexFiles =
+        segmentIndexFileStore.getCarbonMergeFileToIndexFilesMap().get(mergeFilePath);
+    for (String indexFile : indexFiles) {
+      tableBlockIndexUniqueIdentifiers.add(
+          new TableBlockIndexUniqueIdentifier(identifier.getIndexFilePath(), indexFile,
+              identifier.getIndexFileName(), identifier.getSegmentId()));
+    }
+    return tableBlockIndexUniqueIdentifiers;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
index 67c50e5..67fe8fb 100644
--- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -48,7 +49,7 @@ public class TestBlockletDataMapFactory {
 
   private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
 
-  private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+  private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache;
 
   @Before public void setUp() {
     blockletDataMapFactory = new BlockletDataMapFactory();
@@ -62,23 +63,24 @@ public class TestBlockletDataMapFactory {
   @Test public void addDataMapToCache()
       throws IOException, MemoryException, NoSuchMethodException, InvocationTargetException,
       IllegalAccessException {
-    BlockletDataMap dataMap = new BlockletDataMap();
-    dataMap.setTableBlockUniqueIdentifier(tableBlockIndexUniqueIdentifier);
-    Method method =
-        BlockletDataMapFactory.class.getDeclaredMethod("cache", DataMap.class);
+    List<DataMap> dataMaps = new ArrayList<>();
+    Method method = BlockletDataMapFactory.class
+        .getDeclaredMethod("cache", TableBlockIndexUniqueIdentifier.class,
+            BlockletDataMapIndexWrapper.class);
     method.setAccessible(true);
-    method.invoke(blockletDataMapFactory, dataMap);
-    DataMap result = cache.getIfPresent(tableBlockIndexUniqueIdentifier);
+    method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifier,
+        new BlockletDataMapIndexWrapper(dataMaps));
+    BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifier);
     assert null != result;
   }
 
   @Test public void getValidDistributables() throws IOException {
     BlockletDataMapDistributable blockletDataMapDistributable = new BlockletDataMapDistributable(
-        "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756709.carbonindex", null);
+        "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756709.carbonindex");
     Segment segment = new Segment("0", null);
     blockletDataMapDistributable.setSegment(segment);
     BlockletDataMapDistributable blockletDataMapDistributable1 = new BlockletDataMapDistributable(
-        "/opt/store/default/carbon_table/Fact/Part0/Segment_0/1521012756710.carbonindexmerge", null);
+        "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756701.carbonindex");
     blockletDataMapDistributable1.setSegment(segment);
     List<DataMapDistributable> dataMapDistributables = new ArrayList<>(2);
     dataMapDistributables.add(blockletDataMapDistributable);
@@ -89,15 +91,10 @@ public class TestBlockletDataMapFactory {
         TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier1 =
             new TableBlockIndexUniqueIdentifier(
                 "/opt/store/default/carbon_table/Fact/Part0/Segment_0",
-                "0_batchno0-0-1521012756701.carbonindex", "1521012756710.carbonindexmerge", "0");
-        TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier2 =
-            new TableBlockIndexUniqueIdentifier(
-                "/opt/store/default/carbon_table/Fact/Part0/Segment_0",
-                "0_batchno0-0-1521012756702.carbonindex", "1521012756710.carbonindexmerge", "0");
+                "0_batchno0-0-1521012756701.carbonindex", null, "0");
         Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>(3);
         tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier);
         tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier1);
-        tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier2);
         return tableBlockIndexUniqueIdentifiers;
       }
     };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
index 9f67b22..32af8d3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
@@ -16,9 +16,9 @@
  */
 package org.apache.carbondata.hadoop;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -40,13 +40,14 @@ public class CacheClient {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(CacheClient.class.getName());
 
+  private final Object lock = new Object();
+
   // segment access client for driver LRU cache
   private CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
       segmentAccessClient;
 
   private static Map<SegmentTaskIndexStore.SegmentPropertiesWrapper, SegmentProperties>
-      segmentProperties =
-      new HashMap<SegmentTaskIndexStore.SegmentPropertiesWrapper, SegmentProperties>();
+      segmentProperties = new ConcurrentHashMap<>();
 
   public CacheClient() {
     Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> segmentCache =
@@ -78,13 +79,18 @@ public class CacheClient {
             columnCardinality);
     SegmentProperties segmentProperties = this.segmentProperties.get(segmentPropertiesWrapper);
     if (null == segmentProperties) {
-      // create a metadata details
-      // this will be useful in query handling
-      // all the data file metadata will have common segment properties we
-      // can use first one to get create the segment properties
-      LOGGER.info("Constructing new SegmentProperties");
-      segmentProperties = new SegmentProperties(columnsInTable, columnCardinality);
-      this.segmentProperties.put(segmentPropertiesWrapper, segmentProperties);
+      synchronized (lock) {
+        segmentProperties = this.segmentProperties.get(segmentPropertiesWrapper);
+        if (null == segmentProperties) {
+          // create a metadata details
+          // this will be useful in query handling
+          // all the data file metadata will have common segment properties, so we
+          // can use the first one to create the segment properties
+          LOGGER.info("Constructing new SegmentProperties");
+          segmentProperties = new SegmentProperties(columnsInTable, columnCardinality);
+          this.segmentProperties.put(segmentPropertiesWrapper, segmentProperties);
+        }
+      }
     }
     return segmentProperties;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
index 35d4b39..4b57d37 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.hadoop.api;
 
 import java.util.List;
 
-import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -31,9 +31,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  */
 public abstract class AbstractDataMapJob implements DataMapJob {
 
-  @Override
-  public List<DataMap> execute(CarbonTable carbonTable, FileInputFormat<Void, DataMap> format) {
-    return null;
+  @Override public void execute(CarbonTable carbonTable,
+      FileInputFormat<Void, BlockletDataMapIndexWrapper> format) {
   }
 
   @Override public List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
index 3e32ecb..d41acec 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.hadoop.api;
 import java.io.Serializable;
 import java.util.List;
 
-import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -32,7 +32,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  */
 public interface DataMapJob extends Serializable {
 
-  List<DataMap> execute(CarbonTable carbonTable, FileInputFormat<Void, DataMap> dataMapFormat);
+  void execute(CarbonTable carbonTable,
+      FileInputFormat<Void, BlockletDataMapIndexWrapper> dataMapFormat);
 
   List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
       FilterResolverIntf resolverIntf);