Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/07 08:56:11 UTC

[1/2] carbondata git commit: [CARBONDATA-2310] Refactored code to improve Distributable interface & [CARBONDATA-2362] Changing the Cacheable object from DataMap to Wrapper

Repository: carbondata
Updated Branches:
  refs/heads/master a7926ea13 -> 531ecdf3f


http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 6803fc8..c6efd77 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -70,9 +70,15 @@ public class SegmentIndexFileStore {
    */
   private Map<String, byte[]> carbonIndexMapWithFullPath;
 
+  /**
+   * Maps each merge file to the list of index files it contains
+   */
+  private Map<String, List<String>> carbonMergeFileToIndexFilesMap;
+
   public SegmentIndexFileStore() {
     carbonIndexMap = new HashMap<>();
     carbonIndexMapWithFullPath = new HashMap<>();
+    carbonMergeFileToIndexFilesMap = new HashMap<>();
   }
 
   /**
@@ -201,6 +207,28 @@ public class SegmentIndexFileStore {
   }
 
   /**
+   * Read all the merge and index file names of the segment
+   *
+   * @param segmentPath path of the segment
+   * @return map of file path to merge file path (null for plain index files)
+   * @throws IOException
+   */
+  public Map<String, String> getMergeOrIndexFilesFromSegment(String segmentPath)
+      throws IOException {
+    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+    Map<String, String> indexFiles = new HashMap<>();
+    for (int i = 0; i < carbonIndexFiles.length; i++) {
+      if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFiles
+            .put(carbonIndexFiles[i].getAbsolutePath(), carbonIndexFiles[i].getAbsolutePath());
+      } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        indexFiles.put(carbonIndexFiles[i].getAbsolutePath(), null);
+      }
+    }
+    return indexFiles;
+  }
+
+  /**
    * List all the index files inside merge file.
    * @param mergeFile
    * @return
@@ -221,13 +249,14 @@ public class SegmentIndexFileStore {
    * @param mergeFilePath
    * @throws IOException
    */
-  private void readMergeFile(String mergeFilePath) throws IOException {
+  public void readMergeFile(String mergeFilePath) throws IOException {
     ThriftReader thriftReader = new ThriftReader(mergeFilePath);
     try {
       thriftReader.open();
       MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
       MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
       List<String> file_names = indexHeader.getFile_names();
+      carbonMergeFileToIndexFilesMap.put(mergeFilePath, file_names);
       List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
       CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath);
       assert (file_names.size() == fileData.size());
@@ -298,8 +327,8 @@ public class SegmentIndexFileStore {
     CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
     return carbonFile.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
-        return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
-            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT);
+        return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
       }
     });
   }
@@ -428,4 +457,8 @@ public class SegmentIndexFileStore {
             + " is " + (System.currentTimeMillis() - startTime));
     return carbondataFileFooter.getBlockletList();
   }
+
+  public Map<String, List<String>> getCarbonMergeFileToIndexFilesMap() {
+    return carbonMergeFileToIndexFilesMap;
+  }
 }
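
For illustration, a minimal sketch of how a caller might combine the new APIs; the
segment path and the surrounding control flow are assumptions, not part of the commit:

    SegmentIndexFileStore store = new SegmentIndexFileStore();
    // map of file path -> merge file path (null for plain index files)
    Map<String, String> files =
        store.getMergeOrIndexFilesFromSegment("/store/default/t1/Fact/Part0/Segment_0");
    for (Map.Entry<String, String> entry : files.entrySet()) {
      if (entry.getKey().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
        store.readMergeFile(entry.getKey());
        // index file names recorded by readMergeFile for this merge file
        List<String> contained =
            store.getCarbonMergeFileToIndexFilesMap().get(entry.getKey());
      }
    }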

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
index b764bdf..496a1d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -16,13 +16,15 @@
  */
 package org.apache.carbondata.core.indexstore.row;
 
+import java.io.Serializable;
+
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 
 /**
 * It is just a normal row to store data. Implementation classes could be safe or unsafe.
 * TODO move this class to a global row and use it across loading after DataType is changed to a class
  */
-public abstract class DataMapRow {
+public abstract class DataMapRow implements Serializable {
 
   protected CarbonRowSchema[] schemas;
 
@@ -88,4 +90,13 @@ public abstract class DataMapRow {
   public int getColumnCount() {
     return schemas.length;
   }
+
+  /**
+   * Default implementation: the row is already safe, so it is returned as is
+   *
+   * @return this row
+   */
+  public DataMapRow convertToSafeRow() {
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index 1b95984..1c1ecad 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -30,7 +30,12 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
  */
 public class UnsafeDataMapRow extends DataMapRow {
 
-  private MemoryBlock block;
+  private static final long serialVersionUID = -1156704133552046321L;
+
+  // As it is an unsafe memory block it is not recommended to serialize.
+  // If it ever needs to be serialized, override the writeObject method,
+  // which should take care of clearing the unsafe memory post serialization
+  private transient MemoryBlock block;
 
   private int pointer;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
index 813be4a..1a77467 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
@@ -16,12 +16,16 @@
  */
 package org.apache.carbondata.core.indexstore.schema;
 
+import java.io.Serializable;
+
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
 * It just has 2 types right now, either fixed or variable.
  */
-public abstract class CarbonRowSchema {
+public abstract class CarbonRowSchema implements Serializable {
+
+  private static final long serialVersionUID = -8061282029097686495L;
 
   protected DataType dataType;
 
@@ -29,6 +33,10 @@ public abstract class CarbonRowSchema {
     this.dataType = dataType;
   }
 
+  public void setDataType(DataType dataType) {
+    this.dataType = dataType;
+  }
+
   /**
    * Either fixed or variable length.
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index dff496b..9326b1d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -492,6 +492,35 @@ public class SegmentFileStore {
   }
 
   /**
+   * Gets all the index or merge files of this segment
+   * @return map of file path to its merge file name (null for plain index files)
+   */
+  public Map<String, String> getIndexOrMergeFiles() {
+    Map<String, String> indexFiles = new HashMap<>();
+    if (segmentFile != null) {
+      for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
+        String location = entry.getKey();
+        if (entry.getValue().isRelative) {
+          location = tablePath + location;
+        }
+        if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
+          String mergeFileName = entry.getValue().getMergeFileName();
+          if (null != mergeFileName) {
+            indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + mergeFileName,
+                entry.getValue().mergeFileName);
+          } else {
+            for (String indexFile : entry.getValue().getFiles()) {
+              indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
+                  entry.getValue().mergeFileName);
+            }
+          }
+        }
+      }
+    }
+    return indexFiles;
+  }
+
+  /**
    * Gets all carbon index files from this segment
    * @return
    */
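
The map returned by getIndexOrMergeFiles() pairs the absolute path of the file that
must be read with the merge file name, which is null when the index file stands
alone. An illustration of the two shapes (paths made up):

    // merged:   /store/t1/Fact/Part0/Segment_0/0_1.carbonindexmerge -> "0_1.carbonindexmerge"
    // unmerged: /store/t1/Fact/Part0/Segment_0/0_batchno0-0-1.carbonindex -> null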

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index c8ac15a..c7bcf2e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -97,6 +97,11 @@ public class TableInfo implements Serializable, Writable {
 
   private List<RelationIdentifier> parentRelationIdentifiers;
 
+  /**
+   * flag to check whether any schema modification operation has happened after creation of the table
+   */
+  private boolean isSchemaModified;
+
   public TableInfo() {
     dataMapSchemaList = new ArrayList<>();
     isTransactionalTable = true;
@@ -115,6 +120,18 @@ public class TableInfo implements Serializable, Writable {
   public void setFactTable(TableSchema factTable) {
     this.factTable = factTable;
     updateParentRelationIdentifier();
+    updateIsSchemaModified();
+  }
+
+  private void updateIsSchemaModified() {
+    if (null != factTable.getSchemaEvalution()) {
+      // If the schema evolution entry list size is > 1, an alter operation was performed
+      // which added a new schema entry to the schema evolution list.
+      // Currently, apart from create table, schema evolution entries
+      // are added only by alter operations.
+      isSchemaModified =
+          factTable.getSchemaEvalution().getSchemaEvolutionEntryList().size() > 1;
+    }
   }
 
   private void updateParentRelationIdentifier() {
@@ -273,6 +290,7 @@ public class TableInfo implements Serializable, Writable {
         parentRelationIdentifiers.get(i).write(out);
       }
     }
+    out.writeBoolean(isSchemaModified);
   }
 
   @Override public void readFields(DataInput in) throws IOException {
@@ -308,6 +326,7 @@ public class TableInfo implements Serializable, Writable {
         this.parentRelationIdentifiers.add(relationIdentifier);
       }
     }
+    this.isSchemaModified = in.readBoolean();
   }
 
   public AbsoluteTableIdentifier getOrCreateAbsoluteTableIdentifier() {
@@ -343,4 +362,9 @@ public class TableInfo implements Serializable, Writable {
   public void setTransactionalTable(boolean transactionalTable) {
     isTransactionalTable = transactionalTable;
   }
+
+  public boolean isSchemaModified() {
+    return isSchemaModified;
+  }
+
 }
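
Since Writable carries no field names, the new flag is appended at the very end of
write() and read back in the same position at the end of readFields(); any mismatch
in order or position would corrupt every field that follows. A reduced sketch of the
symmetric pattern (existing fields elided):

    @Override public void write(DataOutput out) throws IOException {
      // ... existing fields, in their original order ...
      out.writeBoolean(isSchemaModified);        // appended last
    }

    @Override public void readFields(DataInput in) throws IOException {
      // ... existing fields, in the same order ...
      this.isSchemaModified = in.readBoolean();  // read last, mirroring write()
    }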

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index bc4a90d..41ce31c 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -68,11 +68,11 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
     if (segment.getSegmentFileName() == null) {
       String path =
           CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
-      indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
+      indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
     } else {
       SegmentFileStore fileStore =
           new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
-      indexFiles = fileStore.getIndexFiles();
+      indexFiles = fileStore.getIndexOrMergeFiles();
     }
     return indexFiles;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
new file mode 100644
index 0000000..0d28b9f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockMetaInfo;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+public class BlockletDataMapUtil {
+
+  public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
+      TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
+      Set<String> filesRead, Map<String, BlockMetaInfo> fileNameToMetaInfoMapping)
+      throws IOException {
+    if (identifier.getMergeIndexFileName() != null
+        && indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
+      CarbonFile indexMergeFile = FileFactory.getCarbonFile(
+          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getMergeIndexFileName());
+      if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
+        indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
+        filesRead.add(indexMergeFile.getPath());
+      }
+    }
+    if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
+      indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
+          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getIndexFileName()) });
+    }
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
+    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
+        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
+    for (DataFileFooter footer : indexInfo) {
+      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
+      if (null == blockMetaInfoMap.get(blockPath)) {
+        blockMetaInfoMap.put(blockPath, createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath));
+      }
+    }
+    return blockMetaInfoMap;
+  }
+
+  /**
+   * This method will create a file name to block meta info mapping. It reduces the number
+   * of namenode calls, as a single listing call can fetch up to 1000 entries
+   *
+   * @param segmentFilePath path of the segment
+   * @return mapping of carbondata file path to its block meta info
+   * @throws IOException
+   */
+  public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
+      String segmentFilePath) throws IOException {
+    Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap<>();
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath);
+    if (carbonFile instanceof AbstractDFSCarbonFile) {
+      PathFilter pathFilter = new PathFilter() {
+        @Override public boolean accept(Path path) {
+          return CarbonTablePath.isCarbonDataFile(path.getName());
+        }
+      };
+      CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles(pathFilter);
+      for (CarbonFile file : carbonFiles) {
+        String[] location = file.getLocations();
+        long len = file.getSize();
+        BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
+        fileNameToMetaInfoMapping.put(file.getPath().toString(), blockMetaInfo);
+      }
+    }
+    return fileNameToMetaInfoMapping;
+  }
+
+  private static BlockMetaInfo createBlockMetaInfo(
+      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, String carbonDataFile) {
+    FileFactory.FileType fileType = FileFactory.getFileType(carbonDataFile);
+    switch (fileType) {
+      case LOCAL:
+        CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile, fileType);
+        return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize());
+      default:
+        return fileNameToMetaInfoMapping.get(carbonDataFile);
+    }
+  }
+
+  public static Set<TableBlockIndexUniqueIdentifier> getTableBlockUniqueIdentifiers(Segment segment)
+      throws IOException {
+    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>();
+    Map<String, String> indexFiles = segment.getCommittedIndexFile();
+    for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+      Path indexFile = new Path(indexFileEntry.getKey());
+      tableBlockIndexUniqueIdentifiers.add(
+          new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(), indexFile.getName(),
+              indexFileEntry.getValue(), segment.getSegmentNo()));
+    }
+    return tableBlockIndexUniqueIdentifiers;
+  }
+
+  /**
+   * This method will filter out the TableBlockIndexUniqueIdentifier that belongs to the
+   * given distributable
+   *
+   * @param tableBlockIndexUniqueIdentifiers candidate identifiers
+   * @param distributable distributable whose index file is being resolved
+   * @return the matching identifier, or null if none matches
+   */
+  public static TableBlockIndexUniqueIdentifier filterIdentifiersBasedOnDistributable(
+      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers,
+      BlockletDataMapDistributable distributable) {
+    TableBlockIndexUniqueIdentifier validIdentifier = null;
+    String fileName = CarbonTablePath.DataFileUtil.getFileName(distributable.getFilePath());
+    for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+        tableBlockIndexUniqueIdentifiers) {
+      if (fileName.equals(tableBlockIndexUniqueIdentifier.getIndexFileName())) {
+        validIdentifier = tableBlockIndexUniqueIdentifier;
+        break;
+      }
+    }
+    return validIdentifier;
+  }
+
+  /**
+   * This method will return the TableBlockIndexUniqueIdentifiers of the index files
+   * contained in a merge index file
+   *
+   * @param identifier identifier pointing to the merge index file
+   * @param segmentIndexFileStore store used to read the merge file
+   * @return identifiers of the index files inside the merge file
+   * @throws IOException
+   */
+  public static List<TableBlockIndexUniqueIdentifier> getIndexFileIdentifiersFromMergeFile(
+      TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore segmentIndexFileStore)
+      throws IOException {
+    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new ArrayList<>();
+    String mergeFilePath =
+        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+            .getIndexFileName();
+    segmentIndexFileStore.readMergeFile(mergeFilePath);
+    List<String> indexFiles =
+        segmentIndexFileStore.getCarbonMergeFileToIndexFilesMap().get(mergeFilePath);
+    for (String indexFile : indexFiles) {
+      tableBlockIndexUniqueIdentifiers.add(
+          new TableBlockIndexUniqueIdentifier(identifier.getIndexFilePath(), indexFile,
+              identifier.getIndexFileName(), identifier.getSegmentId()));
+    }
+    return tableBlockIndexUniqueIdentifiers;
+  }
+
+}
\ No newline at end of file
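
A hedged sketch of how these helpers chain together when resolving a distributable
(the Segment and BlockletDataMapDistributable construction is assumed, not shown):

    Set<TableBlockIndexUniqueIdentifier> identifiers =
        BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment);
    // pick the identifier that backs this distributable's index file
    TableBlockIndexUniqueIdentifier match =
        BlockletDataMapUtil.filterIdentifiersBasedOnDistributable(identifiers, distributable);
    if (match != null
        && match.getIndexFileName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
      // a merge file expands into the identifiers of the index files inside it
      List<TableBlockIndexUniqueIdentifier> expanded = BlockletDataMapUtil
          .getIndexFileIdentifiersFromMergeFile(match, new SegmentIndexFileStore());
    }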

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 8544da9..3823aef 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -219,6 +219,11 @@ public class SessionParams implements Serializable, Cloneable {
             throw new InvalidConfigurationException(
                 String.format("Invalid configuration of %s, datamap does not exist", key));
           }
+        } else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) {
+          isValid = CarbonUtil.validateBoolean(value);
+          if (!isValid) {
+            throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
+          }
         } else {
           throw new InvalidConfigurationException(
               "The key " + key + " not supported for dynamic configuration.");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index b9f4838..62192ff 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -533,7 +533,7 @@ public class CarbonTablePath {
     /**
      * Return the file name from file path
      */
-    private static String getFileName(String dataFilePath) {
+    public static String getFileName(String dataFilePath) {
       int endIndex = dataFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
       if (endIndex > -1) {
         return dataFilePath.substring(endIndex + 1, dataFilePath.length());
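
For reference, the now-public helper just strips the parent directories; an
illustrative call (the path is made up):

    CarbonTablePath.DataFileUtil.getFileName("/store/t1/Fact/Part0/Segment_0/0_1.carbonindex");
    // -> "0_1.carbonindex"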

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
new file mode 100644
index 0000000..dfbdd29
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+
+import mockit.Deencapsulation;
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestBlockletDataMapFactory {
+
+  private CarbonTable carbonTable;
+
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  private TableInfo tableInfo;
+
+  private BlockletDataMapFactory blockletDataMapFactory;
+
+  private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
+
+  private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache;
+
+  @Before public void setUp()
+      throws ClassNotFoundException, IllegalAccessException, InvocationTargetException,
+      InstantiationException {
+    tableInfo = new TableInfo();
+    Constructor<?> constructor =
+        Class.forName("org.apache.carbondata.core.metadata.schema.table.CarbonTable")
+            .getDeclaredConstructors()[0];
+    constructor.setAccessible(true);
+    carbonTable = (CarbonTable) constructor.newInstance();
+    absoluteTableIdentifier =
+        AbsoluteTableIdentifier.from("/opt/store/default/carbon_table/", "default", "carbon_table");
+    Deencapsulation.setField(tableInfo, "identifier", absoluteTableIdentifier);
+    Deencapsulation.setField(carbonTable, "tableInfo", tableInfo);
+    blockletDataMapFactory = new BlockletDataMapFactory(carbonTable, new DataMapSchema());
+    Deencapsulation.setField(blockletDataMapFactory, "cache",
+        CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_DATAMAP));
+    tableBlockIndexUniqueIdentifier =
+        new TableBlockIndexUniqueIdentifier("/opt/store/default/carbon_table/Fact/Part0/Segment_0",
+            "0_batchno0-0-1521012756709.carbonindex", null, "0");
+    cache = CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
+  }
+
+  @Test public void addDataMapToCache()
+      throws IOException, MemoryException, NoSuchMethodException, InvocationTargetException,
+      IllegalAccessException {
+    List<BlockletDataMap> dataMaps = new ArrayList<>();
+    Method method = BlockletDataMapFactory.class
+        .getDeclaredMethod("cache", TableBlockIndexUniqueIdentifier.class,
+            BlockletDataMapIndexWrapper.class);
+    method.setAccessible(true);
+    method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifier,
+        new BlockletDataMapIndexWrapper(dataMaps));
+    BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifier);
+    assert null != result;
+  }
+
+  @Test public void getValidDistributables() throws IOException {
+    BlockletDataMapDistributable blockletDataMapDistributable = new BlockletDataMapDistributable(
+        "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756709.carbonindex");
+    Segment segment = new Segment("0", null);
+    blockletDataMapDistributable.setSegment(segment);
+    BlockletDataMapDistributable blockletDataMapDistributable1 = new BlockletDataMapDistributable(
+        "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756701.carbonindex");
+    blockletDataMapDistributable1.setSegment(segment);
+    List<DataMapDistributable> dataMapDistributables = new ArrayList<>(2);
+    dataMapDistributables.add(blockletDataMapDistributable);
+    dataMapDistributables.add(blockletDataMapDistributable1);
+    new MockUp<BlockletDataMapFactory>() {
+      @Mock Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
+          Segment segment) {
+        TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier1 =
+            new TableBlockIndexUniqueIdentifier(
+                "/opt/store/default/carbon_table/Fact/Part0/Segment_0",
+                "0_batchno0-0-1521012756701.carbonindex", null, "0");
+        Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>(3);
+        tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier);
+        tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier1);
+        return tableBlockIndexUniqueIdentifiers;
+      }
+    };
+    List<DataMapDistributable> validDistributables =
+        blockletDataMapFactory.getAllUncachedDistributables(dataMapDistributables);
+    assert 1 == validDistributables.size();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
index 8be1e2e..32af8d3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
@@ -16,21 +16,39 @@
  */
 package org.apache.carbondata.hadoop;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
 import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 /**
  * CacheClient : Holds all the Cache access clients for Btree, Dictionary
  */
 public class CacheClient {
 
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CacheClient.class.getName());
+
+  private final Object lock = new Object();
+
   // segment access client for driver LRU cache
   private CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
       segmentAccessClient;
 
+  private static Map<SegmentTaskIndexStore.SegmentPropertiesWrapper, SegmentProperties>
+      segmentProperties = new ConcurrentHashMap<>();
+
   public CacheClient() {
     Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> segmentCache =
         CacheProvider.getInstance().createCache(CacheType.DRIVER_BTREE);
@@ -45,4 +63,35 @@ public class CacheClient {
   public void close() {
     segmentAccessClient.close();
   }
+
+  /**
+   * Method to get the segment properties, avoiding construction of new segment properties
+   * unless the schema is modified
+   *
+   * @param tableIdentifier identifier of the table
+   * @param columnsInTable columns of the table
+   * @param columnCardinality cardinality of the dictionary columns
+   * @return the cached or newly constructed SegmentProperties
+   */
+  public SegmentProperties getSegmentProperties(AbsoluteTableIdentifier tableIdentifier,
+      List<ColumnSchema> columnsInTable, int[] columnCardinality) {
+    SegmentTaskIndexStore.SegmentPropertiesWrapper segmentPropertiesWrapper =
+        new SegmentTaskIndexStore.SegmentPropertiesWrapper(tableIdentifier, columnsInTable,
+            columnCardinality);
+    SegmentProperties segmentProperties = this.segmentProperties.get(segmentPropertiesWrapper);
+    if (null == segmentProperties) {
+      synchronized (lock) {
+        segmentProperties = this.segmentProperties.get(segmentPropertiesWrapper);
+        if (null == segmentProperties) {
+          // create the segment properties once, for use in query handling;
+          // all the data file metadata of a segment share common segment
+          // properties, so the first one can be used to create them
+          LOGGER.info("Constructing new SegmentProperties");
+          segmentProperties = new SegmentProperties(columnsInTable, columnCardinality);
+          this.segmentProperties.put(segmentPropertiesWrapper, segmentProperties);
+        }
+      }
+    }
+    return segmentProperties;
+  }
 }
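
The lookup above follows the classic double-checked locking idiom: an unlocked fast
path, then a re-check under the lock before constructing. Reduced to its skeleton
(names are generic, for illustration):

    V value = map.get(key);
    if (value == null) {
      synchronized (lock) {
        value = map.get(key);          // re-check: another thread may have won the race
        if (value == null) {
          value = computeExpensive();  // construct exactly once
          map.put(key, value);
        }
      }
    }

Since the map is a ConcurrentHashMap, map.computeIfAbsent(key, k -> computeExpensive())
would express the same intent more compactly on Java 8+.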

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
new file mode 100644
index 0000000..6835184
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.util.List;
+
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Abstract base class for data map jobs
+ */
+public abstract class AbstractDataMapJob implements DataMapJob {
+
+  @Override public void execute(CarbonTable carbonTable,
+      FileInputFormat<Void, BlockletDataMapIndexWrapper> format) {
+  }
+
+  @Override public List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
+      FilterResolverIntf resolverIntf) {
+    return null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 2af147d..0e8cf6a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -67,7 +67,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
 
-  protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+  public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
     CarbonTable carbonTableTemp;
     if (carbonTable == null) {
       // carbon table should be created either from deserialized table info (schema saved in

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 25cc228..a5d4df2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -156,7 +156,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */
-  protected abstract CarbonTable getOrCreateCarbonTable(Configuration configuration)
+  public abstract CarbonTable getOrCreateCarbonTable(Configuration configuration)
       throws IOException;
 
   public static void setTablePath(Configuration configuration, String tablePath) {
@@ -172,7 +172,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
   }
 
-  public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
+  public static void setDataMapJob(Configuration configuration, Object dataMapJob)
       throws IOException {
     if (dataMapJob != null) {
       String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
@@ -466,15 +466,30 @@ m filterExpression
   private List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
       FilterResolverIntf resolver, List<Segment> segmentIds, DataMapExprWrapper dataMapExprWrapper,
       DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune) throws IOException {
-    DistributableDataMapFormat datamapDstr =
-        new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, segmentIds,
-            partitionsToPrune, BlockletDataMapFactory.class.getName());
-    List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+    String className = "org.apache.carbondata.hadoop.api.DistributableDataMapFormat";
+    FileInputFormat dataMapFormat =
+        createDataMapJob(carbonTable, dataMapExprWrapper, segmentIds, partitionsToPrune, className);
+    List<ExtendedBlocklet> prunedBlocklets =
+        dataMapJob.execute((DistributableDataMapFormat) dataMapFormat, resolver);
     // Apply expression on the blocklets.
     prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
     return prunedBlocklets;
   }
 
+
+  public static FileInputFormat createDataMapJob(CarbonTable carbonTable,
+      DataMapExprWrapper dataMapExprWrapper, List<Segment> segments,
+      List<PartitionSpec> partitionsToPrune, String clsName) {
+    try {
+      Constructor<?> cons = Class.forName(clsName).getDeclaredConstructors()[0];
+      return (FileInputFormat) cons
+          .newInstance(carbonTable, dataMapExprWrapper, segments, partitionsToPrune,
+              BlockletDataMapFactory.class.getName());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /**
    * Prune the segments from the already pruned blocklets.
    * @param segments
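
createDataMapJob instantiates the format reflectively so that this class need not
reference the concrete implementation at compile time. The essential pattern,
reduced for illustration (argument list trimmed):

    // instantiate a class by name using its first declared constructor
    Constructor<?> cons = Class.forName(clsName).getDeclaredConstructors()[0];
    FileInputFormat format = (FileInputFormat) cons.newInstance(arg1, arg2 /* ... */);

One caveat of getDeclaredConstructors()[0] is that it depends on declaration order,
so the target class must keep the expected constructor first.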

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index f93be63..cd34bb8 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -103,7 +103,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   /**
    * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */
-  protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+  public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
     if (carbonTable == null) {
       // carbon table should be created either from deserialized table info (schema saved in
       // hive metastore) or by reading schema in HDFS (schema saved in HDFS)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
index 64936aa..c439219 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
@@ -19,15 +19,21 @@ package org.apache.carbondata.hadoop.api;
 import java.io.Serializable;
 import java.util.List;
 
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
 /**
 * Distributable datamap job to execute the #DistributableDataMapFormat in the cluster. It prunes
 * the datamaps in a distributed fashion and returns the final blocklet list
  */
 public interface DataMapJob extends Serializable {
 
+  void execute(CarbonTable carbonTable, FileInputFormat<Void, BlockletDataMapIndexWrapper> format);
+
   List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
       FilterResolverIntf filter);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 36c7414..3208a28 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -22,6 +22,8 @@ import java.text.SimpleDateFormat;
 import java.util.List;
 import java.util.Locale;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -49,6 +51,12 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  */
 public class CarbonInputFormatUtil {
 
+  /**
+   * Attribute for Carbon LOGGER.
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonInputFormatUtil.class.getName());
+
   public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
       AbsoluteTableIdentifier identifier,
       Job job) throws IOException {
@@ -58,6 +66,7 @@ public class CarbonInputFormatUtil {
     CarbonTableInputFormat.setTableName(
         job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
     FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+    setDataMapJobIfConfigured(job.getConfiguration());
     return carbonInputFormat;
   }
 
@@ -71,6 +80,7 @@ public class CarbonInputFormatUtil {
     CarbonTableInputFormat.setTableName(
         job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
     FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+    setDataMapJobIfConfigured(job.getConfiguration());
     return carbonTableInputFormat;
   }
 
@@ -108,11 +118,10 @@ public class CarbonInputFormatUtil {
     CarbonInputFormat.setQuerySegment(conf, identifier);
     CarbonInputFormat.setFilterPredicates(conf, filterExpression);
     CarbonInputFormat.setColumnProjection(conf, columnProjection);
-    if (dataMapJob != null &&
-        Boolean.valueOf(CarbonProperties.getInstance().getProperty(
-            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT))) {
+    if (dataMapJob != null) {
       CarbonInputFormat.setDataMapJob(conf, dataMapJob);
+    } else {
+      setDataMapJobIfConfigured(conf);
     }
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
     CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo();
@@ -147,6 +156,32 @@ public class CarbonInputFormatUtil {
     return format;
   }
 
+  /**
+   * This method sets the DataMapJob if it is configured
+   *
+   * @param conf job configuration
+   * @throws IOException
+   */
+  public static void setDataMapJobIfConfigured(Configuration conf) throws IOException {
+    String className = "org.apache.carbondata.spark.rdd.SparkDataMapJob";
+    CarbonTableInputFormat.setDataMapJob(conf, createDataMapJob(className));
+  }
+
+  /**
+   * Creates an instance of the DataMap job class
+   *
+   * @param className fully qualified name of the DataMap job class
+   * @return the instance, or null if it could not be created
+   */
+  public static Object createDataMapJob(String className) {
+    try {
+      return Class.forName(className).getDeclaredConstructors()[0].newInstance();
+    } catch (Exception e) {
+      LOGGER.error(e);
+      return null;
+    }
+  }
+
   public static String createJobTrackerID(java.util.Date date) {
     return new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(date);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 17c57b6..b0a59d4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -41,6 +41,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
         CarbonEnv.getDatabaseLocation("lucene", sqlContext.sparkSession))
+      .addProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, "true")
     sql("use lucene")
     sql("DROP TABLE IF EXISTS normal_test")
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 0d960d0..24a6927 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -56,6 +56,7 @@ import org.apache.carbondata.hadoop._
 import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
 import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
@@ -554,12 +555,7 @@ class CarbonScanRDD[T: ClassTag](
     CarbonInputFormat.setQuerySegment(conf, identifier)
     CarbonInputFormat.setFilterPredicates(conf, filterExpression)
     CarbonInputFormat.setColumnProjection(conf, columnProjection)
-    CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
-    if (CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
-      CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
-    }
+    CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
 
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
     val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
@@ -599,12 +595,7 @@ class CarbonScanRDD[T: ClassTag](
     CarbonInputFormat.setQuerySegment(conf, identifier)
     CarbonInputFormat.setFilterPredicates(conf, filterExpression)
     CarbonInputFormat.setColumnProjection(conf, columnProjection)
-    CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
-    if (CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
-      CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
-    }
+    CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
     val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (carbonSessionInfo != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 8d2f9ee..f51c3bc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -29,12 +29,12 @@ import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledExcepti
 
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
+import org.apache.carbondata.hadoop.api.{AbstractDataMapJob, DistributableDataMapFormat}
 
 /**
 * Spark job to execute the datamap job and prune all the datamap distributables
  */
-class SparkDataMapJob extends DataMapJob {
+class SparkDataMapJob extends AbstractDataMapJob {
 
   override def execute(dataMapFormat: DistributableDataMapFormat,
       filter: FilterResolverIntf): util.List[ExtendedBlocklet] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 488a53d..6622246 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 
 case class CarbonCountStar(
     attributesRaw: Seq[Attribute],
@@ -74,11 +75,13 @@ case class CarbonCountStar(
     val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
     val jobConf: JobConf = new JobConf(new Configuration)
     SparkHadoopUtil.get.addCredentials(jobConf)
+    CarbonInputFormat.setTableInfo(jobConf, carbonTable.getTableInfo)
     val job = new Job(jobConf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
     CarbonInputFormat
       .setTransactionalTable(job.getConfiguration,
         carbonTable.getTableInfo.isTransactionalTable)
+    CarbonInputFormatUtil.setDataMapJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index cce23dc..29dcec9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -115,6 +115,15 @@ object CarbonSetCommand {
           "\" carbon.datamap.visible.<database_name>.<table_name>.<database_name>" +
           " = <true/false> \" format")
       }
+    } else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) {
+      if (key.split("\\.").length == 6) {
+        sessionParams.addProperty(key.toLowerCase(), value)
+      } else {
+        throw new MalformedCarbonCommandException(
+          "property should be in \" carbon.load.datamaps.parallel.<database_name>" +
+          ".<table_name>=<true/false> \" format.")
+      }
     }
   }
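
With this validation the session property must carry both the database and the
table name, six dot-separated segments in total; for example (database and
table names are illustrative):

    SET carbon.load.datamaps.parallel.default.sales=true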
 


[2/2] carbondata git commit: [CARBONDATA-2310] Refactored code to improve Distributable interface & [CARBONDATA-2362] Changing the Cacheable object from DataMap to Wrapper

Posted by ra...@apache.org.
[CARBONDATA-2310] Refactored code to improve Distributable interface & [CARBONDATA-2362] Changing the Cacheable object from DataMap to Wrapper

This PR has two JIRA fixes
[CARBONDATA-2310] Refactored code to improve Distributable interface
[CARBONDATA-2362] Changing the Cacheable object from DataMap to Wrapper

This closes #2244


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/531ecdf3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/531ecdf3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/531ecdf3

Branch: refs/heads/master
Commit: 531ecdf3f40c064d4ff6ad36c43fa90a2d423588
Parents: a7926ea
Author: dhatchayani <dh...@gmail.com>
Authored: Fri Apr 27 23:03:52 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon May 7 13:11:29 2018 +0530

----------------------------------------------------------------------
 .../org/apache/carbondata/core/cache/Cache.java |  10 +
 .../dictionary/AbstractDictionaryCache.java     |   6 +
 .../core/constants/CarbonCommonConstants.java   |   3 +
 .../core/datamap/dev/CacheableDataMap.java      |  51 ++++
 .../core/datastore/SegmentTaskIndexStore.java   |   7 +
 .../filesystem/AbstractDFSCarbonFile.java       |   5 +-
 .../core/datastore/filesystem/CarbonFile.java   |   3 +-
 .../datastore/filesystem/LocalCarbonFile.java   |   3 +-
 .../core/indexstore/AbstractMemoryDMStore.java  |  63 +++++
 .../indexstore/BlockletDataMapIndexStore.java   | 187 ++++++++-------
 .../indexstore/BlockletDataMapIndexWrapper.java |  52 +++++
 .../core/indexstore/BlockletDetailInfo.java     |  66 ++++--
 .../core/indexstore/SafeMemoryDMStore.java      | 105 +++++++++
 .../TableBlockIndexUniqueIdentifier.java        |   5 +-
 .../core/indexstore/UnsafeMemoryDMStore.java    |  25 +-
 .../blockletindex/BlockletDataMap.java          | 232 ++++++++++++-------
 .../BlockletDataMapDistributable.java           |  12 +
 .../blockletindex/BlockletDataMapFactory.java   | 127 ++++++----
 .../blockletindex/BlockletDataMapModel.java     |  12 +
 .../blockletindex/SegmentIndexFileStore.java    |  39 +++-
 .../core/indexstore/row/DataMapRow.java         |  13 +-
 .../core/indexstore/row/UnsafeDataMapRow.java   |   7 +-
 .../core/indexstore/schema/CarbonRowSchema.java |  10 +-
 .../core/metadata/SegmentFileStore.java         |  29 +++
 .../core/metadata/schema/table/TableInfo.java   |  24 ++
 .../TableStatusReadCommittedScope.java          |   4 +-
 .../core/util/BlockletDataMapUtil.java          | 180 ++++++++++++++
 .../carbondata/core/util/SessionParams.java     |   5 +
 .../core/util/path/CarbonTablePath.java         |   2 +-
 .../TestBlockletDataMapFactory.java             | 126 ++++++++++
 .../apache/carbondata/hadoop/CacheClient.java   |  49 ++++
 .../hadoop/api/AbstractDataMapJob.java          |  42 ++++
 .../hadoop/api/CarbonFileInputFormat.java       |   2 +-
 .../hadoop/api/CarbonInputFormat.java           |  27 ++-
 .../hadoop/api/CarbonTableInputFormat.java      |   2 +-
 .../carbondata/hadoop/api/DataMapJob.java       |   6 +
 .../hadoop/util/CarbonInputFormatUtil.java      |  43 +++-
 .../lucene/LuceneFineGrainDataMapSuite.scala    |   1 +
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  15 +-
 .../carbondata/spark/rdd/SparkDataMapJob.scala  |   4 +-
 .../org/apache/spark/sql/CarbonCountStar.scala  |   3 +
 .../execution/command/CarbonHiveCommands.scala  |   9 +
 42 files changed, 1335 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
index 04fa18a..6df36fc 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
@@ -20,6 +20,8 @@ package org.apache.carbondata.core.cache;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.carbondata.core.memory.MemoryException;
+
 /**
  * A semi-persistent mapping from keys to values. Cache entries are manually added using
  * #get(Key), #getAll(List<Keys>) , and are stored in the cache until
@@ -69,6 +71,14 @@ public interface Cache<K, V> {
   void invalidate(K key);
 
   /**
+   * This method will add the value to the cache for the given key
+   *
+   * @param key
+   * @param value
+   */
+  void put(K key, V value) throws IOException, MemoryException;
+
+  /**
    * Access count of Cacheable entry will be decremented
    *
    * @param keys
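
The new put lets a caller that has already built a value seed the cache
directly, whereas get loads on a miss. A minimal caller-side sketch, assuming
the key and value types introduced later in this commit; the helper class is
illustrative:

    import java.io.IOException;

    import org.apache.carbondata.core.cache.Cache;
    import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
    import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
    import org.apache.carbondata.core.memory.MemoryException;

    class CacheSeedExample {
      static void seed(
          Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache,
          TableBlockIndexUniqueIdentifier identifier,
          BlockletDataMapIndexWrapper wrapper) throws IOException, MemoryException {
        // store an already-built wrapper; a later get(identifier) hits the cache
        cache.put(identifier, wrapper);
      }
    }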

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
index fb67208..83c7237 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
@@ -59,6 +59,12 @@ public abstract class AbstractDictionaryCache<K extends DictionaryColumnUniqueId
     initThreadPoolSize();
   }
 
+  @Override
+  public void put(DictionaryColumnUniqueIdentifier key, Dictionary value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+
   /**
    * This method will initialize the thread pool size to be used for creating the
    * max number of threads for a job

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index f9bf220..56607b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1726,6 +1726,9 @@ public final class CarbonCommonConstants {
    */
   public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
 
+  // Property to enable parallel datamap loading for a table
+  public static final String CARBON_LOAD_DATAMAPS_PARALLEL = "carbon.load.datamaps.parallel.";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
new file mode 100644
index 0000000..dba0840
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.memory.MemoryException;
+
+/**
+ * Interface for data map caching
+ */
+public interface CacheableDataMap {
+
+  /**
+   * Add the blockletDataMapIndexWrapper to cache for key tableBlockIndexUniqueIdentifier
+   *
+   * @param tableBlockIndexUniqueIdentifier
+   * @param blockletDataMapIndexWrapper
+   */
+  void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+      BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException;
+
+  /**
+   * Get all the uncached distributables from the list.
+   *
+   * @param distributables
+   * @return
+   */
+  List<DataMapDistributable> getAllUncachedDistributables(List<DataMapDistributable> distributables)
+      throws IOException;
+
+}
\ No newline at end of file
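
Per the file list above, BlockletDataMapFactory is the implementor of this
interface in this commit. A sketch of the intended getAllUncachedDistributables
semantics, assuming a cache field and a hypothetical identifierFor helper that
maps a distributable to its index-file identifier:

    // Keep only the distributables whose datamaps are not yet in the LRU
    // cache, so that only those are sent out for distributed loading.
    public List<DataMapDistributable> getAllUncachedDistributables(
        List<DataMapDistributable> distributables) throws IOException {
      List<DataMapDistributable> uncached = new ArrayList<>(distributables.size());
      for (DataMapDistributable distributable : distributables) {
        TableBlockIndexUniqueIdentifier identifier = identifierFor(distributable);
        if (cache.getIfPresent(identifier) == null) {
          uncached.add(distributable);
        }
      }
      return uncached;
    }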

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
index 537c635..d325f21 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.datastore.block.SegmentTaskIndex;
 import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -142,6 +143,12 @@ public class SegmentTaskIndexStore
     lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
   }
 
+  @Override
+  public void put(TableSegmentUniqueIdentifier key, SegmentTaskIndexWrapper value)
+      throws IOException, MemoryException {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   /**
    * returns block timestamp value from the given task
    * @param taskKey

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 0419405..7255237 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -526,7 +527,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
   }
 
   @Override
-  public CarbonFile[] locationAwareListFiles() throws IOException {
+  public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException {
     if (null != fileStatus && fileStatus.isDirectory()) {
       List<FileStatus> listStatus = new ArrayList<>();
       Path path = fileStatus.getPath();
@@ -534,7 +535,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
           path.getFileSystem(FileFactory.getConfiguration()).listLocatedStatus(path);
       while (iter.hasNext()) {
         LocatedFileStatus fileStatus = iter.next();
-        if (fileStatus.getLen() > 0) {
+        if (pathFilter.accept(fileStatus.getPath()) && fileStatus.getLen() > 0) {
           listStatus.add(fileStatus);
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index eb65dfd..a104137 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 
 public interface CarbonFile {
@@ -41,7 +42,7 @@ public interface CarbonFile {
    * It returns list of files with location details.
    * @return
    */
-  CarbonFile[] locationAwareListFiles() throws IOException;
+  CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException;
 
   String getName();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index d28e85e..60b7e17 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -49,6 +49,7 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.xerial.snappy.SnappyInputStream;
 import org.xerial.snappy.SnappyOutputStream;
@@ -448,7 +449,7 @@ public class LocalCarbonFile implements CarbonFile {
     return file.createNewFile();
   }
 
-  @Override public CarbonFile[] locationAwareListFiles() throws IOException {
+  @Override public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException {
     return listFiles();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
new file mode 100644
index 0000000..e6bc691
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+
+/**
+ * Store the data map row {@link DataMapRow}
+ */
+public abstract class AbstractMemoryDMStore implements Serializable {
+
+  protected boolean isMemoryFreed;
+
+  protected CarbonRowSchema[] schema;
+
+  protected final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
+
+  public AbstractMemoryDMStore(CarbonRowSchema[] schema) {
+    this.schema = schema;
+  }
+
+  public abstract void addIndexRow(DataMapRow indexRow) throws MemoryException;
+
+  public abstract DataMapRow getDataMapRow(int index);
+
+  public abstract void freeMemory();
+
+  public abstract int getMemoryUsed();
+
+  public CarbonRowSchema[] getSchema() {
+    return schema;
+  }
+
+  public abstract int getRowCount();
+
+  public void finishWriting() throws MemoryException {
+    // do nothing in default implementation
+  }
+
+  public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException {
+    throw new UnsupportedOperationException("Operation not allowed");
+  }
+}
\ No newline at end of file
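
This base class is paired with two implementations in this commit: the on-heap
SafeMemoryDMStore (serializable, so rows can travel between driver and
executors) and the off-heap UnsafeMemoryDMStore. A sketch of the selection,
mirroring the addToUnsafe flag that BlockletDataMap passes further down in this
diff; the factory method name is illustrative:

    static AbstractMemoryDMStore createStore(CarbonRowSchema[] schema, boolean addToUnsafe)
        throws MemoryException {
      // the unsafe store keeps rows off-heap and must be freed explicitly;
      // the safe store keeps them on-heap and survives serialization
      return addToUnsafe ? new UnsafeMemoryDMStore(schema) : new SafeMemoryDMStore(schema);
    }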

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 167a04e..ba4193e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.core.indexstore;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -30,26 +29,19 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CarbonLRUCache;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
-
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.carbondata.core.util.BlockletDataMapUtil;
 
 /**
  * Class to handle loading, unloading, clearing and storing of the table
  * blocks
  */
 public class BlockletDataMapIndexStore
-    implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap> {
+    implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName());
   /**
@@ -76,106 +68,93 @@ public class BlockletDataMapIndexStore
   }
 
   @Override
-  public BlockletDataMap get(TableBlockIndexUniqueIdentifier identifier)
+  public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifier identifier)
       throws IOException {
     String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
-    BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
-    if (dataMap == null) {
+    BlockletDataMapIndexWrapper blockletDataMapIndexWrapper =
+        (BlockletDataMapIndexWrapper) lruCache.get(lruCacheKey);
+    List<BlockletDataMap> dataMaps = new ArrayList<>();
+    if (blockletDataMapIndexWrapper == null) {
       try {
         SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
         Set<String> filesRead = new HashSet<>();
-        Map<String, BlockMetaInfo> blockMetaInfoMap =
-            getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
-        dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
-      } catch (MemoryException e) {
+        long memorySize = 0L;
+        String segmentFilePath = identifier.getIndexFilePath();
+        Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping = BlockletDataMapUtil
+            .createCarbonDataFileBlockMetaInfoMapping(segmentFilePath);
+        // if the identifier is not a merge file we can directly load the datamaps
+        if (identifier.getMergeIndexFileName() == null) {
+          Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
+              .getBlockMetaInfoMap(identifier, indexFileStore, filesRead,
+                  carbonDataFileBlockMetaInfoMapping);
+          BlockletDataMap blockletDataMap =
+              loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
+          memorySize += blockletDataMap.getMemorySize();
+          dataMaps.add(blockletDataMap);
+          blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
+        } else {
+          // if the identifier is a merge file then collect the index files and load the datamaps
+          List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+              BlockletDataMapUtil.getIndexFileIdentifiersFromMergeFile(identifier, indexFileStore);
+          for (TableBlockIndexUniqueIdentifier blockIndexUniqueIdentifier :
+              tableBlockIndexUniqueIdentifiers) {
+            Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
+                .getBlockMetaInfoMap(blockIndexUniqueIdentifier, indexFileStore, filesRead,
+                    carbonDataFileBlockMetaInfoMapping);
+            BlockletDataMap blockletDataMap =
+                loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap);
+            memorySize += blockletDataMap.getMemorySize();
+            dataMaps.add(blockletDataMap);
+          }
+          blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
+        }
+        lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper,
+            memorySize);
+      } catch (Throwable e) {
+        // clear all the memory used by datamaps loaded
+        for (DataMap dataMap : dataMaps) {
+          dataMap.clear();
+        }
         LOGGER.error("memory exception when loading datamap: " + e.getMessage());
         throw new RuntimeException(e.getMessage(), e);
       }
     }
-    return dataMap;
-  }
-
-  private Map<String, BlockMetaInfo> getBlockMetaInfoMap(TableBlockIndexUniqueIdentifier identifier,
-      SegmentIndexFileStore indexFileStore, Set<String> filesRead) throws IOException {
-    if (identifier.getMergeIndexFileName() != null) {
-      CarbonFile indexMergeFile = FileFactory.getCarbonFile(
-          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-              .getMergeIndexFileName());
-      if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
-        indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
-        filesRead.add(indexMergeFile.getPath());
-      }
-    }
-    if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
-      indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
-          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-              .getIndexFileName()) });
-    }
-    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
-    Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
-    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
-        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
-    for (DataFileFooter footer : indexInfo) {
-      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
-      if (FileFactory.isFileExist(blockPath)) {
-        blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath));
-      } else {
-        LOGGER.warn("Skipping invalid block " + footer.getBlockInfo().getBlockUniqueName()
-            + " The block does not exist. The block might be got deleted due to clean up post"
-            + " update/delete operation over table.");
-      }
-    }
-    return blockMetaInfoMap;
-  }
-
-  private BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException {
-    CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
-    if (carbonFile instanceof AbstractDFSCarbonFile) {
-      RemoteIterator<LocatedFileStatus> iter =
-          ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile));
-      LocatedFileStatus fileStatus = iter.next();
-      String[] location = fileStatus.getBlockLocations()[0].getHosts();
-      long len = fileStatus.getLen();
-      return new BlockMetaInfo(location, len);
-    } else {
-      return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize());
-    }
+    return blockletDataMapIndexWrapper;
   }
 
   @Override
-  public List<BlockletDataMap> getAll(
+  public List<BlockletDataMapIndexWrapper> getAll(
       List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
-    List<BlockletDataMap> blockletDataMaps =
+    List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
         new ArrayList<>(tableSegmentUniqueIdentifiers.size());
     List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>();
+    BlockletDataMapIndexWrapper blockletDataMapIndexWrapper = null;
     // Get the datamaps for each indexfile from cache.
     try {
       for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
-        BlockletDataMap ifPresent = getIfPresent(identifier);
-        if (ifPresent != null) {
-          blockletDataMaps.add(ifPresent);
+        BlockletDataMapIndexWrapper dataMapIndexWrapper = getIfPresent(identifier);
+        if (dataMapIndexWrapper != null) {
+          blockletDataMapIndexWrappers.add(dataMapIndexWrapper);
         } else {
           missedIdentifiers.add(identifier);
         }
       }
       if (missedIdentifiers.size() > 0) {
-        SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
-        Set<String> filesRead = new HashSet<>();
-        for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
-          Map<String, BlockMetaInfo> blockMetaInfoMap =
-              getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
-          blockletDataMaps.add(
-              loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap));
+        for (TableBlockIndexUniqueIdentifier identifier : missedIdentifiers) {
+          blockletDataMapIndexWrapper = get(identifier);
+          blockletDataMapIndexWrappers.add(blockletDataMapIndexWrapper);
         }
       }
     } catch (Throwable e) {
-      for (BlockletDataMap dataMap : blockletDataMaps) {
-        dataMap.clear();
+      if (null != blockletDataMapIndexWrapper) {
+        List<BlockletDataMap> dataMaps = blockletDataMapIndexWrapper.getDataMaps();
+        for (DataMap dataMap : dataMaps) {
+          dataMap.clear();
+        }
       }
       throw new IOException("Problem in loading segment blocks.", e);
     }
-    return blockletDataMaps;
+    return blockletDataMapIndexWrappers;
   }
 
   /**
@@ -185,9 +164,9 @@ public class BlockletDataMapIndexStore
    * @return
    */
   @Override
-  public BlockletDataMap getIfPresent(
+  public BlockletDataMapIndexWrapper getIfPresent(
       TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
-    return (BlockletDataMap) lruCache.get(
+    return (BlockletDataMapIndexWrapper) lruCache.get(
         tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
   }
 
@@ -201,6 +180,44 @@ public class BlockletDataMapIndexStore
     lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
   }
 
+  @Override
+  public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+      BlockletDataMapIndexWrapper wrapper) throws IOException, MemoryException {
+    String uniqueTableSegmentIdentifier =
+        tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier();
+    Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
+    if (lock == null) {
+      lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
+    }
+    long memorySize = 0L;
+    // As the dataMap will use unsafe memory, it is not recommended to overwrite an existing
+    // entry, because in that case the unsafe memory it holds would need to be cleared as well.
+    // If a datamap entry in the cache needs to be overwritten, first use the invalidate
+    // interface and then use the put interface
+    if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+      synchronized (lock) {
+        if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+          List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
+          try {
+            for (BlockletDataMap blockletDataMap: dataMaps) {
+              blockletDataMap.convertToUnsafeDMStore();
+              memorySize += blockletDataMap.getMemorySize();
+            }
+            lruCache.put(tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(), wrapper,
+                memorySize);
+          } catch (Throwable e) {
+            // clear all the memory acquired by data map in case of any failure
+            for (DataMap blockletDataMap : dataMaps) {
+              blockletDataMap.clear();
+            }
+            throw new IOException("Problem in adding datamap to cache.", e);
+          }
+        }
+      }
+    }
+  }
+
+
   /**
    * Below method will be used to load the segment of segments
   * One segment may have multiple tasks, so the table segment will be loaded
@@ -228,8 +245,6 @@ public class BlockletDataMapIndexStore
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
               .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
           blockMetaInfoMap, identifier.getSegmentId()));
-      lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap,
-          dataMap.getMemorySize());
     }
     return dataMap;
   }
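
The new put above follows a check-lock-recheck pattern so that concurrent
callers never convert or insert the same segment entry twice. Its generic
shape, with lockFor standing in for the per-segment lock lookup
(addAndGetSegmentLock in the real code):

    if (getIfPresent(identifier) == null) {
      synchronized (lockFor(identifier)) {
        if (getIfPresent(identifier) == null) {
          // convert each datamap to the unsafe store, sum the memory used,
          // then insert the wrapper into the LRU cache with that size
        }
      }
    }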

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
new file mode 100644
index 0000000..d674cb4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+
+/**
+ * A cacheable wrapper of datamaps
+ */
+public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
+
+  private List<BlockletDataMap> dataMaps;
+
+  public BlockletDataMapIndexWrapper(List<BlockletDataMap> dataMaps) {
+    this.dataMaps = dataMaps;
+  }
+
+  @Override public long getFileTimeStamp() {
+    return 0;
+  }
+
+  @Override public int getAccessCount() {
+    return 0;
+  }
+
+  @Override public long getMemorySize() {
+    return 0;
+  }
+
+  public List<BlockletDataMap> getDataMaps() {
+    return dataMaps;
+  }
+}
\ No newline at end of file
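
With this wrapper a single cache lookup now yields every datamap behind an
index or merge-index file. A consumer-side sketch, with identifier and cache
assumed from the surrounding code:

    BlockletDataMapIndexWrapper wrapper = cache.get(identifier);
    for (BlockletDataMap dataMap : wrapper.getDataMaps()) {
      // prune each datamap with the filter and collect the blocklets
    }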

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
index 660f4c1..8bae7fd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -22,20 +22,29 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 import org.apache.hadoop.io.Writable;
-import org.xerial.snappy.Snappy;
 
 /**
  * Blocklet detail information to be sent to each executor
  */
 public class BlockletDetailInfo implements Serializable, Writable {
 
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockletDetailInfo.class.getName());
+
+  private static final long serialVersionUID = 7957493757421513808L;
+
   private int rowCount;
 
   private short pagesCount;
@@ -50,6 +59,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
 
   private BlockletInfo blockletInfo;
 
+  private byte[] blockletInfoBinary;
+
   private long blockFooterOffset;
 
   private List<ColumnSchema> columnSchemas;
@@ -83,6 +94,13 @@ public class BlockletDetailInfo implements Serializable, Writable {
   }
 
   public BlockletInfo getBlockletInfo() {
+    if (null == blockletInfo) {
+      try {
+        setBlockletInfoFromBinary();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
     return blockletInfo;
   }
 
@@ -90,6 +108,26 @@ public class BlockletDetailInfo implements Serializable, Writable {
     this.blockletInfo = blockletInfo;
   }
 
+  private void setBlockletInfoFromBinary() throws IOException {
+    if (null == this.blockletInfo && null != blockletInfoBinary && blockletInfoBinary.length > 0) {
+      blockletInfo = new BlockletInfo();
+      ByteArrayInputStream stream = new ByteArrayInputStream(blockletInfoBinary);
+      DataInputStream inputStream = new DataInputStream(stream);
+      try {
+        blockletInfo.readFields(inputStream);
+      } catch (IOException e) {
+        LOGGER.error("Problem in reading blocklet info");
+        throw new IOException("Problem in reading blocklet info." + e.getMessage());
+      } finally {
+        try {
+          inputStream.close();
+        } catch (IOException e) {
+          LOGGER.error(e, "Problem in closing input stream of reading blocklet info.");
+        }
+      }
+    }
+  }
+
   public int[] getDimLens() {
     return dimLens;
   }
@@ -131,6 +169,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
     out.writeLong(blockFooterOffset);
     out.writeInt(columnSchemaBinary.length);
     out.write(columnSchemaBinary);
+    out.writeInt(blockletInfoBinary.length);
+    out.write(blockletInfoBinary);
     out.writeLong(blockSize);
   }
 
@@ -153,6 +193,10 @@ public class BlockletDetailInfo implements Serializable, Writable {
     byte[] schemaArray = new byte[bytesSize];
     in.readFully(schemaArray);
     readColumnSchema(schemaArray);
+    int byteSize = in.readInt();
+    blockletInfoBinary = new byte[byteSize];
+    in.readFully(blockletInfoBinary);
+    setBlockletInfoFromBinary();
     blockSize = in.readLong();
   }
 
@@ -162,17 +206,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
    * @throws IOException
    */
   public void readColumnSchema(byte[] schemaArray) throws IOException {
-    // uncompress it.
-    schemaArray = Snappy.uncompress(schemaArray);
-    ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
-    DataInput schemaInput = new DataInputStream(schemaStream);
-    columnSchemas = new ArrayList<>();
-    int size = schemaInput.readShort();
-    for (int i = 0; i < size; i++) {
-      ColumnSchema columnSchema = new ColumnSchema();
-      columnSchema.readFields(schemaInput);
-      columnSchemas.add(columnSchema);
-    }
+    BlockletDataMap blockletDataMap = new BlockletDataMap();
+    columnSchemas = blockletDataMap.readColumnSchema(schemaArray);
   }
 
   /**
@@ -223,4 +258,9 @@ public class BlockletDetailInfo implements Serializable, Writable {
   public byte[] getColumnSchemaBinary() {
     return columnSchemaBinary;
   }
+
+  public void setBlockletInfoBinary(byte[] blockletInfoBinary) {
+    this.blockletInfoBinary = blockletInfoBinary;
+  }
+
 }
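
The binary kept in blockletInfoBinary is the serialized form of BlockletInfo,
read back lazily in setBlockletInfoFromBinary above. A sketch of the producing
side, assuming BlockletInfo implements Hadoop's Writable (its readFields
counterpart is what the new code calls):

    static byte[] toBinary(BlockletInfo blockletInfo) throws IOException {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      blockletInfo.write(out);  // mirror of the readFields(...) call used above
      out.close();
      return bytes.toByteArray();
    }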

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
new file mode 100644
index 0000000..d7a1b8f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/**
+ * Store the data map row ({@link DataMapRow}) data in memory.
+ */
+public class SafeMemoryDMStore extends AbstractMemoryDMStore {
+
+  /**
+   * holds all blocklets metadata in memory
+   */
+  private List<DataMapRow> dataMapRows =
+      new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  private int runningLength;
+
+  public SafeMemoryDMStore(CarbonRowSchema[] schema) {
+    super(schema);
+  }
+
+  /**
+   * Add the index row to dataMapRows, i.e. store it in on-heap memory.
+   *
+   * @param indexRow
+   */
+  @Override
+  public void addIndexRow(DataMapRow indexRow) throws MemoryException {
+    dataMapRows.add(indexRow);
+    runningLength += indexRow.getTotalSizeInBytes();
+  }
+
+  @Override
+  public DataMapRow getDataMapRow(int index) {
+    assert (index < dataMapRows.size());
+    return dataMapRows.get(index);
+  }
+
+  @Override
+  public void freeMemory() {
+    if (!isMemoryFreed) {
+      if (null != dataMapRows) {
+        dataMapRows.clear();
+        dataMapRows = null;
+      }
+      isMemoryFreed = true;
+    }
+  }
+
+  @Override
+  public int getMemoryUsed() {
+    return runningLength;
+  }
+
+  @Override
+  public int getRowCount() {
+    return dataMapRows.size();
+  }
+
+  @Override
+  public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException {
+    setSchemaDataType();
+    UnsafeMemoryDMStore unsafeMemoryDMStore = new UnsafeMemoryDMStore(schema);
+    for (DataMapRow dataMapRow : dataMapRows) {
+      unsafeMemoryDMStore.addIndexRow(dataMapRow);
+    }
+    unsafeMemoryDMStore.finishWriting();
+    return unsafeMemoryDMStore;
+  }
+
+  /**
+   * Set the dataType on the schema. Needed in case of serialization/deserialization.
+   */
+  private void setSchemaDataType() {
+    for (CarbonRowSchema carbonRowSchema : schema) {
+      carbonRowSchema.setDataType(DataTypeUtil.valueOf(carbonRowSchema.getDataType(), 0, 0));
+    }
+  }
+
+}
\ No newline at end of file
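
A typical lifecycle, then, is to build rows on-heap and move them off-heap only
when the datamap is put into the cache; a sketch, with row construction elided:

    static UnsafeMemoryDMStore moveOffHeap(SafeMemoryDMStore safeStore)
        throws MemoryException {
      // convertToUnsafeDMStore copies every row and calls finishWriting()
      UnsafeMemoryDMStore unsafeStore = safeStore.convertToUnsafeDMStore();
      safeStore.freeMemory();  // release the on-heap rows once copied
      return unsafeStore;
    }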

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
index c907fa8..3226ceb 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.indexstore;
 
+import java.io.Serializable;
 import java.util.Objects;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -24,7 +25,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 /**
  * Class holds the indexFile information to uniquely identify the carbon index
  */
-public class TableBlockIndexUniqueIdentifier {
+public class TableBlockIndexUniqueIdentifier implements Serializable {
+
+  private static final long serialVersionUID = 5808112137916196344L;
 
   private String indexFilePath;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 31ecac2..ca5e2dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -24,7 +24,6 @@ import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
 import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
 import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
@@ -32,9 +31,11 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
 /**
  * Store the data map row ({@link DataMapRow}) data in unsafe memory.
  */
-public class UnsafeMemoryDMStore {
+public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
 
-  private MemoryBlock memoryBlock;
+  private static final long serialVersionUID = -5344592407101055335L;
+
+  private transient MemoryBlock memoryBlock;
 
   private static int capacity = 8 * 1024;
 
@@ -42,18 +43,12 @@ public class UnsafeMemoryDMStore {
 
   private int runningLength;
 
-  private boolean isMemoryFreed;
-
-  private CarbonRowSchema[] schema;
-
   private int[] pointers;
 
   private int rowCount;
 
-  private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
-
   public UnsafeMemoryDMStore(CarbonRowSchema[] schema) throws MemoryException {
-    this.schema = schema;
+    super(schema);
     this.allocatedSize = capacity;
     this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize);
     this.pointers = new int[1000];
@@ -92,7 +87,7 @@ public class UnsafeMemoryDMStore {
    * @param indexRow
    * @return
    */
-  public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException {
+  public void addIndexRow(DataMapRow indexRow) throws MemoryException {
     // First calculate the required memory to keep the row in unsafe
     int rowSize = indexRow.getTotalSizeInBytes();
     // Check whether allocated memory is sufficient or not.
@@ -172,7 +167,7 @@ public class UnsafeMemoryDMStore {
     }
   }
 
-  public UnsafeDataMapRow getUnsafeRow(int index) {
+  public DataMapRow getDataMapRow(int index) {
     assert (index < rowCount);
     return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
   }
@@ -205,12 +200,8 @@ public class UnsafeMemoryDMStore {
     return runningLength;
   }
 
-  public CarbonRowSchema[] getSchema() {
-    return schema;
-  }
-
   public int getRowCount() {
     return rowCount;
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index f72dc06..3ff9cdc 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -18,10 +18,12 @@ package org.apache.carbondata.core.indexstore.blockletindex;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -32,18 +34,19 @@ import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cacheable;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.SafeMemoryDMStore;
 import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
@@ -76,11 +79,13 @@ import org.xerial.snappy.Snappy;
 /**
  * Datamap implementation for blocklet.
  */
-public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
+public class BlockletDataMap extends CoarseGrainDataMap implements Serializable {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMap.class.getName());
 
+  private static final long serialVersionUID = -2170289352240810993L;
+
   private static int KEY_INDEX = 0;
 
   private static int MIN_VALUES_INDEX = 1;
@@ -119,14 +124,17 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
 
   private static int SEGMENTID = 5;
 
-  private UnsafeMemoryDMStore unsafeMemoryDMStore;
+  private AbstractMemoryDMStore memoryDMStore;
 
-  private UnsafeMemoryDMStore unsafeMemorySummaryDMStore;
+  private AbstractMemoryDMStore summaryDMStore;
 
-  private SegmentProperties segmentProperties;
+  // As this is a heavy object, it is not recommended to serialize it
+  private transient SegmentProperties segmentProperties;
 
   private int[] columnCardinality;
 
+  private long blockletSchemaTime;
+
   @Override
   public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
     long startTime = System.currentTimeMillis();
@@ -150,11 +158,12 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
       if (segmentProperties == null) {
         List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
         schemaBinary = convertSchemaToBinary(columnInTable);
+        blockletSchemaTime = fileFooter.getSchemaUpdatedTimeStamp();
         columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
         segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
-        createSchema(segmentProperties);
+        createSchema(segmentProperties, ((BlockletDataMapModel) dataMapModel).isAddToUnsafe());
         createSummarySchema(segmentProperties, schemaBinary, filePath, fileName,
-            segmentId);
+            segmentId, ((BlockletDataMapModel) dataMapModel).isAddToUnsafe());
       }
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
       BlockMetaInfo blockMetaInfo =
@@ -185,21 +194,23 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
         }
       }
     }
-    if (unsafeMemoryDMStore != null) {
-      unsafeMemoryDMStore.finishWriting();
+    if (memoryDMStore != null) {
+      memoryDMStore.finishWriting();
     }
-    if (null != unsafeMemorySummaryDMStore) {
+    if (null != summaryDMStore) {
       addTaskSummaryRowToUnsafeMemoryStore(
           summaryRow,
           schemaBinary,
           filePath,
           fileName,
           segmentId);
-      unsafeMemorySummaryDMStore.finishWriting();
+      summaryDMStore.finishWriting();
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is "
+              + (System.currentTimeMillis() - startTime));
     }
-    LOGGER.info(
-        "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + "is " + (
-            System.currentTimeMillis() - startTime));
   }
 
   private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter,
@@ -207,10 +218,10 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
       BlockMetaInfo blockMetaInfo, int relativeBlockletId) {
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     List<BlockletInfo> blockletList = fileFooter.getBlockletList();
-    CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+    CarbonRowSchema[] schema = memoryDMStore.getSchema();
     // Add one row to maintain task level min max for segment pruning
     if (!blockletList.isEmpty() && summaryRow == null) {
-      summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+      summaryRow = new DataMapRowImpl(summaryDMStore.getSchema());
     }
     for (int index = 0; index < blockletList.size(); index++) {
       DataMapRow row = new DataMapRowImpl(schema);
@@ -226,7 +237,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
       row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
       // compute and set task level min values
       addTaskMinMaxValues(summaryRow, minMaxLen,
-          unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
+          summaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
           TASK_MIN_VALUES_INDEX, true);
       ordinal++;
       taskMinMaxOrdinal++;
@@ -234,7 +245,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
       row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
       // compute and set task level max values
       addTaskMinMaxValues(summaryRow, minMaxLen,
-          unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
+          summaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
           TASK_MAX_VALUES_INDEX, false);
       ordinal++;
 
@@ -269,7 +280,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
         row.setShort((short) relativeBlockletId++, ordinal++);
         // Store block size
         row.setLong(blockMetaInfo.getSize(), ordinal);
-        unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+        memoryDMStore.addIndexRow(row);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -295,10 +306,10 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
       BlockMetaInfo blockMetaInfo) {
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
-    CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+    CarbonRowSchema[] schema = memoryDMStore.getSchema();
     // Add one row to maintain task level min max for segment pruning
     if (summaryRow == null) {
-      summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+      summaryRow = new DataMapRowImpl(summaryDMStore.getSchema());
     }
     DataMapRow row = new DataMapRowImpl(schema);
     int ordinal = 0;
@@ -317,14 +328,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
     row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMinValues), ordinal);
     // compute and set task level min values
     addTaskMinMaxValues(summaryRow, minMaxLen,
-        unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues,
+        summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues,
         TASK_MIN_VALUES_INDEX, true);
     ordinal++;
     taskMinMaxOrdinal++;
     row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMaxValues), ordinal);
     // compute and set task level max values
     addTaskMinMaxValues(summaryRow, minMaxLen,
-        unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues,
+        summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues,
         TASK_MAX_VALUES_INDEX, false);
     ordinal++;
 
@@ -357,7 +368,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
 
       // store block size
       row.setLong(blockMetaInfo.getSize(), ordinal);
-      unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+      memoryDMStore.addIndexRow(row);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -378,7 +389,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
       summaryRow.setByteArray(fileName, INDEX_FILE_NAME);
       summaryRow.setByteArray(segmentId, SEGMENTID);
       try {
-        unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow);
+        summaryDMStore.addIndexRow(summaryRow);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -516,7 +527,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
     taskMinMaxRow.setRow(row, ordinal);
   }
 
-  private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
+  private void createSchema(SegmentProperties segmentProperties, boolean addToUnsafe)
+      throws MemoryException {
     List<CarbonRowSchema> indexSchemas = new ArrayList<>();
 
     // Index key
@@ -553,8 +565,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
     // for storing block length.
     indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
 
-    unsafeMemoryDMStore =
-        new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
+    CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
+    memoryDMStore = getMemoryDMStore(schema, addToUnsafe);
   }
 
   /**
@@ -565,7 +577,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
    * @throws MemoryException
    */
   private void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary,
-      byte[] filePath, byte[] fileName, byte[] segmentId)
+      byte[] filePath, byte[] fileName, byte[] segmentId, boolean addToUnsafe)
       throws MemoryException {
     List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
     getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
@@ -581,8 +593,9 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
     // for storing segmentid
     taskMinMaxSchemas.add(
         new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length));
-    unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore(
-        taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]));
+    CarbonRowSchema[] schema =
+        taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]);
+    summaryDMStore = getMemoryDMStore(schema, addToUnsafe);
   }
 
   private void getMinMaxSchema(SegmentProperties segmentProperties,
@@ -611,8 +624,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
   public boolean isScanRequired(FilterResolverIntf filterExp) {
     FilterExecuter filterExecuter =
         FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
-    for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) {
-      DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i);
+    for (int i = 0; i < summaryDMStore.getRowCount(); i++) {
+      DataMapRow unsafeRow = summaryDMStore.getDataMapRow(i);
       boolean isScanRequired = FilterExpressionProcessor.isScanRequired(
           filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
           getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
@@ -624,26 +637,26 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
   }
 
   private List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties) {
-    if (unsafeMemoryDMStore.getRowCount() == 0) {
+    if (memoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
     }
     List<Blocklet> blocklets = new ArrayList<>();
     int numBlocklets = 0;
     if (filterExp == null) {
-      numBlocklets = unsafeMemoryDMStore.getRowCount();
+      numBlocklets = memoryDMStore.getRowCount();
       for (int i = 0; i < numBlocklets; i++) {
-        DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(i).convertToSafeRow();
+        DataMapRow safeRow = memoryDMStore.getDataMapRow(i).convertToSafeRow();
         blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX)));
       }
     } else {
       // Remove B-tree jump logic as the start and end keys prepared are not
       // correct for old store scenarios
       int startIndex = 0;
-      numBlocklets = unsafeMemoryDMStore.getRowCount();
+      numBlocklets = memoryDMStore.getRowCount();
       FilterExecuter filterExecuter =
           FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
       while (startIndex < numBlocklets) {
-        DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow();
+        DataMapRow safeRow = memoryDMStore.getDataMapRow(startIndex).convertToSafeRow();
         int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX);
         String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX),
             CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
@@ -663,7 +676,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
   @Override
   public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
       List<PartitionSpec> partitions) {
-    if (unsafeMemoryDMStore.getRowCount() == 0) {
+    if (memoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
     }
     // if it has partitioned datamap but there is no partitioned information stored, it means
@@ -740,10 +753,26 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
 
   public ExtendedBlocklet getDetailedBlocklet(String blockletId) {
     int index = Integer.parseInt(blockletId);
-    DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(index).convertToSafeRow();
+    DataMapRow safeRow = memoryDMStore.getDataMapRow(index).convertToSafeRow();
     return createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX));
   }
 
+  /**
+   * Get the index file name of the blocklet data map
+   *
+   * @return the index file name read from the summary row
+   */
+  public String getIndexFileName() {
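+    // the summary store holds a single task-level row; the index file name is stored in it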
+    DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
+    try {
+      return new String(unsafeRow.getByteArray(INDEX_FILE_NAME),
+          CarbonCommonConstants.DEFAULT_CHARSET);
+    } catch (UnsupportedEncodingException e) {
+      // should never happen!
+      throw new IllegalArgumentException("UTF8 encoding is not supported", e);
+    }
+  }
+
   private byte[][] getMinMaxValue(DataMapRow row, int index) {
     DataMapRow minMaxRow = row.getRow(index);
     byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
@@ -764,23 +793,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
     detailInfo.setBlockletId((short) blockletId);
     detailInfo.setDimLens(columnCardinality);
     detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
-    byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
-    BlockletInfo blockletInfo = null;
+    detailInfo.setBlockletInfoBinary(row.getByteArray(BLOCK_INFO_INDEX));
     try {
-      if (byteArray.length > 0) {
-        blockletInfo = new BlockletInfo();
-        ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
-        DataInputStream inputStream = new DataInputStream(stream);
-        blockletInfo.readFields(inputStream);
-        inputStream.close();
-      }
       blocklet.setLocation(
           new String(row.getByteArray(LOCATIONS), CarbonCommonConstants.DEFAULT_CHARSET)
               .split(","));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    detailInfo.setBlockletInfo(blockletInfo);
     blocklet.setDetailInfo(detailInfo);
     detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET));
     detailInfo.setColumnSchemaBinary(getColumnSchemaBinary());
@@ -791,7 +811,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
   private String[] getFileDetails() {
     try {
       String[] fileDetails = new String[3];
-      DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+      DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
       fileDetails[0] =
           new String(unsafeRow.getByteArray(INDEX_PATH), CarbonCommonConstants.DEFAULT_CHARSET);
       fileDetails[1] = new String(unsafeRow.getByteArray(INDEX_FILE_NAME),
@@ -815,14 +835,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
   private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
     int childNodeIndex;
     int low = 0;
-    int high = unsafeMemoryDMStore.getRowCount() - 1;
+    int high = memoryDMStore.getRowCount() - 1;
     int mid = 0;
     int compareRes = -1;
     //
     while (low <= high) {
       mid = (low + high) >>> 1;
       // compare the entries
-      compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+      compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid));
       if (compareRes < 0) {
         high = mid - 1;
       } else if (compareRes > 0) {
@@ -831,7 +851,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
         // if key is matched then get the first entry
         int currentPos = mid;
         while (currentPos - 1 >= 0
-            && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) {
+            && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos - 1)) == 0) {
           currentPos--;
         }
         mid = currentPos;
@@ -863,14 +883,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
   private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
     int childNodeIndex;
     int low = 0;
-    int high = unsafeMemoryDMStore.getRowCount() - 1;
+    int high = memoryDMStore.getRowCount() - 1;
     int mid = 0;
     int compareRes = -1;
     //
     while (low <= high) {
       mid = (low + high) >>> 1;
       // compare the entries
-      compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+      compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid));
       if (compareRes < 0) {
         high = mid - 1;
       } else if (compareRes > 0) {
@@ -878,8 +898,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
       } else {
         int currentPos = mid;
         // if key is matched then get the last entry
-        while (currentPos + 1 < unsafeMemoryDMStore.getRowCount()
-            && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) {
+        while (currentPos + 1 < memoryDMStore.getRowCount()
+            && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos + 1)) == 0) {
           currentPos++;
         }
         mid = currentPos;
@@ -907,13 +927,13 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
     buffer.putInt(key.getNoDictionaryKeys().length);
     buffer.put(key.getDictionaryKeys());
     buffer.put(key.getNoDictionaryKeys());
-    DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema());
+    DataMapRowImpl dataMapRow = new DataMapRowImpl(memoryDMStore.getSchema());
     dataMapRow.setByteArray(buffer.array(), 0);
     return dataMapRow;
   }
 
   private byte[] getColumnSchemaBinary() {
-    DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+    DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
     return unsafeRow.getByteArray(SCHEMA);
   }
 
@@ -937,36 +957,25 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
 
   @Override
   public void clear() {
-    if (unsafeMemoryDMStore != null) {
-      unsafeMemoryDMStore.freeMemory();
-      unsafeMemoryDMStore = null;
+    if (memoryDMStore != null) {
+      memoryDMStore.freeMemory();
+      memoryDMStore = null;
       segmentProperties = null;
     }
     // clear task min/max unsafe memory
-    if (null != unsafeMemorySummaryDMStore) {
-      unsafeMemorySummaryDMStore.freeMemory();
-      unsafeMemorySummaryDMStore = null;
+    if (null != summaryDMStore) {
+      summaryDMStore.freeMemory();
+      summaryDMStore = null;
     }
   }
 
-  @Override
-  public long getFileTimeStamp() {
-    return 0;
-  }
-
-  @Override
-  public int getAccessCount() {
-    return 0;
-  }
-
-  @Override
   public long getMemorySize() {
     long memoryUsed = 0L;
-    if (unsafeMemoryDMStore != null) {
-      memoryUsed += unsafeMemoryDMStore.getMemoryUsed();
+    if (memoryDMStore != null) {
+      memoryUsed += memoryDMStore.getMemoryUsed();
     }
-    if (null != unsafeMemorySummaryDMStore) {
-      memoryUsed += unsafeMemorySummaryDMStore.getMemoryUsed();
+    if (null != summaryDMStore) {
+      memoryUsed += summaryDMStore.getMemoryUsed();
     }
     return memoryUsed;
   }
@@ -975,4 +984,65 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
     return segmentProperties;
   }
 
+  public void setSegmentProperties(SegmentProperties segmentProperties) {
+    this.segmentProperties = segmentProperties;
+  }
+
+  public int[] getColumnCardinality() {
+    return columnCardinality;
+  }
+
+  private AbstractMemoryDMStore getMemoryDMStore(CarbonRowSchema[] schema, boolean addToUnsafe)
+      throws MemoryException {
+    AbstractMemoryDMStore memoryDMStore;
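+    // addToUnsafe selects off-heap (unsafe) storage; otherwise rows stay on the Java heap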
+    if (addToUnsafe) {
+      memoryDMStore = new UnsafeMemoryDMStore(schema);
+    } else {
+      memoryDMStore = new SafeMemoryDMStore(schema);
+    }
+    return memoryDMStore;
+  }
+
+  /**
+   * This method will convert a safe (on-heap) memory DM store to an unsafe (off-heap) DM store
+   *
+   * @throws MemoryException
+   */
+  public void convertToUnsafeDMStore() throws MemoryException {
+    if (memoryDMStore instanceof SafeMemoryDMStore) {
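+      // build the off-heap copy first, then free the on-heap store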
+      UnsafeMemoryDMStore unsafeMemoryDMStore = memoryDMStore.convertToUnsafeDMStore();
+      memoryDMStore.freeMemory();
+      memoryDMStore = unsafeMemoryDMStore;
+    }
+    if (summaryDMStore instanceof SafeMemoryDMStore) {
+      UnsafeMemoryDMStore unsafeSummaryMemoryDMStore = summaryDMStore.convertToUnsafeDMStore();
+      summaryDMStore.freeMemory();
+      summaryDMStore = unsafeSummaryMemoryDMStore;
+    }
+  }
+
+  /**
+   * Read column schemas from their snappy-compressed binary representation
+   * @param schemaArray compressed serialized column schemas
+   * @throws IOException
+   */
+  public List<ColumnSchema> readColumnSchema(byte[] schemaArray) throws IOException {
+    // uncompress it.
+    schemaArray = Snappy.uncompress(schemaArray);
+    ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
+    DataInput schemaInput = new DataInputStream(schemaStream);
+    List<ColumnSchema> columnSchemas = new ArrayList<>();
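+    // layout: a short element count followed by that many serialized ColumnSchema entries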
+    int size = schemaInput.readShort();
+    for (int i = 0; i < size; i++) {
+      ColumnSchema columnSchema = new ColumnSchema();
+      columnSchema.readFields(schemaInput);
+      columnSchemas.add(columnSchema);
+    }
+    return columnSchemas;
+  }
+
+  public long getBlockletSchemaTime() {
+    return blockletSchemaTime;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
index 99e48a5..7cdf77d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
@@ -17,6 +17,7 @@
 package org.apache.carbondata.core.indexstore.blockletindex;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 
 /**
  * This class contains required information to make the Blocklet datamap distributable.
@@ -31,6 +32,8 @@ public class BlockletDataMapDistributable extends DataMapDistributable {
    */
   private String filePath;
 
+  private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
+
   public BlockletDataMapDistributable(String indexFilePath) {
     this.filePath = indexFilePath;
   }
@@ -38,4 +41,13 @@ public class BlockletDataMapDistributable extends DataMapDistributable {
   public String getFilePath() {
     return filePath;
   }
+
+  public TableBlockIndexUniqueIdentifier getTableBlockIndexUniqueIdentifier() {
+    return tableBlockIndexUniqueIdentifier;
+  }
+
+  public void setTableBlockIndexUniqueIdentifier(
+      TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier) {
+    this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index c0bc2a6..c3df721 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -21,13 +21,16 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.CacheableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
@@ -38,15 +41,17 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.util.BlockletDataMapUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -59,7 +64,7 @@ import org.apache.hadoop.fs.RemoteIterator;
  * Table map for blocklet
  */
 public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
-    implements BlockletDetailsFetcher, SegmentPropertiesFetcher {
+    implements BlockletDetailsFetcher, SegmentPropertiesFetcher, CacheableDataMap {
 
   private static final String NAME = "clustered.btree.blocklet";
 
@@ -69,9 +74,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   private AbsoluteTableIdentifier identifier;
 
   // segmentId -> list of index file
-  private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
+  private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
 
-  private Cache<TableBlockIndexUniqueIdentifier, CoarseGrainDataMap> cache;
+  private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache;
 
   public BlockletDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
     super(carbonTable, dataMapSchema);
@@ -91,24 +96,27 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   }
 
   @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
-    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
+    Set<TableBlockIndexUniqueIdentifier> identifiers =
         getTableBlockIndexUniqueIdentifiers(segment);
-    return cache.getAll(tableBlockIndexUniqueIdentifiers);
+    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+        new ArrayList<>(identifiers);
+    List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
+        cache.getAll(tableBlockIndexUniqueIdentifiers);
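+    // flatten the wrappers; a wrapper may carry more than one datamap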
+    for (BlockletDataMapIndexWrapper wrapper : blockletDataMapIndexWrappers) {
+      dataMaps.addAll(wrapper.getDataMaps());
+    }
+    return dataMaps;
   }
 
-  private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
+  private Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
       throws IOException {
-    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         segmentMap.get(segment.getSegmentNo());
     if (tableBlockIndexUniqueIdentifiers == null) {
-      tableBlockIndexUniqueIdentifiers = new ArrayList<>();
-      Map<String, String> indexFiles = segment.getCommittedIndexFile();
-      for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
-        Path indexFile = new Path(indexFileEntry.getKey());
-        tableBlockIndexUniqueIdentifiers.add(
-            new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(),
-                indexFile.getName(), indexFileEntry.getValue(), segment.getSegmentNo()));
-      }
+      tableBlockIndexUniqueIdentifiers =
+          BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment);
       segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
     }
     return tableBlockIndexUniqueIdentifiers;
@@ -130,7 +138,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
       }
       return detailedBlocklets;
     }
-    List<TableBlockIndexUniqueIdentifier> identifiers =
+    Set<TableBlockIndexUniqueIdentifier> identifiers =
         getTableBlockIndexUniqueIdentifiers(segment);
     // Retrieve each blocklets detail information from blocklet datamap
     for (Blocklet blocklet : blocklets) {
@@ -145,17 +153,19 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     if (blocklet instanceof ExtendedBlocklet) {
       return (ExtendedBlocklet) blocklet;
     }
-    List<TableBlockIndexUniqueIdentifier> identifiers =
-        getTableBlockIndexUniqueIdentifiers(segment);
+    Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment);
     return getExtendedBlocklet(identifiers, blocklet);
   }
 
-  private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
+  private ExtendedBlocklet getExtendedBlocklet(Set<TableBlockIndexUniqueIdentifier> identifiers,
       Blocklet blocklet) throws IOException {
     for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
-      if (identifier.getIndexFileName().startsWith(blocklet.getFilePath())) {
-        DataMap dataMap = cache.get(identifier);
-        return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
+      BlockletDataMapIndexWrapper wrapper = cache.get(identifier);
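+      // a wrapper can hold several datamaps; pick the one whose index file matches the blocklet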
+      List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
+      for (DataMap dataMap : dataMaps) {
+        if (((BlockletDataMap) dataMap).getIndexFileName().startsWith(blocklet.getFilePath())) {
+          return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
+        }
       }
     }
     throw new IOException("Blocklet with blockletId " + blocklet.getBlockletId() + " not found");
@@ -166,23 +176,19 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   public List<DataMapDistributable> toDistributable(Segment segment) {
     List<DataMapDistributable> distributables = new ArrayList<>();
     try {
-      CarbonFile[] carbonIndexFiles;
-      if (segment.getSegmentFileName() == null) {
-        carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(
-            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()));
-      } else {
-        SegmentFileStore fileStore =
-            new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
-        Map<String, String> indexFiles = fileStore.getIndexFiles();
-        carbonIndexFiles = new CarbonFile[indexFiles.size()];
-        int i = 0;
-        for (String indexFile : indexFiles.keySet()) {
-          carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile);
-        }
+      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+          getTableBlockIndexUniqueIdentifiers(segment);
+      CarbonFile[] carbonIndexFiles = new CarbonFile[tableBlockIndexUniqueIdentifiers.size()];
+      int identifierCounter = 0;
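+      // resolve every cached identifier back to its physical index file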
+      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+          tableBlockIndexUniqueIdentifiers) {
+        String indexFilePath = tableBlockIndexUniqueIdentifier.getIndexFilePath();
+        String fileName = tableBlockIndexUniqueIdentifier.getIndexFileName();
+        carbonIndexFiles[identifierCounter++] = FileFactory
+            .getCarbonFile(indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + fileName);
       }
       for (int i = 0; i < carbonIndexFiles.length; i++) {
         Path path = new Path(carbonIndexFiles[i].getPath());
-
         FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
         RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
         LocatedFileStatus fileStatus = iter.next();
@@ -205,13 +211,18 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
 
   @Override
   public void clear(Segment segment) {
-    List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
+    Set<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
     if (blockIndexes != null) {
       for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
-        DataMap dataMap = cache.getIfPresent(blockIndex);
-        if (dataMap != null) {
-          cache.invalidate(blockIndex);
-          dataMap.clear();
+        BlockletDataMapIndexWrapper wrapper = cache.getIfPresent(blockIndex);
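+        // clear each datamap held by the wrapper and evict its cache entry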
+        if (null != wrapper) {
+          List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
+          for (DataMap dataMap : dataMaps) {
+            if (dataMap != null) {
+              cache.invalidate(blockIndex);
+              dataMap.clear();
+            }
+          }
         }
       }
     }
@@ -246,9 +257,12 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
                 segmentNo));
       }
     }
-    List<CoarseGrainDataMap> dataMaps;
+    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
     try {
-      dataMaps = cache.getAll(identifiers);
+      List<BlockletDataMapIndexWrapper> wrappers = cache.getAll(identifiers);
+      for (BlockletDataMapIndexWrapper wrapper : wrappers) {
+        dataMaps.addAll(wrapper.getDataMaps());
+      }
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -289,4 +303,29 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     return false;
   }
 
+  @Override public void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+      BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException {
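+    // register an externally built wrapper directly in the cache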
+    cache.put(tableBlockIndexUniqueIdentifier, blockletDataMapIndexWrapper);
+  }
+
+  @Override
+  public List<DataMapDistributable> getAllUncachedDistributables(
+      List<DataMapDistributable> distributables) throws IOException {
+    List<DataMapDistributable> distributablesToBeLoaded = new ArrayList<>(distributables.size());
+    for (DataMapDistributable distributable : distributables) {
+      Segment segment = distributable.getSegment();
+      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+          getTableBlockIndexUniqueIdentifiers(segment);
+      // filter out the tableBlockIndexUniqueIdentifiers based on distributable
+      TableBlockIndexUniqueIdentifier validIdentifier = BlockletDataMapUtil
+          .filterIdentifiersBasedOnDistributable(tableBlockIndexUniqueIdentifiers,
+              (BlockletDataMapDistributable) distributable);
+      if (null == cache.getIfPresent(validIdentifier)) {
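+        // only identifiers whose datamap is not yet cached need to be loaded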
+        ((BlockletDataMapDistributable) distributable)
+            .setTableBlockIndexUniqueIdentifier(validIdentifier);
+        distributablesToBeLoaded.add(distributable);
+      }
+    }
+    return distributablesToBeLoaded;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index ebeb278..7443d15 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -32,6 +32,8 @@ public class BlockletDataMapModel extends DataMapModel {
 
   private String segmentId;
 
+  private boolean addToUnsafe = true;
+
   public BlockletDataMapModel(String filePath, byte[] fileData,
       Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId) {
     super(filePath);
@@ -40,6 +42,12 @@ public class BlockletDataMapModel extends DataMapModel {
     this.segmentId = segmentId;
   }
 
+  public BlockletDataMapModel(String filePath, byte[] fileData,
+      Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId, boolean addToUnsafe) {
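+    // addToUnsafe=false keeps the new datamap on-heap; it can be converted to unsafe later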
+    this(filePath, fileData, blockMetaInfoMap, segmentId);
+    this.addToUnsafe = addToUnsafe;
+  }
+
   public byte[] getFileData() {
     return fileData;
   }
@@ -51,4 +59,8 @@ public class BlockletDataMapModel extends DataMapModel {
   public String getSegmentId() {
     return segmentId;
   }
+
+  public boolean isAddToUnsafe() {
+    return addToUnsafe;
+  }
 }