You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/04/10 10:59:11 UTC

[2/2] carbondata git commit: [CARBONDATA-2310] Refactored code to improve Distributable interface

[CARBONDATA-2310] Refactored code to improve Distributable interface

Refactored code to improve Distributable interface

This closes #2134


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3c48df39
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3c48df39
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3c48df39

Branch: refs/heads/branch-1.3
Commit: 3c48df396f2bafc9efc8091fc7abefca089922d7
Parents: 31c7b50
Author: dhatchayani <dh...@gmail.com>
Authored: Tue Apr 3 11:19:43 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Tue Apr 10 16:16:27 2018 +0530

----------------------------------------------------------------------
 .../org/apache/carbondata/core/cache/Cache.java |  10 ++
 .../dictionary/AbstractDictionaryCache.java     |   4 +
 .../core/constants/CarbonCommonConstants.java   |   3 +
 .../core/datamap/dev/CacheableDataMap.java      |  47 ++++++
 .../carbondata/core/datamap/dev/DataMap.java    |   3 +-
 .../core/datastore/BlockIndexStore.java         |   4 +
 .../core/datastore/SegmentTaskIndexStore.java   |   4 +
 .../core/indexstore/AbstractMemoryDMStore.java  |  63 +++++++
 .../indexstore/BlockletDataMapIndexStore.java   |  92 ++++------
 .../core/indexstore/SafeMemoryDMStore.java      |  94 +++++++++++
 .../TableBlockIndexUniqueIdentifier.java        |   3 +-
 .../core/indexstore/UnsafeMemoryDMStore.java    |  23 +--
 .../blockletindex/BlockletDataMap.java          | 169 +++++++++++++------
 .../BlockletDataMapDistributable.java           |  18 +-
 .../blockletindex/BlockletDataMapFactory.java   |  95 +++++++----
 .../blockletindex/BlockletDataMapModel.java     |  13 ++
 .../core/indexstore/row/DataMapRow.java         |  13 +-
 .../core/indexstore/row/UnsafeDataMapRow.java   |   7 +-
 .../core/indexstore/schema/CarbonRowSchema.java |   4 +-
 .../core/util/BlockletDataMapUtil.java          | 140 +++++++++++++++
 .../carbondata/core/util/SessionParams.java     |   5 +
 .../TestBlockletDataMapFactory.java             | 108 ++++++++++++
 .../apache/carbondata/hadoop/CacheClient.java   |  43 +++++
 .../hadoop/api/AbstractDataMapJob.java          |  43 +++++
 .../hadoop/api/CarbonTableInputFormat.java      |  61 ++++++-
 .../carbondata/hadoop/api/DataMapJob.java       |   6 +
 .../hadoop/util/CarbonInputFormatUtil.java      |  44 +++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   9 +-
 .../carbondata/spark/rdd/SparkDataMapJob.scala  |   4 +-
 .../org/apache/spark/sql/CarbonCountStar.scala  |  13 ++
 .../execution/command/CarbonHiveCommands.scala  |   9 +
 31 files changed, 973 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
index 04fa18a..6df36fc 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
@@ -20,6 +20,8 @@ package org.apache.carbondata.core.cache;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.carbondata.core.memory.MemoryException;
+
 /**
  * A semi-persistent mapping from keys to values. Cache entries are manually added using
  * #get(Key), #getAll(List<Keys>) , and are stored in the cache until
@@ -69,6 +71,14 @@ public interface Cache<K, V> {
   void invalidate(K key);
 
   /**
+   * This method will add the value to the cache for the given key
+   *
+   * @param key
+   * @param value
+   */
+  void put(K key, V value) throws IOException, MemoryException;
+
+  /**
    * Access count of Cacheable entry will be decremented
    *
    * @param keys

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
index 598d00e..9ed9007 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
@@ -59,6 +59,10 @@ public abstract class AbstractDictionaryCache<K extends DictionaryColumnUniqueId
     initThreadPoolSize();
   }
 
+  @Override public void put(DictionaryColumnUniqueIdentifier key, Dictionary value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   /**
    * This method will initialize the thread pool size to be used for creating the
    * max number of threads for a job

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 605d711..7bdea4d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1587,6 +1587,9 @@ public final class CarbonCommonConstants {
   // default value is 2 days
   public static final String CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT = "48";
 
+  // Property to enable parallel datamap loading for a table
+  public static final String CARBON_LOAD_DATAMAPS_PARALLEL = "carbon.load.datamaps.parallel.";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
new file mode 100644
index 0000000..885b21f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.memory.MemoryException;
+
+/**
+ * Interface for data map caching
+ */
+public interface CacheableDataMap {
+
+  /**
+   * Add the dataMap to cache
+   *
+   * @param dataMap
+   */
+  void cache(DataMap dataMap) throws IOException, MemoryException;
+
+  /**
+   * Get all the uncached distributables from the list.
+   *
+   * @param distributables
+   * @return
+   */
+  List<DataMapDistributable> getAllUncachedDistributables(List<DataMapDistributable> distributables)
+      throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index f3642d6..f096dd7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -17,6 +17,7 @@
 package org.apache.carbondata.core.datamap.dev;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 
 import org.apache.carbondata.core.indexstore.Blocklet;
@@ -27,7 +28,7 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 /**
  * Datamap is an entity which can store and retrieve index data.
  */
-public interface DataMap {
+public interface DataMap extends Serializable {
 
   /**
    * It is called to load the data map to memory or to initialize it.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
index f2c38fa..609bf1c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
@@ -229,6 +229,10 @@ public class BlockIndexStore<K, V> extends AbstractBlockIndexStoreCache<K, V> {
         .remove(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
   }
 
+  @Override public void put(TableBlockUniqueIdentifier key, AbstractIndex value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   @Override public void clearAccessCount(List<TableBlockUniqueIdentifier> keys) {
     for (TableBlockUniqueIdentifier tableBlockUniqueIdentifier : keys) {
       SegmentTaskIndexWrapper cacheable = (SegmentTaskIndexWrapper) lruCache

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
index 8ed5c18..744cc93 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -140,6 +140,10 @@ public class SegmentTaskIndexStore
     lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
   }
 
+  @Override public void put(TableSegmentUniqueIdentifier key, SegmentTaskIndexWrapper value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   /**
    * returns block timestamp value from the given task
    * @param taskKey

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
new file mode 100644
index 0000000..41b929a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+
+/**
+ * Store the data map row @{@link DataMapRow}
+ */
+public abstract class AbstractMemoryDMStore implements Serializable {
+
+  protected boolean isMemoryFreed;
+
+  protected CarbonRowSchema[] schema;
+
+  protected final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
+
+  public AbstractMemoryDMStore(CarbonRowSchema[] schema) {
+    this.schema = schema;
+  }
+
+  public abstract void addIndexRow(DataMapRow indexRow) throws MemoryException;
+
+  public abstract DataMapRow getDataMapRow(int index);
+
+  public abstract void freeMemory();
+
+  public abstract int getMemoryUsed();
+
+  public CarbonRowSchema[] getSchema() {
+    return schema;
+  }
+
+  public abstract int getRowCount();
+
+  public void finishWriting() throws MemoryException {
+    // do nothing in default implementation
+  }
+
+  public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException {
+    throw new UnsupportedOperationException("Operation not allowed");
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index ac14105..ce0fe8b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.core.indexstore;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -31,19 +30,11 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CarbonLRUCache;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
-
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.carbondata.core.util.BlockletDataMapUtil;
 
 /**
  * Class to handle loading, unloading,clearing,storing of the table
@@ -86,7 +77,7 @@ public class BlockletDataMapIndexStore
         SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
         Set<String> filesRead = new HashSet<>();
         Map<String, BlockMetaInfo> blockMetaInfoMap =
-            getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
+            BlockletDataMapUtil.getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
         dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
       } catch (MemoryException e) {
         LOGGER.error("memory exception when loading datamap: " + e.getMessage());
@@ -96,54 +87,6 @@ public class BlockletDataMapIndexStore
     return dataMap;
   }
 
-  private Map<String, BlockMetaInfo> getBlockMetaInfoMap(TableBlockIndexUniqueIdentifier identifier,
-      SegmentIndexFileStore indexFileStore, Set<String> filesRead) throws IOException {
-    if (identifier.getMergeIndexFileName() != null) {
-      CarbonFile indexMergeFile = FileFactory.getCarbonFile(
-          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-              .getMergeIndexFileName());
-      if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
-        indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
-        filesRead.add(indexMergeFile.getPath());
-      }
-    }
-    if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
-      indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
-          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-              .getIndexFileName()) });
-    }
-    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
-    Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
-    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
-        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
-    for (DataFileFooter footer : indexInfo) {
-      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
-      if (FileFactory.isFileExist(blockPath)) {
-        blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath));
-      } else {
-        LOGGER.warn("Skipping invalid block " + footer.getBlockInfo().getBlockUniqueName()
-            + " The block does not exist. The block might be got deleted due to clean up post"
-            + " update/delete operation over table.");
-      }
-    }
-    return blockMetaInfoMap;
-  }
-
-  private BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException {
-    CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
-    if (carbonFile instanceof AbstractDFSCarbonFile) {
-      RemoteIterator<LocatedFileStatus> iter =
-          ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile));
-      LocatedFileStatus fileStatus = iter.next();
-      String[] location = fileStatus.getBlockLocations()[0].getHosts();
-      long len = fileStatus.getLen();
-      return new BlockMetaInfo(location, len);
-    } else {
-      return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize());
-    }
-  }
-
   @Override
   public List<BlockletDataMap> getAll(
       List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
@@ -165,7 +108,7 @@ public class BlockletDataMapIndexStore
         Set<String> filesRead = new HashSet<>();
         for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
           Map<String, BlockMetaInfo> blockMetaInfoMap =
-              getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
+              BlockletDataMapUtil.getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
           blockletDataMaps.add(
               loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap));
         }
@@ -206,6 +149,35 @@ public class BlockletDataMapIndexStore
     lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
   }
 
+  @Override public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+      BlockletDataMap blockletDataMap) throws IOException, MemoryException {
+    String uniqueTableSegmentIdentifier =
+        tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier();
+    Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
+    if (lock == null) {
+      lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
+    }
+    // As dataMap will use unsafe memory, it is not recommended to overwrite an existing entry
+    // as in that case clearing unsafe memory need to be taken card. If at all datamap entry
+    // in the cache need to be overwritten then use the invalidate interface
+    // and then use the put interface
+    if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+      synchronized (lock) {
+        if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+          try {
+            blockletDataMap.convertToUnsafeDMStore();
+            lruCache.put(tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(),
+                blockletDataMap, blockletDataMap.getMemorySize());
+          } catch (Throwable e) {
+            // clear all the memory acquired by data map in case of any failure
+            blockletDataMap.clear();
+            throw new IOException("Problem in adding datamap to cache.", e);
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Below method will be used to load the segment of segments
    * One segment may have multiple task , so  table segment will be loaded

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
new file mode 100644
index 0000000..d51f28d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.memory.MemoryException;
+
+/**
+ * Store the data map row @{@link DataMapRow} data to memory.
+ */
+public class SafeMemoryDMStore extends AbstractMemoryDMStore {
+
+  /**
+   * holds all blocklets metadata in memory
+   */
+  private List<DataMapRow> dataMapRows =
+      new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  private int runningLength;
+
+  public SafeMemoryDMStore(CarbonRowSchema[] schema) {
+    super(schema);
+  }
+
+  /**
+   * Add the index row to dataMapRows, basically to in memory.
+   *
+   * @param indexRow
+   * @return
+   */
+  @Override
+  public void addIndexRow(DataMapRow indexRow) throws MemoryException {
+    dataMapRows.add(indexRow);
+    runningLength += indexRow.getTotalSizeInBytes();
+  }
+
+  @Override
+  public DataMapRow getDataMapRow(int index) {
+    assert (index < dataMapRows.size());
+    return dataMapRows.get(index);
+  }
+
+  @Override
+  public void freeMemory() {
+    if (!isMemoryFreed) {
+      if (null != dataMapRows) {
+        dataMapRows.clear();
+        dataMapRows = null;
+      }
+      isMemoryFreed = true;
+    }
+  }
+
+  @Override
+  public int getMemoryUsed() {
+    return runningLength;
+  }
+
+  @Override
+  public int getRowCount() {
+    return dataMapRows.size();
+  }
+
+  @Override
+  public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException {
+    UnsafeMemoryDMStore unsafeMemoryDMStore = new UnsafeMemoryDMStore(schema);
+    for (DataMapRow dataMapRow : dataMapRows) {
+      unsafeMemoryDMStore.addIndexRow(dataMapRow);
+    }
+    unsafeMemoryDMStore.finishWriting();
+    return unsafeMemoryDMStore;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
index c907fa8..8118fe4 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.indexstore;
 
+import java.io.Serializable;
 import java.util.Objects;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -24,7 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 /**
  * Class holds the indexFile information to uniquely identitify the carbon index
  */
-public class TableBlockIndexUniqueIdentifier {
+public class TableBlockIndexUniqueIdentifier implements Serializable {
 
   private String indexFilePath;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 31ecac2..6fe7fd2 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -24,7 +24,6 @@ import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
 import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
 import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
@@ -32,9 +31,11 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
 /**
  * Store the data map row @{@link DataMapRow} data to unsafe.
  */
-public class UnsafeMemoryDMStore {
+public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
 
-  private MemoryBlock memoryBlock;
+  private static final long serialVersionUID = -5344592407101055335L;
+
+  private transient MemoryBlock memoryBlock;
 
   private static int capacity = 8 * 1024;
 
@@ -42,18 +43,12 @@ public class UnsafeMemoryDMStore {
 
   private int runningLength;
 
-  private boolean isMemoryFreed;
-
-  private CarbonRowSchema[] schema;
-
   private int[] pointers;
 
   private int rowCount;
 
-  private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
-
   public UnsafeMemoryDMStore(CarbonRowSchema[] schema) throws MemoryException {
-    this.schema = schema;
+    super(schema);
     this.allocatedSize = capacity;
     this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize);
     this.pointers = new int[1000];
@@ -92,7 +87,7 @@ public class UnsafeMemoryDMStore {
    * @param indexRow
    * @return
    */
-  public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException {
+  public void addIndexRow(DataMapRow indexRow) throws MemoryException {
     // First calculate the required memory to keep the row in unsafe
     int rowSize = indexRow.getTotalSizeInBytes();
     // Check whether allocated memory is sufficient or not.
@@ -172,7 +167,7 @@ public class UnsafeMemoryDMStore {
     }
   }
 
-  public UnsafeDataMapRow getUnsafeRow(int index) {
+  public DataMapRow getDataMapRow(int index) {
     assert (index < rowCount);
     return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
   }
@@ -205,10 +200,6 @@ public class UnsafeMemoryDMStore {
     return runningLength;
   }
 
-  public CarbonRowSchema[] getSchema() {
-    return schema;
-  }
-
   public int getRowCount() {
     return rowCount;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 9ec7a46..66fa0aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -39,11 +39,14 @@ import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.SafeMemoryDMStore;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
@@ -82,6 +85,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   public static final String NAME = "clustered.btree.blocklet";
 
+  private static final long serialVersionUID = 4121938766748899140L;
+
   private static int KEY_INDEX = 0;
 
   private static int MIN_VALUES_INDEX = 1;
@@ -120,14 +125,17 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private static int SEGMENTID = 5;
 
-  private UnsafeMemoryDMStore unsafeMemoryDMStore;
+  private AbstractMemoryDMStore memoryDMStore;
 
-  private UnsafeMemoryDMStore unsafeMemorySummaryDMStore;
+  private AbstractMemoryDMStore summaryDMStore;
 
-  private SegmentProperties segmentProperties;
+  // As it is a heavy object it is not recommended to serialize this object
+  private transient SegmentProperties segmentProperties;
 
   private int[] columnCardinality;
 
+  private TableBlockIndexUniqueIdentifier tableBlockUniqueIdentifier;
+
   @Override
   public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
     long startTime = System.currentTimeMillis();
@@ -153,9 +161,10 @@ public class BlockletDataMap implements DataMap, Cacheable {
         schemaBinary = convertSchemaToBinary(columnInTable);
         columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
         segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
-        createSchema(segmentProperties);
+        createSchema(segmentProperties,
+            ((BlockletDataMapModel) dataMapModel).isAddToUnsafe());
         createSummarySchema(segmentProperties, schemaBinary, filePath, fileName,
-            segmentId);
+            segmentId, ((BlockletDataMapModel) dataMapModel).isAddToUnsafe());
       }
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
       BlockMetaInfo blockMetaInfo =
@@ -186,20 +195,20 @@ public class BlockletDataMap implements DataMap, Cacheable {
         }
       }
     }
-    if (unsafeMemoryDMStore != null) {
-      unsafeMemoryDMStore.finishWriting();
+    if (memoryDMStore != null) {
+      memoryDMStore.finishWriting();
     }
-    if (null != unsafeMemorySummaryDMStore) {
+    if (null != summaryDMStore) {
       addTaskSummaryRowToUnsafeMemoryStore(
           summaryRow,
           schemaBinary,
           filePath,
           fileName,
           segmentId);
-      unsafeMemorySummaryDMStore.finishWriting();
+      summaryDMStore.finishWriting();
     }
     LOGGER.info(
-        "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + "is " + (
+        "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is " + (
             System.currentTimeMillis() - startTime));
   }
 
@@ -208,10 +217,10 @@ public class BlockletDataMap implements DataMap, Cacheable {
       BlockMetaInfo blockMetaInfo, int relativeBlockletId) {
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     List<BlockletInfo> blockletList = fileFooter.getBlockletList();
-    CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+    CarbonRowSchema[] schema = memoryDMStore.getSchema();
     // Add one row to maintain task level min max for segment pruning
     if (!blockletList.isEmpty() && summaryRow == null) {
-      summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+      summaryRow = new DataMapRowImpl(summaryDMStore.getSchema());
     }
     for (int index = 0; index < blockletList.size(); index++) {
       DataMapRow row = new DataMapRowImpl(schema);
@@ -227,7 +236,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
       row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
       // compute and set task level min values
       addTaskMinMaxValues(summaryRow, minMaxLen,
-          unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
+          summaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
           TASK_MIN_VALUES_INDEX, true);
       ordinal++;
       taskMinMaxOrdinal++;
@@ -235,7 +244,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
       row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
       // compute and set task level max values
       addTaskMinMaxValues(summaryRow, minMaxLen,
-          unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
+          summaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
           TASK_MAX_VALUES_INDEX, false);
       ordinal++;
 
@@ -270,7 +279,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
         row.setShort((short) relativeBlockletId++, ordinal++);
         // Store block size
         row.setLong(blockMetaInfo.getSize(), ordinal);
-        unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+        memoryDMStore.addIndexRow(row);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -296,10 +305,10 @@ public class BlockletDataMap implements DataMap, Cacheable {
       BlockMetaInfo blockMetaInfo) {
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
-    CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+    CarbonRowSchema[] schema = memoryDMStore.getSchema();
     // Add one row to maintain task level min max for segment pruning
     if (summaryRow == null) {
-      summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+      summaryRow = new DataMapRowImpl(summaryDMStore.getSchema());
     }
     DataMapRow row = new DataMapRowImpl(schema);
     int ordinal = 0;
@@ -318,14 +327,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
     row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMinValues), ordinal);
     // compute and set task level min values
     addTaskMinMaxValues(summaryRow, minMaxLen,
-        unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues,
+        summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues,
         TASK_MIN_VALUES_INDEX, true);
     ordinal++;
     taskMinMaxOrdinal++;
     row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMaxValues), ordinal);
     // compute and set task level max values
     addTaskMinMaxValues(summaryRow, minMaxLen,
-        unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues,
+        summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues,
         TASK_MAX_VALUES_INDEX, false);
     ordinal++;
 
@@ -358,7 +367,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
       // store block size
       row.setLong(blockMetaInfo.getSize(), ordinal);
-      unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+      memoryDMStore.addIndexRow(row);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -379,7 +388,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
       summaryRow.setByteArray(fileName, INDEX_FILE_NAME);
       summaryRow.setByteArray(segmentId, SEGMENTID);
       try {
-        unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow);
+        summaryDMStore.addIndexRow(summaryRow);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -517,7 +526,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
     taskMinMaxRow.setRow(row, ordinal);
   }
 
-  private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
+  private void createSchema(SegmentProperties segmentProperties, boolean addToUnsafe)
+      throws MemoryException {
     List<CarbonRowSchema> indexSchemas = new ArrayList<>();
 
     // Index key
@@ -554,8 +564,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
     // for storing block length.
     indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
 
-    unsafeMemoryDMStore =
-        new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
+    CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
+    memoryDMStore = getMemoryDMStore(schema, addToUnsafe);
   }
 
   /**
@@ -566,7 +576,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
    * @throws MemoryException
    */
   private void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary,
-      byte[] filePath, byte[] fileName, byte[] segmentId)
+      byte[] filePath, byte[] fileName, byte[] segmentId, boolean addToUnsafe)
       throws MemoryException {
     List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
     getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
@@ -582,8 +592,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
     // for storing segmentid
     taskMinMaxSchemas.add(
         new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length));
-    unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore(
-        taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]));
+    CarbonRowSchema[] schema =
+        taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]);
+    summaryDMStore = getMemoryDMStore(schema, addToUnsafe);
   }
 
   private void getMinMaxSchema(SegmentProperties segmentProperties,
@@ -612,8 +623,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
   public boolean isScanRequired(FilterResolverIntf filterExp) {
     FilterExecuter filterExecuter =
         FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
-    for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) {
-      DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i);
+    for (int i = 0; i < summaryDMStore.getRowCount(); i++) {
+      DataMapRow unsafeRow = summaryDMStore.getDataMapRow(i);
       boolean isScanRequired = FilterExpressionProcessor
           .isScanRequired(filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
               getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
@@ -626,25 +637,25 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   @Override
   public List<Blocklet> prune(FilterResolverIntf filterExp) {
-    if (unsafeMemoryDMStore.getRowCount() == 0) {
+    if (memoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
     }
     List<Blocklet> blocklets = new ArrayList<>();
     if (filterExp == null) {
-      int rowCount = unsafeMemoryDMStore.getRowCount();
+      int rowCount = memoryDMStore.getRowCount();
       for (int i = 0; i < rowCount; i++) {
-        DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(i).convertToSafeRow();
+        DataMapRow safeRow = memoryDMStore.getDataMapRow(i).convertToSafeRow();
         blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX)));
       }
     } else {
       // Remove B-tree jump logic as start and end key prepared is not
       // correct for old store scenarios
       int startIndex = 0;
-      int endIndex = unsafeMemoryDMStore.getRowCount();
+      int endIndex = memoryDMStore.getRowCount();
       FilterExecuter filterExecuter =
           FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
       while (startIndex < endIndex) {
-        DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow();
+        DataMapRow safeRow = memoryDMStore.getDataMapRow(startIndex).convertToSafeRow();
         int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX);
         String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX),
             CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
@@ -663,7 +674,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   @Override
   public List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions) {
-    if (unsafeMemoryDMStore.getRowCount() == 0) {
+    if (memoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
     }
     // if it has partitioned datamap but there is no partitioned information stored, it means
@@ -736,7 +747,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   public ExtendedBlocklet getDetailedBlocklet(String blockletId) {
     int index = Integer.parseInt(blockletId);
-    DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(index).convertToSafeRow();
+    DataMapRow safeRow = memoryDMStore.getDataMapRow(index).convertToSafeRow();
     return createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX));
   }
 
@@ -787,7 +798,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
   private String[] getFileDetails() {
     try {
       String[] fileDetails = new String[3];
-      DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+      DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
       fileDetails[0] =
           new String(unsafeRow.getByteArray(INDEX_PATH), CarbonCommonConstants.DEFAULT_CHARSET);
       fileDetails[1] = new String(unsafeRow.getByteArray(INDEX_FILE_NAME),
@@ -811,14 +822,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
   private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
     int childNodeIndex;
     int low = 0;
-    int high = unsafeMemoryDMStore.getRowCount() - 1;
+    int high = memoryDMStore.getRowCount() - 1;
     int mid = 0;
     int compareRes = -1;
     //
     while (low <= high) {
       mid = (low + high) >>> 1;
       // compare the entries
-      compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+      compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid));
       if (compareRes < 0) {
         high = mid - 1;
       } else if (compareRes > 0) {
@@ -827,7 +838,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
         // if key is matched then get the first entry
         int currentPos = mid;
         while (currentPos - 1 >= 0
-            && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) {
+            && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos - 1)) == 0) {
           currentPos--;
         }
         mid = currentPos;
@@ -859,14 +870,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
   private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
     int childNodeIndex;
     int low = 0;
-    int high = unsafeMemoryDMStore.getRowCount() - 1;
+    int high = memoryDMStore.getRowCount() - 1;
     int mid = 0;
     int compareRes = -1;
     //
     while (low <= high) {
       mid = (low + high) >>> 1;
       // compare the entries
-      compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+      compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid));
       if (compareRes < 0) {
         high = mid - 1;
       } else if (compareRes > 0) {
@@ -874,8 +885,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
       } else {
         int currentPos = mid;
         // if key is matched then get the first entry
-        while (currentPos + 1 < unsafeMemoryDMStore.getRowCount()
-            && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) {
+        while (currentPos + 1 < memoryDMStore.getRowCount()
+            && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos + 1)) == 0) {
           currentPos++;
         }
         mid = currentPos;
@@ -903,13 +914,13 @@ public class BlockletDataMap implements DataMap, Cacheable {
     buffer.putInt(key.getNoDictionaryKeys().length);
     buffer.put(key.getDictionaryKeys());
     buffer.put(key.getNoDictionaryKeys());
-    DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema());
+    DataMapRowImpl dataMapRow = new DataMapRowImpl(memoryDMStore.getSchema());
     dataMapRow.setByteArray(buffer.array(), 0);
     return dataMapRow;
   }
 
   private byte[] getColumnSchemaBinary() {
-    DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+    DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
     return unsafeRow.getByteArray(SCHEMA);
   }
 
@@ -933,15 +944,15 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   @Override
   public void clear() {
-    if (unsafeMemoryDMStore != null) {
-      unsafeMemoryDMStore.freeMemory();
-      unsafeMemoryDMStore = null;
+    if (memoryDMStore != null) {
+      memoryDMStore.freeMemory();
+      memoryDMStore = null;
       segmentProperties = null;
     }
     // clear task min/max unsafe memory
-    if (null != unsafeMemorySummaryDMStore) {
-      unsafeMemorySummaryDMStore.freeMemory();
-      unsafeMemorySummaryDMStore = null;
+    if (null != summaryDMStore) {
+      summaryDMStore.freeMemory();
+      summaryDMStore = null;
     }
   }
 
@@ -958,13 +969,59 @@ public class BlockletDataMap implements DataMap, Cacheable {
   @Override
   public long getMemorySize() {
     long memoryUsed = 0L;
-    if (unsafeMemoryDMStore != null) {
-      memoryUsed += unsafeMemoryDMStore.getMemoryUsed();
+    if (memoryDMStore != null) {
+      memoryUsed += memoryDMStore.getMemoryUsed();
     }
-    if (null != unsafeMemorySummaryDMStore) {
-      memoryUsed += unsafeMemorySummaryDMStore.getMemoryUsed();
+    if (null != summaryDMStore) {
+      memoryUsed += summaryDMStore.getMemoryUsed();
     }
     return memoryUsed;
   }
 
+  public TableBlockIndexUniqueIdentifier getTableBlockUniqueIdentifier() {
+    return tableBlockUniqueIdentifier;
+  }
+
+  public void setTableBlockUniqueIdentifier(
+      TableBlockIndexUniqueIdentifier tableBlockUniqueIdentifier) {
+    this.tableBlockUniqueIdentifier = tableBlockUniqueIdentifier;
+  }
+
+  public void setSegmentProperties(SegmentProperties segmentProperties) {
+    this.segmentProperties = segmentProperties;
+  }
+
+  public int[] getColumnCardinality() {
+    return columnCardinality;
+  }
+
+  private AbstractMemoryDMStore getMemoryDMStore(CarbonRowSchema[] schema, boolean addToUnsafe)
+      throws MemoryException {
+    AbstractMemoryDMStore memoryDMStore;
+    if (addToUnsafe) {
+      memoryDMStore = new UnsafeMemoryDMStore(schema);
+    } else {
+      memoryDMStore = new SafeMemoryDMStore(schema);
+    }
+    return memoryDMStore;
+  }
+
+  /**
+   * This method will ocnvert safe to unsafe memory DM store
+   *
+   * @throws MemoryException
+   */
+  public void convertToUnsafeDMStore() throws MemoryException {
+    if (memoryDMStore instanceof SafeMemoryDMStore) {
+      UnsafeMemoryDMStore unsafeMemoryDMStore = memoryDMStore.convertToUnsafeDMStore();
+      memoryDMStore.freeMemory();
+      memoryDMStore = unsafeMemoryDMStore;
+    }
+    if (summaryDMStore instanceof SafeMemoryDMStore) {
+      UnsafeMemoryDMStore unsafeSummaryMemoryDMStore = summaryDMStore.convertToUnsafeDMStore();
+      summaryDMStore.freeMemory();
+      summaryDMStore = unsafeSummaryMemoryDMStore;
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
index 99e48a5..02ac8d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
@@ -16,7 +16,10 @@
  */
 package org.apache.carbondata.core.indexstore.blockletindex;
 
+import java.util.Set;
+
 import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 
 /**
  * This class contains required information to make the Blocklet datamap distributable.
@@ -31,11 +34,24 @@ public class BlockletDataMapDistributable extends DataMapDistributable {
    */
   private String filePath;
 
-  public BlockletDataMapDistributable(String indexFilePath) {
+  private Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers;
+
+  public BlockletDataMapDistributable(String indexFilePath,
+      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers) {
     this.filePath = indexFilePath;
+    this.tableBlockIndexUniqueIdentifiers = tableBlockIndexUniqueIdentifiers;
   }
 
   public String getFilePath() {
     return filePath;
   }
+
+  public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers() {
+    return tableBlockIndexUniqueIdentifiers;
+  }
+
+  public void setTableBlockIndexUniqueIdentifiers(
+      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers) {
+    this.tableBlockIndexUniqueIdentifiers = tableBlockIndexUniqueIdentifiers;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 5eb077f..c08c87e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -21,13 +21,16 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.CacheableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
@@ -37,8 +40,10 @@ import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.util.BlockletDataMapUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -50,12 +55,13 @@ import org.apache.hadoop.fs.RemoteIterator;
 /**
  * Table map for blocklet
  */
-public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher {
+public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher,
+    CacheableDataMap {
 
   private AbsoluteTableIdentifier identifier;
 
   // segmentId -> list of index file
-  private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
+  private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
 
   private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
 
@@ -73,33 +79,47 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
 
   @Override
   public List<DataMap> getDataMaps(Segment segment) throws IOException {
+    Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment);
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-        getTableBlockIndexUniqueIdentifiers(segment);
+        new ArrayList<>(identifiers.size());
+    tableBlockIndexUniqueIdentifiers.addAll(identifiers);
     return cache.getAll(tableBlockIndexUniqueIdentifiers);
   }
 
-  private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
+  @Override public void cache(DataMap dataMap) throws IOException, MemoryException {
+    BlockletDataMap blockletDataMap = (BlockletDataMap) dataMap;
+    cache.put(blockletDataMap.getTableBlockUniqueIdentifier(), blockletDataMap);
+  }
+
+  @Override
+  public List<DataMapDistributable> getAllUncachedDistributables(
+      List<DataMapDistributable> distributables) throws IOException {
+    List<DataMapDistributable> distributablesToBeLoaded = new ArrayList<>(distributables.size());
+    for (DataMapDistributable distributable : distributables) {
+      Segment segment = distributable.getSegment();
+      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+          getTableBlockIndexUniqueIdentifiers(segment);
+      // filter out the tableBlockIndexUniqueIdentifiers based on distributable
+      Set<TableBlockIndexUniqueIdentifier> validIdentifiers = BlockletDataMapUtil
+          .filterIdentifiersBasedOnDistributable(tableBlockIndexUniqueIdentifiers,
+              (BlockletDataMapDistributable) distributable);
+      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : validIdentifiers) {
+        if (null == cache.getIfPresent(tableBlockIndexUniqueIdentifier)) {
+          distributablesToBeLoaded.add(distributable);
+          break;
+        }
+      }
+    }
+    return distributablesToBeLoaded;
+  }
+
+  private Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
       Segment segment) throws IOException {
-    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         segmentMap.get(segment.getSegmentNo());
     if (tableBlockIndexUniqueIdentifiers == null) {
-      tableBlockIndexUniqueIdentifiers = new ArrayList<>();
-      Map<String, String> indexFiles;
-      if (segment.getSegmentFileName() == null) {
-        String path =
-            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
-        indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
-      } else {
-        SegmentFileStore fileStore =
-            new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
-        indexFiles = fileStore.getIndexFiles();
-      }
-      for (Map.Entry<String, String> indexFileEntry: indexFiles.entrySet()) {
-        Path indexFile = new Path(indexFileEntry.getKey());
-        tableBlockIndexUniqueIdentifiers.add(
-            new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(),
-                indexFile.getName(), indexFileEntry.getValue(), segment.getSegmentNo()));
-      }
+      tableBlockIndexUniqueIdentifiers =
+          BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment, identifier.getTablePath());
       segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
     }
     return tableBlockIndexUniqueIdentifiers;
@@ -121,7 +141,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
       }
       return detailedBlocklets;
     }
-    List<TableBlockIndexUniqueIdentifier> identifiers =
+    Set<TableBlockIndexUniqueIdentifier> identifiers =
         getTableBlockIndexUniqueIdentifiers(segment);
     // Retrieve each blocklets detail information from blocklet datamap
     for (Blocklet blocklet : blocklets) {
@@ -136,12 +156,12 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
     if (blocklet instanceof ExtendedBlocklet) {
       return (ExtendedBlocklet) blocklet;
     }
-    List<TableBlockIndexUniqueIdentifier> identifiers =
+    Set<TableBlockIndexUniqueIdentifier> identifiers =
         getTableBlockIndexUniqueIdentifiers(segment);
     return getExtendedBlocklet(identifiers, blocklet);
   }
 
-  private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
+  private ExtendedBlocklet getExtendedBlocklet(Set<TableBlockIndexUniqueIdentifier> identifiers,
       Blocklet blocklet) throws IOException {
     String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getPath());
     for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
@@ -156,6 +176,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   @Override
   public List<DataMapDistributable> toDistributable(Segment segment) {
     List<DataMapDistributable> distributables = new ArrayList<>();
+    Map<String, String> indexFiles = null;
     try {
       CarbonFile[] carbonIndexFiles;
       if (segment.getSegmentFileName() == null) {
@@ -164,11 +185,20 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
       } else {
         SegmentFileStore fileStore =
             new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
-        Map<String, String> indexFiles = fileStore.getIndexFiles();
+        indexFiles = fileStore.getIndexFiles();
         carbonIndexFiles = new CarbonFile[indexFiles.size()];
         int i = 0;
-        for (String indexFile : indexFiles.keySet()) {
-          carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile);
+        for (Map.Entry<String, String> entry : indexFiles.entrySet()) {
+          String indexFile = entry.getKey();
+          String mergeFileName = entry.getValue();
+          if (null != mergeFileName) {
+            String mergeIndexPath = indexFile
+                .substring(0, indexFile.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1)
+                + mergeFileName;
+            carbonIndexFiles[i++] = FileFactory.getCarbonFile(mergeIndexPath);
+          } else {
+            carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile);
+          }
         }
       }
       for (int i = 0; i < carbonIndexFiles.length; i++) {
@@ -179,7 +209,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
         LocatedFileStatus fileStatus = iter.next();
         String[] location = fileStatus.getBlockLocations()[0].getHosts();
         BlockletDataMapDistributable distributable =
-            new BlockletDataMapDistributable(path.toString());
+            new BlockletDataMapDistributable(path.toString(), null);
         distributable.setLocations(location);
         distributables.add(distributable);
 
@@ -197,7 +227,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
 
   @Override
   public void clear(Segment segment) {
-    List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
+    Set<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
     if (blockIndexes != null) {
       for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
         DataMap dataMap = cache.getIfPresent(blockIndex);
@@ -251,4 +281,9 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
     // TODO: pass SORT_COLUMNS into this class
     return null;
   }
+
+  public Map<String, Set<TableBlockIndexUniqueIdentifier>> getSegmentMap() {
+    return segmentMap;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index ebeb278..17b2463 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -32,6 +32,8 @@ public class BlockletDataMapModel extends DataMapModel {
 
   private String segmentId;
 
+  private boolean addToUnsafe = true;
+
   public BlockletDataMapModel(String filePath, byte[] fileData,
       Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId) {
     super(filePath);
@@ -40,6 +42,13 @@ public class BlockletDataMapModel extends DataMapModel {
     this.segmentId = segmentId;
   }
 
+  public BlockletDataMapModel(String filePath, byte[] fileData,
+      Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId, boolean addToUnsafe) {
+    this(filePath, fileData, blockMetaInfoMap, segmentId);
+    this.addToUnsafe = addToUnsafe;
+  }
+
+
   public byte[] getFileData() {
     return fileData;
   }
@@ -51,4 +60,8 @@ public class BlockletDataMapModel extends DataMapModel {
   public String getSegmentId() {
     return segmentId;
   }
+
+  public boolean isAddToUnsafe() {
+    return addToUnsafe;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
index b764bdf..496a1d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -16,13 +16,15 @@
  */
 package org.apache.carbondata.core.indexstore.row;
 
+import java.io.Serializable;
+
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 
 /**
  * It is just a normal row to store data. Implementation classes could be safe and unsafe.
  * TODO move this class a global row and use across loading after DataType is changed class
  */
-public abstract class DataMapRow {
+public abstract class DataMapRow implements Serializable {
 
   protected CarbonRowSchema[] schemas;
 
@@ -88,4 +90,13 @@ public abstract class DataMapRow {
   public int getColumnCount() {
     return schemas.length;
   }
+
+  /**
+   * default implementation
+   *
+   * @return
+   */
+  public DataMapRow convertToSafeRow() {
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index 1b95984..323fb24 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -30,7 +30,12 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
  */
 public class UnsafeDataMapRow extends DataMapRow {
 
-  private MemoryBlock block;
+  private static final long serialVersionUID = -8649767299407770884L;
+
+  // As it is an unsafe memory block it is not recommended to serialize.
+  // If at all required to be serialized then override writeObject methods
+  // to which should take care of clearing the unsafe memory post serialization
+  private transient MemoryBlock block;
 
   private int pointer;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
index 813be4a..adb8715 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
@@ -16,12 +16,14 @@
  */
 package org.apache.carbondata.core.indexstore.schema;
 
+import java.io.Serializable;
+
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
  * It just have 2 types right now, either fixed or variable.
  */
-public abstract class CarbonRowSchema {
+public abstract class CarbonRowSchema implements Serializable {
 
   protected DataType dataType;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
new file mode 100644
index 0000000..766650d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockMetaInfo;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+
+public class BlockletDataMapUtil {
+
+  public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
+      TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
+      Set<String> filesRead) throws IOException {
+    if (identifier.getMergeIndexFileName() != null) {
+      CarbonFile indexMergeFile = FileFactory.getCarbonFile(
+          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getMergeIndexFileName());
+      if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
+        indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
+        filesRead.add(indexMergeFile.getPath());
+      }
+    }
+    if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
+      indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
+          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getIndexFileName()) });
+    }
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
+    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
+        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
+    for (DataFileFooter footer : indexInfo) {
+      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
+      blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath));
+    }
+    return blockMetaInfoMap;
+  }
+
+  private static BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException {
+    CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
+    if (carbonFile instanceof AbstractDFSCarbonFile) {
+      RemoteIterator<LocatedFileStatus> iter =
+          ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile));
+      LocatedFileStatus fileStatus = iter.next();
+      String[] location = fileStatus.getBlockLocations()[0].getHosts();
+      long len = fileStatus.getLen();
+      return new BlockMetaInfo(location, len);
+    } else {
+      return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize());
+    }
+  }
+
+  public static Set<TableBlockIndexUniqueIdentifier> getTableBlockUniqueIdentifiers(Segment segment,
+      String tablePath) throws IOException {
+    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>();
+    Map<String, String> indexFiles;
+    if (segment.getSegmentFileName() == null) {
+      String path = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo());
+      indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
+    } else {
+      SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
+      indexFiles = fileStore.getIndexFiles();
+    }
+    for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+      Path indexFile = new Path(indexFileEntry.getKey());
+      tableBlockIndexUniqueIdentifiers.add(
+          new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(), indexFile.getName(),
+              indexFileEntry.getValue(), segment.getSegmentNo()));
+    }
+    return tableBlockIndexUniqueIdentifiers;
+  }
+
+  /**
+   * This method will filter out the TableBlockIndexUniqueIdentifiers belongs to that distributable
+   *
+   * @param tableBlockIndexUniqueIdentifiers
+   * @param distributable
+   * @return
+   */
+  public static Set<TableBlockIndexUniqueIdentifier> filterIdentifiersBasedOnDistributable(
+      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers,
+      BlockletDataMapDistributable distributable) {
+    Set<TableBlockIndexUniqueIdentifier> validIdentifiers =
+        new HashSet<>(tableBlockIndexUniqueIdentifiers.size());
+    if (distributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+          tableBlockIndexUniqueIdentifiers) {
+        if (null != tableBlockIndexUniqueIdentifier.getMergeIndexFileName()) {
+          validIdentifiers.add(tableBlockIndexUniqueIdentifier);
+        }
+      }
+    } else {
+      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+          tableBlockIndexUniqueIdentifiers) {
+        if (null == tableBlockIndexUniqueIdentifier.getMergeIndexFileName()) {
+          validIdentifiers.add(tableBlockIndexUniqueIdentifier);
+        }
+      }
+    }
+    return validIdentifiers;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index c232b1e..08b5929 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -203,6 +203,11 @@ public class SessionParams implements Serializable {
           isValid = true;
         } else if (key.equalsIgnoreCase(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP)) {
           isValid = true;
+        } else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) {
+          isValid = CarbonUtil.validateBoolean(value);
+          if (!isValid) {
+            throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
+          }
         } else {
           throw new InvalidConfigurationException(
               "The key " + key + " not supported for dynamic configuration.");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
new file mode 100644
index 0000000..67c50e5
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestBlockletDataMapFactory {
+
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  private BlockletDataMapFactory blockletDataMapFactory;
+
+  private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
+
+  private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+
+  @Before public void setUp() {
+    blockletDataMapFactory = new BlockletDataMapFactory();
+    blockletDataMapFactory.init(absoluteTableIdentifier, "dataMapName");
+    tableBlockIndexUniqueIdentifier =
+        new TableBlockIndexUniqueIdentifier("/opt/store/default/carbon_table/Fact/Part0/Segment_0",
+            "0_batchno0-0-1521012756709.carbonindex", null, "0");
+    cache = CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
+  }
+
+  @Test public void addDataMapToCache()
+      throws IOException, MemoryException, NoSuchMethodException, InvocationTargetException,
+      IllegalAccessException {
+    BlockletDataMap dataMap = new BlockletDataMap();
+    dataMap.setTableBlockUniqueIdentifier(tableBlockIndexUniqueIdentifier);
+    Method method =
+        BlockletDataMapFactory.class.getDeclaredMethod("cache", DataMap.class);
+    method.setAccessible(true);
+    method.invoke(blockletDataMapFactory, dataMap);
+    DataMap result = cache.getIfPresent(tableBlockIndexUniqueIdentifier);
+    assert null != result;
+  }
+
+  @Test public void getValidDistributables() throws IOException {
+    BlockletDataMapDistributable blockletDataMapDistributable = new BlockletDataMapDistributable(
+        "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756709.carbonindex", null);
+    Segment segment = new Segment("0", null);
+    blockletDataMapDistributable.setSegment(segment);
+    BlockletDataMapDistributable blockletDataMapDistributable1 = new BlockletDataMapDistributable(
+        "/opt/store/default/carbon_table/Fact/Part0/Segment_0/1521012756710.carbonindexmerge", null);
+    blockletDataMapDistributable1.setSegment(segment);
+    List<DataMapDistributable> dataMapDistributables = new ArrayList<>(2);
+    dataMapDistributables.add(blockletDataMapDistributable);
+    dataMapDistributables.add(blockletDataMapDistributable1);
+    new MockUp<BlockletDataMapFactory>() {
+      @Mock Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
+          Segment segment) {
+        TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier1 =
+            new TableBlockIndexUniqueIdentifier(
+                "/opt/store/default/carbon_table/Fact/Part0/Segment_0",
+                "0_batchno0-0-1521012756701.carbonindex", "1521012756710.carbonindexmerge", "0");
+        TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier2 =
+            new TableBlockIndexUniqueIdentifier(
+                "/opt/store/default/carbon_table/Fact/Part0/Segment_0",
+                "0_batchno0-0-1521012756702.carbonindex", "1521012756710.carbonindexmerge", "0");
+        Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>(3);
+        tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier);
+        tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier1);
+        tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier2);
+        return tableBlockIndexUniqueIdentifiers;
+      }
+    };
+    List<DataMapDistributable> validDistributables =
+        blockletDataMapFactory.getAllUncachedDistributables(dataMapDistributables);
+    assert 1 == validDistributables.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
index 8be1e2e..9f67b22 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
@@ -16,21 +16,38 @@
  */
 package org.apache.carbondata.hadoop;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
 import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 /**
  * CacheClient : Holds all the Cache access clients for Btree, Dictionary
  */
 public class CacheClient {
 
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CacheClient.class.getName());
+
   // segment access client for driver LRU cache
   private CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
       segmentAccessClient;
 
+  private static Map<SegmentTaskIndexStore.SegmentPropertiesWrapper, SegmentProperties>
+      segmentProperties =
+      new HashMap<SegmentTaskIndexStore.SegmentPropertiesWrapper, SegmentProperties>();
+
   public CacheClient() {
     Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> segmentCache =
         CacheProvider.getInstance().createCache(CacheType.DRIVER_BTREE);
@@ -45,4 +62,30 @@ public class CacheClient {
   public void close() {
     segmentAccessClient.close();
   }
+
+  /**
+   * Method to get the segment properties and avoid construction of new segment properties until
+   * the schema is not modified
+   *
+   * @param tableIdentifier
+   * @param columnsInTable
+   * @param columnCardinality
+   */
+  public SegmentProperties getSegmentProperties(AbsoluteTableIdentifier tableIdentifier,
+      List<ColumnSchema> columnsInTable, int[] columnCardinality) {
+    SegmentTaskIndexStore.SegmentPropertiesWrapper segmentPropertiesWrapper =
+        new SegmentTaskIndexStore.SegmentPropertiesWrapper(tableIdentifier, columnsInTable,
+            columnCardinality);
+    SegmentProperties segmentProperties = this.segmentProperties.get(segmentPropertiesWrapper);
+    if (null == segmentProperties) {
+      // create a metadata details
+      // this will be useful in query handling
+      // all the data file metadata will have common segment properties we
+      // can use first one to get create the segment properties
+      LOGGER.info("Constructing new SegmentProperties");
+      segmentProperties = new SegmentProperties(columnsInTable, columnCardinality);
+      this.segmentProperties.put(segmentPropertiesWrapper, segmentProperties);
+    }
+    return segmentProperties;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
new file mode 100644
index 0000000..35d4b39
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * abstract class for data map job
+ */
+public abstract class AbstractDataMapJob implements DataMapJob {
+
+  @Override
+  public List<DataMap> execute(CarbonTable carbonTable, FileInputFormat<Void, DataMap> format) {
+    return null;
+  }
+
+  @Override public List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
+      FilterResolverIntf resolverIntf) {
+    return null;
+  }
+}