Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/21 08:35:57 UTC

[carbondata] 02/02: [CARBONDATA-3447]Index server performance improvement

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 51b37029733763bbb1ba396671f9b40d613c38ba
Author: kumarvishal09 <ku...@gmail.com>
AuthorDate: Wed Jun 12 20:52:00 2019 +0530

    [CARBONDATA-3447]Index server performance improvement
    
    Problem:
    When the number of splits is high, index server performance is slow
    compared to the old flow (driver caching), because more data is transferred
    over the network, which becomes a performance bottleneck.
    
    Solution:
    1. If the data to transfer is small, send it through the network; when it
    grows beyond a threshold, write it to a file and send only the file name,
    and the main driver will read the file and reconstruct the input splits.
    2. Use snappy to compress the data, so the size transferred through the
    network/written to file is smaller and IO time does not impact performance.
    3. The main driver already prunes in multiple threads; add the same for the
    index executor, as the index executor now does the pruning.
    4. In the block cache case, do not send the BlockletDetailInfo object, as
    its size is large and it can be reconstructed in the executor from the
    file footer.
    
    This closes #3281
    
    Co-authored-by: kunal642 <ku...@gmail.com>
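    
    A minimal conceptual sketch of solution points 1 and 2 above (editorial
    illustration, not code from this commit): it shows the "compress, then ship
    inline or spill to a temp file" decision. The names SplitPayloadSketch,
    prepare and THRESHOLD_BYTES are invented, and org.xerial.snappy is used here
    only as a stand-in for CarbonData's SnappyCompressor wrapper.
    
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.UUID;
    
    import org.xerial.snappy.Snappy;
    
    final class SplitPayloadSketch {
    
      // hypothetical threshold; the commit reads it from
      // carbon.index.server.inmemory.serialization.threshold.inKB (default 300 KB)
      private static final int THRESHOLD_BYTES = 300 * 1024;
    
      /**
       * Compress the serialized splits and decide whether to ship the bytes
       * directly or to spill them to a temp file and ship only the file name.
       */
      static Payload prepare(byte[] serializedSplits, Path tempDir) throws IOException {
        byte[] compressed = Snappy.compress(serializedSplits);
        if (compressed.length <= THRESHOLD_BYTES) {
          // small enough: send the compressed bytes over the network
          return new Payload(compressed, null);
        }
        // too large: write to a temp file and send only the file name; the main
        // driver reads the file back and reconstructs the input splits
        Path file = tempDir.resolve(UUID.randomUUID().toString());
        Files.write(file, compressed);
        return new Payload(null, file.getFileName().toString());
      }
    
      static final class Payload {
        final byte[] inlineData;    // non-null when data is sent through the network
        final String spillFileName; // non-null when data was written to the temp folder
    
        Payload(byte[] inlineData, String spillFileName) {
          this.inlineData = inlineData;
          this.spillFileName = spillFileName;
        }
      }
    }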
---
 .../core/constants/CarbonCommonConstants.java      |  45 +++-
 .../core/datamap/DataMapStoreManager.java          |  28 +++
 .../carbondata/core/datamap/DataMapUtil.java       |  19 +-
 .../core/datamap/DistributableDataMapFormat.java   |  80 +++---
 .../carbondata/core/datamap/TableDataMap.java      |  20 +-
 .../core/datastore/block/TableBlockInfo.java       |  14 ++
 .../carbondata/core/indexstore/Blocklet.java       |   6 +
 .../core/indexstore/BlockletDataMapIndexStore.java |  22 +-
 .../core/indexstore/BlockletDetailInfo.java        |  16 +-
 .../core/indexstore/ExtendedBlocklet.java          |  71 +++---
 .../core/indexstore/ExtendedBlockletWrapper.java   | 251 +++++++++++++++++++
 .../ExtendedBlockletWrapperContainer.java          | 160 ++++++++++++
 .../indexstore/blockletindex/BlockDataMap.java     |  13 +-
 .../blockletindex/BlockletDataMapModel.java        |  14 ++
 .../scan/executor/impl/AbstractQueryExecutor.java  |  15 +-
 .../core/scan/executor/util/QueryUtil.java         |  26 ++
 .../carbondata/core/statusmanager/FileFormat.java  |   4 -
 .../ExtendedByteArrayInputStream.java}             |  37 ++-
 .../stream}/ExtendedByteArrayOutputStream.java     |  19 +-
 .../ExtendedDataInputStream.java}                  |  38 ++-
 .../carbondata/core/util/BlockletDataMapUtil.java  |   4 +-
 .../carbondata/core/util/CarbonProperties.java     |  73 ++++++
 .../apache/carbondata/core/util/CarbonUtil.java    |  27 +++
 .../apache/carbondata/hadoop/CarbonInputSplit.java | 267 +++++++++++++++++++--
 .../carbondata/hadoop/CarbonMultiBlockSplit.java   |   4 +-
 .../carbondata/hadoop/CarbonRecordReader.java      |  14 +-
 .../hadoop/api/CarbonTableInputFormat.java         |   4 +-
 .../hadoop/util/CarbonVectorizedRecordReader.java  |  15 +-
 .../testsuite/datamap/FGDataMapTestCase.scala      |  14 +-
 .../org/apache/carbondata/spark/util/Util.java     |   3 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     |   2 +-
 .../carbondata/indexserver/DataMapJobs.scala       |  67 +++---
 .../indexserver/DistributedPruneRDD.scala          | 150 ++++++++----
 .../indexserver/DistributedRDDUtils.scala          |  17 +-
 .../indexserver/DistributedShowCacheRDD.scala      |   2 +-
 .../carbondata/indexserver/IndexServer.scala       |  12 +-
 .../indexserver/InvalidateSegmentCacheRDD.scala    |   2 +-
 .../processing/merger/CarbonCompactionUtil.java    |  18 +-
 .../carbondata/sdk/file/arrow/ArrowConverter.java  |   1 +
 39 files changed, 1283 insertions(+), 311 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1201e1a..6833c8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1461,6 +1461,14 @@ public final class CarbonCommonConstants {
   // block prune in multi-thread if files size more than 100K files.
   public static final int CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT = 100000;
 
+  /**
+   * max executor threads used for block pruning [1 to 4 threads]
+   */
+  @CarbonProperty public static final String CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING =
+      "carbon.max.executor.threads.for.block.pruning";
+
+  public static final String CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING_DEFAULT = "4";
+
   //////////////////////////////////////////////////////////////////////////////////////////
   // Datamap parameter start here
   //////////////////////////////////////////////////////////////////////////////////////////
@@ -2189,9 +2197,42 @@ public final class CarbonCommonConstants {
 
   public static final String LOAD_SYNC_TIME = "load_sync_time";
 
-  public  static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH =
+  public static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH =
           "carbon.index.server.max.jobname.length";
 
-  public  static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT =
+  public static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT =
           "50";
+
+  @CarbonProperty
+  /**
+   * Max in-memory serialization size; after reaching this threshold the data will
+   * be written to a file
+   */
+  public static final String CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD =
+      "carbon.index.server.inmemory.serialization.threshold.inKB";
+
+  /**
+   * default value for in memory serialization size
+   */
+  public static final String CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT = "300";
+
+  /**
+   * min value for in memory serialization size
+   */
+  public static final int CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_MIN = 100;
+
+  /**
+   * max value for in memory serialization size
+   */
+  public static final int CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_MAX = 102400;
+
+  /**
+   * will be used to write serialized split data when the in-memory serialization threshold is crossed
+   */
+  public static final String CARBON_INDEX_SERVER_TEMP_PATH = "carbon.indexserver.temp.path";
+
+  /**
+   * index server temp file name
+   */
+  public static final String INDEX_SERVER_TEMP_FOLDER_NAME = "indexservertmp";
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 4d235c5..729c419 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
@@ -53,6 +54,7 @@ import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassPro
 import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.PREAGGREGATE;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
@@ -739,4 +741,30 @@ public final class DataMapStoreManager {
     }
   }
 
+  public synchronized void clearInvalidDataMaps(CarbonTable carbonTable, List<String> segmentNos,
+      String dataMapToClear) throws IOException {
+    List<TableDataMap> dataMaps = getAllDataMap(carbonTable);
+    List<TableDataMap> remainingDataMaps = new ArrayList<>();
+    if (StringUtils.isNotEmpty(dataMapToClear)) {
+      Iterator<TableDataMap> dataMapIterator = dataMaps.iterator();
+      while (dataMapIterator.hasNext()) {
+        TableDataMap tableDataMap = dataMapIterator.next();
+        if (dataMapToClear.equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
+          for (String segment: segmentNos) {
+            tableDataMap.deleteSegmentDatamapData(segment);
+          }
+          tableDataMap.clear();
+        } else {
+          remainingDataMaps.add(tableDataMap);
+        }
+      }
+      getAllDataMaps().put(carbonTable.getTableUniqueName(), remainingDataMaps);
+    } else {
+      clearDataMaps(carbonTable.getTableUniqueName());
+      // clear the segment properties cache from executor
+      SegmentPropertiesAndSchemaHolder.getInstance()
+          .invalidate(carbonTable.getAbsoluteTableIdentifier());
+    }
+  }
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index 2371a10..394a1dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -168,23 +168,28 @@ public class DataMapUtil {
 
   static List<ExtendedBlocklet> pruneDataMaps(CarbonTable table,
       FilterResolverIntf filterResolverIntf, List<Segment> segmentsToLoad,
-      List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets) throws IOException {
+      List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets,
+      DataMapChooser dataMapChooser) throws IOException {
+    if (null == dataMapChooser) {
+      return blocklets;
+    }
     pruneSegments(segmentsToLoad, blocklets);
     List<ExtendedBlocklet> cgDataMaps = pruneDataMaps(table, filterResolverIntf, segmentsToLoad,
         partitions, blocklets,
-        DataMapLevel.CG);
+        DataMapLevel.CG, dataMapChooser);
     pruneSegments(segmentsToLoad, cgDataMaps);
     return pruneDataMaps(table, filterResolverIntf, segmentsToLoad,
         partitions, cgDataMaps,
-        DataMapLevel.FG);
+        DataMapLevel.FG, dataMapChooser);
   }
 
   static List<ExtendedBlocklet> pruneDataMaps(CarbonTable table,
       FilterResolverIntf filterResolverIntf, List<Segment> segmentsToLoad,
-      List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets, DataMapLevel dataMapLevel)
+      List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets, DataMapLevel dataMapLevel,
+      DataMapChooser dataMapChooser)
       throws IOException {
     DataMapExprWrapper dataMapExprWrapper =
-        new DataMapChooser(table).chooseDataMap(dataMapLevel, filterResolverIntf);
+        dataMapChooser.chooseDataMap(dataMapLevel, filterResolverIntf);
     if (dataMapExprWrapper != null) {
       List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
       // Prune segments from already pruned blocklets
@@ -211,10 +216,6 @@ public class DataMapUtil {
       }
       return dataMapExprWrapper.pruneBlocklets(extendedBlocklets);
     }
-    // For all blocklets initialize the detail info so that it can be serialized to the driver.
-    for (ExtendedBlocklet blocklet : blocklets) {
-      blocklet.getDetailInfo();
-    }
     return blocklets;
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 0478b40..0f82f57 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -29,7 +29,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
-import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
@@ -41,7 +40,6 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -85,7 +83,9 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
   private String taskGroupDesc = "";
 
+  private String queryId = "";
 
+  private boolean isWriteToFile = true;
   DistributableDataMapFormat() {
 
   }
@@ -141,42 +141,11 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
         distributable.getDistributable().getSegment().setReadCommittedScope(readCommittedScope);
         List<Segment> segmentsToLoad = new ArrayList<>();
         segmentsToLoad.add(distributable.getDistributable().getSegment());
-        if (isJobToClearDataMaps) {
-          if (StringUtils.isNotEmpty(dataMapToClear)) {
-            List<TableDataMap> dataMaps =
-                DataMapStoreManager.getInstance().getAllDataMap(table);
-            int i = 0;
-            for (TableDataMap tableDataMap : dataMaps) {
-              if (tableDataMap != null && dataMapToClear
-                  .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
-                tableDataMap.deleteSegmentDatamapData(
-                    ((DataMapDistributableWrapper) inputSplit).getDistributable().getSegment()
-                        .getSegmentNo());
-                tableDataMap.clear();
-                dataMaps.remove(i);
-                break;
-              }
-              i++;
-            }
-            DataMapStoreManager.getInstance().getAllDataMaps().put(table.getTableUniqueName(),
-                dataMaps);
-          } else {
-            // if job is to clear datamaps just clear datamaps from cache and return
-            DataMapStoreManager.getInstance()
-                .clearDataMaps(table.getCarbonTableIdentifier().getTableUniqueName());
-            // clear the segment properties cache from executor
-            SegmentPropertiesAndSchemaHolder.getInstance()
-                .invalidate(table.getAbsoluteTableIdentifier());
-          }
-          List<ExtendedBlocklet> list = new ArrayList<ExtendedBlocklet>();
-          list.add(new ExtendedBlocklet());
-          blockletIterator = list.iterator();
-          return;
-        } else if (invalidSegments.size() > 0) {
-          // clear the segmentMap and from cache in executor when there are invalid segments
-          DataMapStoreManager.getInstance().clearInvalidSegments(table, invalidSegments);
-        }
         List<ExtendedBlocklet> blocklets = new ArrayList<>();
+        DataMapChooser dataMapChooser = null;
+        if (null != filterResolverIntf) {
+          dataMapChooser = new DataMapChooser(table);
+        }
         if (dataMapLevel == null) {
           TableDataMap defaultDataMap = DataMapStoreManager.getInstance()
               .getDataMap(table, distributable.getDistributable().getDataMapSchema());
@@ -189,11 +158,12 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
                 partitions);
           }
           blocklets = DataMapUtil
-              .pruneDataMaps(table, filterResolverIntf, segmentsToLoad, partitions, blocklets);
+              .pruneDataMaps(table, filterResolverIntf, segmentsToLoad, partitions, blocklets,
+                  dataMapChooser);
         } else {
           blocklets = DataMapUtil
               .pruneDataMaps(table, filterResolverIntf, segmentsToLoad, partitions, blocklets,
-                  dataMapLevel);
+                  dataMapLevel, dataMapChooser);
         }
         blockletIterator = blocklets.iterator();
       }
@@ -280,6 +250,8 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
     out.writeUTF(dataMapToClear);
     out.writeUTF(taskGroupId);
     out.writeUTF(taskGroupDesc);
+    out.writeUTF(queryId);
+    out.writeBoolean(isWriteToFile);
   }
 
   @Override
@@ -323,6 +295,8 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
     this.dataMapToClear = in.readUTF();
     this.taskGroupId = in.readUTF();
     this.taskGroupDesc = in.readUTF();
+    this.queryId = in.readUTF();
+    this.isWriteToFile = in.readBoolean();
   }
 
   private void initReadCommittedScope() throws IOException {
@@ -390,4 +364,32 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
   public void setFilterResolverIntf(FilterResolverIntf filterResolverIntf) {
     this.filterResolverIntf = filterResolverIntf;
   }
+
+  public List<String> getInvalidSegments() {
+    return invalidSegments;
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  public void setQueryId(String queryId) {
+    this.queryId = queryId;
+  }
+
+  public String getDataMapToClear() {
+    return dataMapToClear;
+  }
+
+  public void setIsWriteToFile(boolean isWriteToFile) {
+    this.isWriteToFile = isWriteToFile;
+  }
+
+  public boolean isWriteToFile() {
+    return isWriteToFile;
+  }
+
+  public void setFallbackJob() {
+    isFallbackJob = true;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index bc87298..33fc3b1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -124,7 +124,7 @@ public final class TableDataMap extends OperationEventListener {
         datamapsCount++;
       }
     }
-    int numOfThreadsForPruning = getNumOfThreadsForPruning();
+    int numOfThreadsForPruning = CarbonProperties.getNumOfThreadsForPruning();
     if (numOfThreadsForPruning == 1 || datamapsCount < numOfThreadsForPruning || totalFiles
         < CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT) {
       // use multi-thread, only if the files are more than 0.1 million.
@@ -206,7 +206,7 @@ public final class TableDataMap extends OperationEventListener {
      *********************************************************************************
      */
 
-    int numOfThreadsForPruning = getNumOfThreadsForPruning();
+    int numOfThreadsForPruning = CarbonProperties.getNumOfThreadsForPruning();
     LOG.info(
         "Number of threads selected for multi-thread block pruning is " + numOfThreadsForPruning
             + ". total files: " + totalFiles + ". total segments: " + segments.size());
@@ -323,22 +323,6 @@ public final class TableDataMap extends OperationEventListener {
     return blocklets;
   }
 
-  private int getNumOfThreadsForPruning() {
-    int numOfThreadsForPruning = Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING,
-            CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT));
-    if (numOfThreadsForPruning > Integer
-        .parseInt(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT)
-        || numOfThreadsForPruning < 1) {
-      LOG.info("Invalid value for carbon.max.driver.threads.for.block.pruning, value :"
-          + numOfThreadsForPruning + " .using the default threads : "
-          + CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT);
-      numOfThreadsForPruning = Integer
-          .parseInt(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT);
-    }
-    return numOfThreadsForPruning;
-  }
-
   private List<ExtendedBlocklet> addSegmentId(List<ExtendedBlocklet> pruneBlocklets,
       Segment segment) {
     for (ExtendedBlocklet blocklet : pruneBlocklets) {
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 8ef2198..25d82f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -105,6 +105,11 @@ public class TableBlockInfo implements Distributable, Serializable {
   private transient DataFileFooter dataFileFooter;
 
   /**
+   * true when the index file doesn't have blocklet information
+   */
+  private boolean isLegacyStore;
+
+  /**
    * comparator to sort by block size in descending order.
    * Since each line is not exactly the same, the size of a InputSplit may differs,
    * so we allow some deviation for these splits.
@@ -210,6 +215,7 @@ public class TableBlockInfo implements Distributable, Serializable {
     info.deletedDeltaFilePath = deletedDeltaFilePath;
     info.detailInfo = detailInfo.copy();
     info.dataMapWriterPath = dataMapWriterPath;
+    info.isLegacyStore = isLegacyStore;
     return info;
   }
 
@@ -473,6 +479,14 @@ public class TableBlockInfo implements Distributable, Serializable {
     this.dataFileFooter = dataFileFooter;
   }
 
+  public boolean isLegacyStore() {
+    return isLegacyStore;
+  }
+
+  public void setLegacyStore(boolean legacyStore) {
+    isLegacyStore = legacyStore;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("TableBlockInfo{");
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index 9aeb6c4..645fdd9 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -65,6 +65,10 @@ public class Blocklet implements Writable,Serializable {
     return filePath;
   }
 
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     if (filePath == null) {
@@ -79,6 +83,7 @@ public class Blocklet implements Writable,Serializable {
       out.writeBoolean(true);
       out.writeUTF(blockletId);
     }
+    out.writeBoolean(compareBlockletIdForObjectMatching);
   }
 
   @Override
@@ -89,6 +94,7 @@ public class Blocklet implements Writable,Serializable {
     if (in.readBoolean()) {
       blockletId = in.readUTF();
     }
+    this.compareBlockletIdForObjectMatching = in.readBoolean();
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index a9667a8..ce1e8ac 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactor
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
 
@@ -94,7 +95,7 @@ public class BlockletDataMapIndexStore
         Set<String> filesRead = new HashSet<>();
         String segmentFilePath = identifier.getIndexFilePath();
         if (segInfoCache == null) {
-          segInfoCache = new HashMap<String, Map<String, BlockMetaInfo>>();
+          segInfoCache = new HashMap<>();
         }
         Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping =
             segInfoCache.get(segmentFilePath);
@@ -106,14 +107,15 @@ public class BlockletDataMapIndexStore
         }
         // if the identifier is not a merge file we can directly load the datamaps
         if (identifier.getMergeIndexFileName() == null) {
+          List<DataFileFooter> indexInfos = new ArrayList<>();
           Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
               .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead,
-                  carbonDataFileBlockMetaInfoMapping);
+                  carbonDataFileBlockMetaInfoMapping, indexInfos);
           BlockDataMap blockletDataMap =
               loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap,
                   identifierWrapper.getCarbonTable(),
                   identifierWrapper.isAddTableBlockToUnsafeAndLRUCache(),
-                  identifierWrapper.getConfiguration());
+                  identifierWrapper.getConfiguration(), indexInfos);
           dataMaps.add(blockletDataMap);
           blockletDataMapIndexWrapper =
               new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps);
@@ -123,16 +125,17 @@ public class BlockletDataMapIndexStore
               BlockletDataMapUtil.getIndexFileIdentifiersFromMergeFile(identifier, indexFileStore);
           for (TableBlockIndexUniqueIdentifier blockIndexUniqueIdentifier :
               tableBlockIndexUniqueIdentifiers) {
+            List<DataFileFooter> indexInfos = new ArrayList<>();
             Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil.getBlockMetaInfoMap(
                 new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
                     identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
-                carbonDataFileBlockMetaInfoMapping);
+                carbonDataFileBlockMetaInfoMapping, indexInfos);
             if (!blockMetaInfoMap.isEmpty()) {
               BlockDataMap blockletDataMap =
                   loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
                       identifierWrapper.getCarbonTable(),
                       identifierWrapper.isAddTableBlockToUnsafeAndLRUCache(),
-                      identifierWrapper.getConfiguration());
+                      identifierWrapper.getConfiguration(), indexInfos);
               dataMaps.add(blockletDataMap);
             }
           }
@@ -274,7 +277,8 @@ public class BlockletDataMapIndexStore
    */
   private BlockDataMap loadAndGetDataMap(TableBlockIndexUniqueIdentifier identifier,
       SegmentIndexFileStore indexFileStore, Map<String, BlockMetaInfo> blockMetaInfoMap,
-      CarbonTable carbonTable, boolean addTableBlockToUnsafe, Configuration configuration)
+      CarbonTable carbonTable, boolean addTableBlockToUnsafe, Configuration configuration,
+      List<DataFileFooter> indexInfos)
       throws IOException, MemoryException {
     String uniqueTableSegmentIdentifier =
         identifier.getUniqueTableSegmentIdentifier();
@@ -285,10 +289,12 @@ public class BlockletDataMapIndexStore
     BlockDataMap dataMap;
     synchronized (lock) {
       dataMap = (BlockDataMap) BlockletDataMapFactory.createDataMap(carbonTable);
-      dataMap.init(new BlockletDataMapModel(carbonTable,
+      final BlockletDataMapModel blockletDataMapModel = new BlockletDataMapModel(carbonTable,
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
               .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
-          blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe, configuration));
+          blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe, configuration);
+      blockletDataMapModel.setIndexInfos(indexInfos);
+      dataMap.init(blockletDataMapModel);
     }
     return dataMap;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
index af07f09..31dcd24 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -69,10 +69,7 @@ public class BlockletDetailInfo implements Serializable, Writable {
   private byte[] columnSchemaBinary;
 
   private long blockSize;
-  /**
-   * flag to check for store from 1.1 or any prior version
-   */
-  private boolean isLegacyStore;
+
   /**
    * flag to check whether to serialize min max values. The flag will be set to true in case
    * 1. When CACHE_LEVEL = BLOCKLET and filter column min/max in not cached in the driver using the
@@ -196,7 +193,6 @@ public class BlockletDetailInfo implements Serializable, Writable {
     out.writeInt(blockletInfoBinary.length);
     out.write(blockletInfoBinary);
     out.writeLong(blockSize);
-    out.writeBoolean(isLegacyStore);
     out.writeBoolean(useMinMaxForPruning);
   }
 
@@ -227,7 +223,6 @@ public class BlockletDetailInfo implements Serializable, Writable {
     in.readFully(blockletInfoBinary);
     setBlockletInfoFromBinary();
     blockSize = in.readLong();
-    isLegacyStore = in.readBoolean();
     useMinMaxForPruning = in.readBoolean();
   }
 
@@ -265,7 +260,6 @@ public class BlockletDetailInfo implements Serializable, Writable {
     detailInfo.columnSchemas = columnSchemas;
     detailInfo.columnSchemaBinary = columnSchemaBinary;
     detailInfo.blockSize = blockSize;
-    detailInfo.isLegacyStore = isLegacyStore;
     detailInfo.useMinMaxForPruning = useMinMaxForPruning;
     return detailInfo;
   }
@@ -301,14 +295,6 @@ public class BlockletDetailInfo implements Serializable, Writable {
     this.blockletInfoBinary = blockletInfoBinary;
   }
 
-  public boolean isLegacyStore() {
-    return isLegacyStore;
-  }
-
-  public void setLegacyStore(boolean legacyStore) {
-    isLegacyStore = legacyStore;
-  }
-
   public void setColumnSchemas(List<ColumnSchema> columnSchemas) {
     this.columnSchemas = columnSchemas;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index 1de1ab5..a85423b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -18,13 +18,16 @@ package org.apache.carbondata.core.indexstore;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.stream.ExtendedByteArrayOutputStream;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 /**
@@ -150,9 +153,16 @@ public class ExtendedBlocklet extends Blocklet {
     this.inputSplit.setColumnSchema(columnSchema);
   }
 
-
-
-  @Override public void write(DataOutput out) throws IOException {
+  /**
+   * Method to serialize extended blocklet and input split for the index server
+   * DataFormat
+   * <Extended Blocklet data><Carbon input split serialized data length><CarbonInputSplitData>
+   * @param out
+   * @param uniqueLocation
+   * @throws IOException
+   */
+  public void serializeData(DataOutput out, Map<String, Short> uniqueLocation)
+      throws IOException {
     super.write(out);
     if (dataMapUniqueId == null) {
       out.writeBoolean(false);
@@ -160,41 +170,44 @@ public class ExtendedBlocklet extends Blocklet {
       out.writeBoolean(true);
       out.writeUTF(dataMapUniqueId);
     }
+    out.writeBoolean(inputSplit != null);
     if (inputSplit != null) {
-      out.writeBoolean(true);
-      inputSplit.write(out);
-      String[] locations = getLocations();
-      if (locations != null) {
-        out.writeBoolean(true);
-        out.writeInt(locations.length);
-        for (String location : locations) {
-          out.writeUTF(location);
-        }
-      } else {
-        out.writeBoolean(false);
+      // creating byte array output stream to get the size of input split serializeData size
+      ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(ebos);
+      inputSplit.setFilePath(null);
+      inputSplit.setBucketId(null);
+      if (inputSplit.isBlockCache()) {
+        inputSplit.updateFooteroffset();
+        inputSplit.updateBlockLength();
+        inputSplit.setWriteDetailInfo(false);
       }
-    } else {
-      out.writeBoolean(false);
+      inputSplit.serializeFields(dos, uniqueLocation);
+      out.writeInt(ebos.size());
+      out.write(ebos.getBuffer(), 0 , ebos.size());
     }
   }
 
-  @Override public void readFields(DataInput in) throws IOException {
+  /**
+   * Method to deserialize extended blocklet and input split for the index server
+   * @param in
+   * @param locations
+   * @param tablePath
+   * @throws IOException
+   */
+  public void deserializeFields(DataInput in, String[] locations, String tablePath)
+      throws IOException {
     super.readFields(in);
     if (in.readBoolean()) {
       dataMapUniqueId = in.readUTF();
     }
-    if (in.readBoolean()) {
-      inputSplit = new CarbonInputSplit();
-      inputSplit.readFields(in);
-      if (in.readBoolean()) {
-        int numLocations = in.readInt();
-        String[] locations = new String[numLocations];
-        for (int i = 0; i < numLocations; i++) {
-          locations[i] = in.readUTF();
-        }
-        inputSplit.setLocation(locations);
-      }
+    setFilePath(tablePath + getPath());
+    boolean isSplitPresent = in.readBoolean();
+    if (isSplitPresent) {
+      // getting the length of the data
+      final int serializeLen = in.readInt();
+      this.inputSplit =
+          new CarbonInputSplit(serializeLen, in, getFilePath(), locations, getBlockletId());
     }
   }
-
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
new file mode 100644
index 0000000..9fd4e66
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+import org.apache.carbondata.core.stream.ExtendedByteArrayInputStream;
+import org.apache.carbondata.core.stream.ExtendedByteArrayOutputStream;
+import org.apache.carbondata.core.stream.ExtendedDataInputStream;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Class used to send extended blocklet objects from the index executor to the index driver.
+ * If the data size exceeds the threshold it will be written to the temp folder provided by
+ * the user and only the file name will be sent; if the data size is smaller, the complete
+ * data will be sent to the index driver.
+ */
+public class ExtendedBlockletWrapper implements Writable, Serializable {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(ExtendedBlockletWrapper.class.getName());
+
+  private boolean isWrittenToFile;
+
+  private int dataSize;
+
+  private byte[] bytes;
+
+  private static final int BUFFER_SIZE = 8 * 1024 * 1024;
+
+  private static final int BLOCK_SIZE = 256 * 1024 * 1024;
+
+  public ExtendedBlockletWrapper() {
+
+  }
+
+  public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, String tablePath,
+      String queryId, boolean isWriteToFile) {
+    Map<String, Short> uniqueLocations = new HashMap<>();
+    byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList);
+    int serializeAllowedSize = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
+            CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT)) * 1024;
+    DataOutputStream stream = null;
+    // if the data size exceeds the threshold, data is written to a file and only the file name
+    // is sent from executor to driver; in case of any failure the data is sent through the network
+    if (bytes.length > serializeAllowedSize && isWriteToFile) {
+      final String fileName = UUID.randomUUID().toString();
+      String folderPath = CarbonUtil.getIndexServerTempPath(tablePath, queryId);
+      try {
+        final CarbonFile carbonFile = FileFactory.getCarbonFile(folderPath);
+        boolean isFolderExists = true;
+        if (!carbonFile.isFileExist(folderPath)) {
+          LOGGER.warn("Folder: " + folderPath + " doesn't exist, data will be sent through network");
+          isFolderExists = false;
+        }
+        if (isFolderExists) {
+          stream = FileFactory.getDataOutputStream(folderPath + "/" + fileName,
+              FileFactory.getFileType(folderPath),
+                  BUFFER_SIZE, BLOCK_SIZE, (short) 1);
+          writeBlockletToStream(stream, bytes, uniqueLocations, extendedBlockletList);
+          this.dataSize = stream.size();
+          this.bytes = fileName.getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+          isWrittenToFile = true;
+        }
+      } catch (IOException e) {
+        LOGGER.error("Problem while writing to file, data will be sent through network", e);
+      } finally {
+        CarbonUtil.closeStreams(stream);
+      }
+    }
+    if (!isWrittenToFile) {
+      try {
+        ExtendedByteArrayOutputStream bos = new ExtendedByteArrayOutputStream();
+        stream = new DataOutputStream(bos);
+        writeBlockletToStream(stream, bytes, uniqueLocations, extendedBlockletList);
+        this.dataSize = bos.size();
+        this.bytes = bos.getBuffer();
+      } catch (IOException e) {
+        LOGGER.error("Problem while writing data to memory stream", e);
+      } finally {
+        CarbonUtil.closeStreams(stream);
+      }
+    }
+  }
+
+  private byte[] convertToBytes(String tablePath, Map<String, Short> uniqueLocations,
+      List<ExtendedBlocklet> extendedBlockletList) {
+    ByteArrayOutputStream bos = new ExtendedByteArrayOutputStream();
+    DataOutputStream stream = new DataOutputStream(bos);
+    try {
+      for (ExtendedBlocklet extendedBlocklet : extendedBlockletList) {
+        extendedBlocklet.setFilePath(extendedBlocklet.getFilePath().replace(tablePath, ""));
+        extendedBlocklet.serializeData(stream, uniqueLocations);
+      }
+      return new SnappyCompressor().compressByte(bos.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      CarbonUtil.closeStreams(stream);
+    }
+  }
+
+  /**
+   * Below method will be used to write the data to stream[file/memory]
+   * Data Format
+   * <number of splits><number of unique location[short]><locations><serialize data len><data>
+   * @param stream
+   * @param data
+   * @param uniqueLocation
+   * @param extendedBlockletList
+   * @throws IOException
+   */
+  private void writeBlockletToStream(DataOutputStream stream, byte[] data,
+      Map<String, Short> uniqueLocation, List<ExtendedBlocklet> extendedBlockletList)
+      throws IOException {
+    stream.writeInt(extendedBlockletList.size());
+    String[] uniqueLoc = new String[uniqueLocation.size()];
+    Iterator<Map.Entry<String, Short>> iterator = uniqueLocation.entrySet().iterator();
+    while (iterator.hasNext()) {
+      final Map.Entry<String, Short> next = iterator.next();
+      uniqueLoc[next.getValue()] = next.getKey();
+    }
+    stream.writeShort((short)uniqueLoc.length);
+    for (String loc : uniqueLoc) {
+      stream.writeUTF(loc);
+    }
+    stream.writeInt(data.length);
+    stream.write(data);
+  }
+
+  /**
+   * deserialize the blocklet data from a file or stream
+   * data format
+   * <number of splits><number of unique location[short]><locations><serialize data len><data>
+   * @param tablePath
+   * @param queryId
+   * @return
+   * @throws IOException
+   */
+  public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId) throws IOException {
+    byte[] data;
+    if (bytes != null) {
+      if (isWrittenToFile) {
+        DataInputStream stream = null;
+        try {
+          final String folderPath = CarbonUtil.getIndexServerTempPath(tablePath, queryId);
+          String fileName = new String(bytes, CarbonCommonConstants.DEFAULT_CHARSET);
+          stream = FileFactory
+              .getDataInputStream(folderPath + "/" + fileName, FileFactory.getFileType(folderPath));
+          data = new byte[dataSize];
+          stream.readFully(data);
+        } finally {
+          CarbonUtil.closeStreams(stream);
+        }
+      } else {
+        data = bytes;
+      }
+      DataInputStream stream = null;
+      int numberOfBlocklet;
+      String[] locations;
+      int actualDataLen;
+      try {
+        stream = new DataInputStream(new ByteArrayInputStream(data));
+        numberOfBlocklet = stream.readInt();
+        short numberOfLocations = stream.readShort();
+        locations = new String[numberOfLocations];
+        for (int i = 0; i < numberOfLocations; i++) {
+          locations[i] = stream.readUTF();
+        }
+        actualDataLen = stream.readInt();
+      } finally {
+        CarbonUtil.closeStreams(stream);
+      }
+
+      final byte[] unCompressByte =
+          new SnappyCompressor().unCompressByte(data, data.length - actualDataLen, actualDataLen);
+      ExtendedByteArrayInputStream ebis = new ExtendedByteArrayInputStream(unCompressByte);
+      ExtendedDataInputStream eDIS = new ExtendedDataInputStream(ebis);
+      List<ExtendedBlocklet> extendedBlockletList = new ArrayList<>();
+      try {
+        for (int i = 0; i < numberOfBlocklet; i++) {
+          ExtendedBlocklet extendedBlocklet = new ExtendedBlocklet();
+          extendedBlocklet.deserializeFields(eDIS, locations, tablePath);
+          extendedBlockletList.add(extendedBlocklet);
+        }
+      } finally {
+        CarbonUtil.closeStreams(eDIS);
+      }
+      return extendedBlockletList;
+    } else {
+      return new ArrayList<>();
+    }
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    out.writeBoolean(isWrittenToFile);
+    out.writeBoolean(bytes != null);
+    if (bytes != null) {
+      out.writeInt(bytes.length);
+      out.write(bytes);
+    }
+    out.writeInt(dataSize);
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    this.isWrittenToFile = in.readBoolean();
+    if (in.readBoolean()) {
+      this.bytes = new byte[in.readInt()];
+      in.readFully(bytes);
+    }
+    this.dataSize = in.readInt();
+  }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java
new file mode 100644
index 0000000..0c52297
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+/**
+ * Below class will be used to send split information from the index driver to the
+ * main driver.
+ * The main driver will deserialize the extended blocklet objects and get the splits
+ * to run the query.
+ */
+public class ExtendedBlockletWrapperContainer implements Writable {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(ExtendedBlockletWrapperContainer.class.getName());
+
+  private ExtendedBlockletWrapper[] extendedBlockletWrappers;
+
+  private boolean isFallbackJob;
+
+  public ExtendedBlockletWrapperContainer() {
+
+  }
+
+  public ExtendedBlockletWrapperContainer(ExtendedBlockletWrapper[] extendedBlockletWrappers,
+      boolean isFallbackJob) {
+    this.extendedBlockletWrappers = extendedBlockletWrappers;
+    this.isFallbackJob = isFallbackJob;
+  }
+
+  public List<ExtendedBlocklet> getExtendedBlockets(String tablePath, String queryId)
+      throws IOException {
+    if (!isFallbackJob) {
+      int numOfThreads = CarbonProperties.getNumOfThreadsForPruning();
+      ExecutorService executorService = Executors
+          .newFixedThreadPool(numOfThreads, new CarbonThreadFactory("SplitDeseralizerPool", true));
+      int numberOfWrapperPerThread = extendedBlockletWrappers.length / numOfThreads;
+      int leftOver = extendedBlockletWrappers.length % numOfThreads;
+      int[] split = null;
+      if (numberOfWrapperPerThread > 0) {
+        split = new int[numOfThreads];
+      } else {
+        split = new int[leftOver];
+      }
+      Arrays.fill(split, numberOfWrapperPerThread);
+      for (int i = 0; i < leftOver; i++) {
+        split[i] += 1;
+      }
+      int start = 0;
+      int end = 0;
+      List<Future<List<ExtendedBlocklet>>> futures = new ArrayList<>();
+      for (int i = 0; i < split.length; i++) {
+        end += split[i];
+        futures.add(executorService
+            .submit(new ExtendedBlockletDeserializerThread(start, end, tablePath, queryId)));
+        start += split[i];
+      }
+      executorService.shutdown();
+      try {
+        executorService.awaitTermination(1, TimeUnit.HOURS);
+      } catch (InterruptedException e) {
+        LOGGER.error(e);
+        throw new RuntimeException(e);
+      }
+      List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
+      for (int i = 0; i < futures.size(); i++) {
+        try {
+          extendedBlocklets.addAll(futures.get(i).get());
+        } catch (InterruptedException | ExecutionException e) {
+          LOGGER.error(e);
+          throw new RuntimeException(e);
+        }
+      }
+      return extendedBlocklets;
+    } else {
+      List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
+      for (ExtendedBlockletWrapper extendedBlockletWrapper: extendedBlockletWrappers) {
+        extendedBlocklets.addAll(extendedBlockletWrapper.readBlocklet(tablePath, queryId));
+      }
+      return extendedBlocklets;
+    }
+  }
+
+  private class ExtendedBlockletDeserializerThread implements Callable<List<ExtendedBlocklet>> {
+
+    private int start;
+
+    private int end;
+
+    private String tablePath;
+
+    private String queryId;
+
+    public ExtendedBlockletDeserializerThread(int start, int end, String tablePath,
+        String queryId) {
+      this.start = start;
+      this.end = end;
+      this.tablePath = tablePath;
+      this.queryId = queryId;
+    }
+
+    @Override public List<ExtendedBlocklet> call() throws Exception {
+      List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
+      for (int i = start; i < end; i++) {
+        extendedBlocklets.addAll(extendedBlockletWrappers[i].readBlocklet(tablePath, queryId));
+      }
+      return extendedBlocklets;
+    }
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    out.writeInt(extendedBlockletWrappers.length);
+    for (int i = 0; i < extendedBlockletWrappers.length; i++) {
+      extendedBlockletWrappers[i].write(out);
+    }
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    extendedBlockletWrappers = new ExtendedBlockletWrapper[in.readInt()];
+    for (int i = 0; i < extendedBlockletWrappers.length; i++) {
+      ExtendedBlockletWrapper extendedBlockletWrapper = new ExtendedBlockletWrapper();
+      extendedBlockletWrapper.readFields(in);
+      extendedBlockletWrappers[i] = extendedBlockletWrapper;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 13e612d..24ad43a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -117,9 +117,16 @@ public class BlockDataMap extends CoarseGrainDataMap
     BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel;
     DataFileFooterConverter fileFooterConverter =
         new DataFileFooterConverter(dataMapModel.getConfiguration());
-    List<DataFileFooter> indexInfo = fileFooterConverter
-        .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData(),
-            blockletDataMapInfo.getCarbonTable().isTransactionalTable());
+    List<DataFileFooter> indexInfo = null;
+    if (blockletDataMapInfo.getIndexInfos() == null || blockletDataMapInfo.getIndexInfos()
+        .isEmpty()) {
+      indexInfo = fileFooterConverter
+          .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData(),
+              blockletDataMapInfo.getCarbonTable().isTransactionalTable());
+    } else {
+      // when index info is already read and converted to data file footer object
+      indexInfo = blockletDataMapInfo.getIndexInfos();
+    }
     Path path = new Path(blockletDataMapInfo.getFilePath());
     // store file path only in case of partition table, non transactional table and flat folder
     // structure
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index 0a75d59..2f0c122 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -16,10 +16,12 @@
  */
 package org.apache.carbondata.core.indexstore.blockletindex;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +40,10 @@ public class BlockletDataMapModel extends DataMapModel {
   private String segmentId;
 
   private boolean addToUnsafe = true;
+  /**
+   * list of index thrift object present in index file
+   */
+  private List<DataFileFooter> indexInfos;
 
   public BlockletDataMapModel(CarbonTable carbonTable, String filePath, byte[] fileData,
       Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId, Configuration configuration) {
@@ -74,4 +80,12 @@ public class BlockletDataMapModel extends DataMapModel {
   public CarbonTable getCarbonTable() {
     return carbonTable;
   }
+
+  public void setIndexInfos(List<DataFileFooter> indexInfos) {
+    this.indexInfos = indexInfos;
+  }
+
+  public List<DataFileFooter> getIndexInfos() {
+    return indexInfos;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 6c048f3..b3d4780 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -194,9 +194,11 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
       // available so read the blocklet information from block file
       // 2. CACHE_LEVEL is set to block
       // 3. CACHE_LEVEL is BLOCKLET but filter column min/max is not cached in driver
-      if (blockletDetailInfo.getBlockletInfo() == null || blockletDetailInfo
-            .isUseMinMaxForPruning()) {
-        blockInfo.setBlockOffset(blockletDetailInfo.getBlockFooterOffset());
+      if (null == blockletDetailInfo || blockletDetailInfo.getBlockletInfo() == null
+          || blockletDetailInfo.isUseMinMaxForPruning()) {
+        if (null != blockletDetailInfo) {
+          blockInfo.setBlockOffset(blockletDetailInfo.getBlockFooterOffset());
+        }
         DataFileFooter fileFooter = filePathToFileFooterMapping.get(blockInfo.getFilePath());
         if (null != blockInfo.getDataFileFooter()) {
           fileFooter = blockInfo.getDataFileFooter();
@@ -211,6 +213,9 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
             QueryUtil.updateColumnUniqueIdForNonTransactionTable(fileFooter.getColumnInTable());
           }
           filePathToFileFooterMapping.put(blockInfo.getFilePath(), fileFooter);
+          if (null == blockletDetailInfo) {
+            blockletDetailInfo = QueryUtil.getBlockletDetailInfo(fileFooter, blockInfo);
+          }
           blockInfo.setDetailInfo(blockletDetailInfo);
         }
         if (null == segmentProperties) {
@@ -220,7 +225,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
           updateColumns(queryModel, fileFooter.getColumnInTable(), blockInfo.getFilePath());
           filePathToSegmentPropertiesMap.put(blockInfo.getFilePath(), segmentProperties);
         }
-        if (blockletDetailInfo.isLegacyStore()) {
+        if (blockInfo.isLegacyStore()) {
           LOGGER.warn("Skipping Direct Vector Filling as it is not Supported "
               + "for Legacy store prior to V3 store");
           queryModel.setDirectVectorFill(false);
@@ -377,7 +382,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     detailInfo.setRowCount(blockletInfo.getNumberOfRows());
     byte[][] maxValues = blockletInfo.getBlockletIndex().getMinMaxIndex().getMaxValues();
     byte[][] minValues = blockletInfo.getBlockletIndex().getMinMaxIndex().getMinValues();
-    if (blockletDetailInfo.isLegacyStore()) {
+    if (blockInfo.isLegacyStore()) {
       info.setDataBlockFromOldStore(true);
     }
     blockletInfo.getBlockletIndex().getMinMaxIndex().setMaxValues(maxValues);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 95fbe66..ced99b4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -34,12 +34,15 @@ import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -777,4 +780,27 @@ public class QueryUtil {
       }
     }
   }
+
+  /**
+   * In case of the index server, no detail info is serialized from the driver.
+   * This method is used to create the blocklet detail info object from the file footer.
+   * @param fileFooter
+   * @param blockInfo
+   * @return
+   */
+  public static BlockletDetailInfo getBlockletDetailInfo(DataFileFooter fileFooter,
+      TableBlockInfo blockInfo) {
+    BlockletDetailInfo detailInfo = new BlockletDetailInfo();
+    detailInfo.setDimLens(fileFooter.getSegmentInfo().getColumnCardinality());
+    detailInfo.setBlockletInfoBinary(new byte[0]);
+    detailInfo.setColumnSchemas(fileFooter.getColumnInTable());
+    detailInfo.setBlockletId((short) -1);
+    detailInfo.setRowCount((int) fileFooter.getNumberOfRows());
+    detailInfo.setSchemaUpdatedTimeStamp(fileFooter.getSchemaUpdatedTimeStamp());
+    detailInfo.setBlockFooterOffset(blockInfo.getBlockOffset());
+    detailInfo.setBlockSize(blockInfo.getBlockLength());
+    detailInfo.setUseMinMaxForPruning(true);
+    detailInfo.setVersionNumber(blockInfo.getVersion().number());
+    return detailInfo;
+  }
 }
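
Note: a minimal sketch of the executor-side fallback this helper enables, mirroring the
AbstractQueryExecutor change above; fileFooter is assumed to have been read for the block
already, and the variable names are illustrative only.

    // splits coming from the index server carry no serialized detail info,
    // so rebuild it from the data file footer before the block is processed
    BlockletDetailInfo detailInfo = blockInfo.getDetailInfo();
    if (detailInfo == null) {
      detailInfo = QueryUtil.getBlockletDetailInfo(fileFooter, blockInfo);
      blockInfo.setDetailInfo(detailInfo);
    }
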
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
index c154c5f..05f616f 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
@@ -29,10 +29,6 @@ public enum FileFormat {
   ROW_V1;
 
   public static FileFormat getByOrdinal(int ordinal) {
-    if (ordinal < 0 || ordinal >= FileFormat.values().length) {
-      return COLUMNAR_V3;
-    }
-
     switch (ordinal) {
       case 0:
         return COLUMNAR_V3;
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java b/core/src/main/java/org/apache/carbondata/core/stream/ExtendedByteArrayInputStream.java
similarity index 59%
copy from core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
copy to core/src/main/java/org/apache/carbondata/core/stream/ExtendedByteArrayInputStream.java
index c154c5f..f03464a 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/stream/ExtendedByteArrayInputStream.java
@@ -15,31 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.carbondata.core.statusmanager;
+package org.apache.carbondata.core.stream;
 
-/**
- * The data file format supported in carbondata project
- */
-public enum FileFormat {
+import java.io.ByteArrayInputStream;
 
-  // carbondata columnar file format, optimized for read
-  COLUMNAR_V3,
+public class ExtendedByteArrayInputStream extends ByteArrayInputStream {
+  public ExtendedByteArrayInputStream(byte[] buf) {
+    super(buf);
+  }
 
-  // carbondata row file format, optimized for write
-  ROW_V1;
+  public byte[] getBuffer() {
+    return buf;
+  }
 
-  public static FileFormat getByOrdinal(int ordinal) {
-    if (ordinal < 0 || ordinal >= FileFormat.values().length) {
-      return COLUMNAR_V3;
-    }
+  public void setPosition(int position) {
+    this.pos = position;
+  }
 
-    switch (ordinal) {
-      case 0:
-        return COLUMNAR_V3;
-      case 1:
-        return ROW_V1;
-    }
+  public int getPosition() {
+    return this.pos;
+  }
 
-    return COLUMNAR_V3;
+  public int getLength() {
+    return count - pos;
   }
 }
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ExtendedByteArrayOutputStream.java b/core/src/main/java/org/apache/carbondata/core/stream/ExtendedByteArrayOutputStream.java
similarity index 74%
rename from store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ExtendedByteArrayOutputStream.java
rename to core/src/main/java/org/apache/carbondata/core/stream/ExtendedByteArrayOutputStream.java
index 393cd86..f941bd1 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ExtendedByteArrayOutputStream.java
+++ b/core/src/main/java/org/apache/carbondata/core/stream/ExtendedByteArrayOutputStream.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.carbondata.sdk.file.arrow;
+package org.apache.carbondata.core.stream;
 
 import java.io.ByteArrayOutputStream;
 
@@ -23,17 +23,24 @@ import org.apache.carbondata.core.memory.CarbonUnsafe;
 
 public class ExtendedByteArrayOutputStream extends ByteArrayOutputStream {
 
+  public ExtendedByteArrayOutputStream() {
+
+  }
+
   public ExtendedByteArrayOutputStream(int initialSize) {
     super(initialSize);
   }
 
+  public byte[] getBuffer() {
+    return buf;
+  }
+
   public long copyToAddress() {
-    final long address = CarbonUnsafe.getUnsafe()
-        .allocateMemory(CarbonCommonConstants.INT_SIZE_IN_BYTE + count);
+    final long address =
+        CarbonUnsafe.getUnsafe().allocateMemory(CarbonCommonConstants.INT_SIZE_IN_BYTE + count);
     CarbonUnsafe.getUnsafe().putInt(address, count);
-    CarbonUnsafe.getUnsafe()
-        .copyMemory(buf, CarbonUnsafe.BYTE_ARRAY_OFFSET, null,
-            address + CarbonCommonConstants.INT_SIZE_IN_BYTE, count);
+    CarbonUnsafe.getUnsafe().copyMemory(buf, CarbonUnsafe.BYTE_ARRAY_OFFSET, null,
+        address + CarbonCommonConstants.INT_SIZE_IN_BYTE, count);
     return address;
   }
 }
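
Note: copyToAddress lays the data out off-heap as a 4-byte length prefix followed by the buffer
contents. A hedged sketch of writing that layout and reading it back (then releasing the
memory) could look like this:

    ExtendedByteArrayOutputStream eos = new ExtendedByteArrayOutputStream(32);
    byte[] payload = new byte[] { 1, 2, 3, 4 };
    eos.write(payload, 0, payload.length);
    long address = eos.copyToAddress();
    // the first INT_SIZE_IN_BYTE bytes hold the length, the payload follows
    int length = CarbonUnsafe.getUnsafe().getInt(address);
    byte[] copy = new byte[length];
    CarbonUnsafe.getUnsafe().copyMemory(null, address + CarbonCommonConstants.INT_SIZE_IN_BYTE,
        copy, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
    CarbonUnsafe.getUnsafe().freeMemory(address);
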
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java b/core/src/main/java/org/apache/carbondata/core/stream/ExtendedDataInputStream.java
similarity index 59%
copy from core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
copy to core/src/main/java/org/apache/carbondata/core/stream/ExtendedDataInputStream.java
index c154c5f..c0e236b 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/stream/ExtendedDataInputStream.java
@@ -15,31 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.carbondata.core.statusmanager;
+package org.apache.carbondata.core.stream;
 
-/**
- * The data file format supported in carbondata project
- */
-public enum FileFormat {
-
-  // carbondata columnar file format, optimized for read
-  COLUMNAR_V3,
+import java.io.DataInputStream;
 
-  // carbondata row file format, optimized for write
-  ROW_V1;
+public class ExtendedDataInputStream extends DataInputStream {
 
-  public static FileFormat getByOrdinal(int ordinal) {
-    if (ordinal < 0 || ordinal >= FileFormat.values().length) {
-      return COLUMNAR_V3;
-    }
+  private ExtendedByteArrayInputStream in;
 
-    switch (ordinal) {
-      case 0:
-        return COLUMNAR_V3;
-      case 1:
-        return ROW_V1;
-    }
+  /**
+   * Creates a DataInputStream that uses the specified
+   * underlying InputStream.
+   *
+   * @param in the specified input stream
+   */
+  public ExtendedDataInputStream(ExtendedByteArrayInputStream in) {
+    super(in);
+    this.in = in;
+  }
 
-    return COLUMNAR_V3;
+  public ExtendedByteArrayInputStream getUnderlineStream() {
+    return this.in;
   }
+
 }
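
Note: these two wrappers let a reader record its absolute position in a shared byte array, read
only the fields it needs immediately, and skip over the rest of a record for later, which is the
pattern CarbonInputSplit uses further below. A rough sketch, with serializedBytes and
recordLength assumed to be supplied by the caller:

    ExtendedByteArrayInputStream bis = new ExtendedByteArrayInputStream(serializedBytes);
    ExtendedDataInputStream in = new ExtendedDataInputStream(bis);
    int recordStart = in.getUnderlineStream().getPosition();
    long eagerField = in.readLong();                 // read only what is needed right now
    int deferredOffset = in.getUnderlineStream().getPosition();
    byte[] sharedBuffer = in.getUnderlineStream().getBuffer();
    // jump straight to the next record; (sharedBuffer, deferredOffset) is kept for later
    in.getUnderlineStream().setPosition(recordStart + recordLength);
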
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 9074587..6cd60a2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -74,7 +74,8 @@ public class BlockletDataMapUtil {
   public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
       TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
       SegmentIndexFileStore indexFileStore, Set<String> filesRead,
-      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping) throws IOException {
+      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, List<DataFileFooter> indexInfos)
+      throws IOException {
     boolean isTransactionalTable = true;
     TableBlockIndexUniqueIdentifier identifier =
         identifierWrapper.getTableBlockIndexUniqueIdentifier();
@@ -107,6 +108,7 @@ public class BlockletDataMapUtil {
         identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
             .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
         isTransactionalTable);
+    indexInfos.addAll(indexInfo);
     for (DataFileFooter footer : indexInfo) {
       if ((!isTransactionalTable) && (tableColumnList.size() != 0) &&
           !isSameColumnAndDifferentDatatypeInSchema(footer.getColumnInTable(), tableColumnList)) {
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index a53c365..ec8a1c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -199,6 +199,9 @@ public final class CarbonProperties {
       case DETAIL_QUERY_BATCH_SIZE:
         validateDetailQueryBatchSize();
         break;
+      case CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD:
+        validateIndexServerSerializationThreshold();
+        break;
       // TODO : Validation for carbon.lock.type should be handled for addProperty flow
       default:
         // none
@@ -264,6 +267,7 @@ public final class CarbonProperties {
     validateSortMemorySpillPercentage();
     validateStringCharacterLimit();
     validateDetailQueryBatchSize();
+    validateIndexServerSerializationThreshold();
   }
 
   /**
@@ -656,6 +660,38 @@ public final class CarbonProperties {
   }
 
   /**
+   * This method validates the index server serialization size
+   */
+  private void validateIndexServerSerializationThreshold() {
+    String serializationSizeString = carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
+            CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT);
+    try {
+      int serializationSize = Integer.parseInt(serializationSizeString);
+
+      if (serializationSize < CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_MIN
+          || serializationSize
+          > CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_MAX) {
+        LOGGER.info(
+            "The " + CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD + " value \""
+                + serializationSize + "\" is invalid. Using the default value \""
+                + CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT);
+        carbonProperties
+            .setProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
+                CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.info(
+          "The " + CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD + " value \""
+              + serializationSizeString + "\" is invalid. Using the default value \""
+              + CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT);
+      carbonProperties
+          .setProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
+              CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT);
+    }
+  }
+
+  /**
    * This method validates the sort size
    */
   private void validateSortSize() {
@@ -1670,4 +1706,41 @@ public final class CarbonProperties {
     return CarbonCommonConstants.CARBON_INDEX_SERVER_WORKER_THREADS_DEFAULT;
   }
 
+  public int getNumOfThreadsForExecutorPruning() {
+    String configuredValue = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING);
+    if (configuredValue == null || configuredValue.equalsIgnoreCase("0")) {
+      configuredValue = CarbonCommonConstants.CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING_DEFAULT;
+    }
+    try {
+      int numOfThreads = Integer.parseInt(configuredValue);
+      LOGGER.info("Value for "
+          + CarbonCommonConstants.CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING + " is "
+          + numOfThreads);
+      return numOfThreads;
+    } catch (NumberFormatException e) {
+      LOGGER.info(configuredValue + " is not a valid input for "
+          + CarbonCommonConstants.CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING + ", taking "
+          + CarbonCommonConstants.CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING_DEFAULT
+          + " as default value");
+      return Integer
+          .parseInt(CarbonCommonConstants.CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING_DEFAULT);
+    }
+  }
+
+  public static int getNumOfThreadsForPruning() {
+    int numOfThreadsForPruning = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING,
+            CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT));
+    if (numOfThreadsForPruning > Integer
+        .parseInt(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT)
+        || numOfThreadsForPruning < 1) {
+      LOGGER.info("Invalid value for carbon.max.driver.threads.for.block.pruning, value: "
+          + numOfThreadsForPruning + ". Using the default number of threads: "
+          + CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT);
+      numOfThreadsForPruning = Integer
+          .parseInt(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT);
+    }
+    return numOfThreadsForPruning;
+  }
 }
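
Note: a hedged example of how these settings could be supplied from user code; the numeric
values are illustrative only, and anything outside the allowed range falls back to the defaults
through the validation above.

    CarbonProperties props = CarbonProperties.getInstance();
    // threshold beyond which the index server writes pruning results to a file
    // instead of sending them back in memory
    props.addProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD, "1024");
    // parallelism used for pruning in the index executors and in the driver
    props.addProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING, "4");
    props.addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING, "4");
    int executorThreads = props.getNumOfThreadsForExecutorPruning();
    int driverThreads = CarbonProperties.getNumOfThreadsForPruning();
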
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 6fa24b7..376c757 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3311,4 +3311,31 @@ public final class CarbonUtil {
     }
     return null;
   }
+
+  public static String getIndexServerTempPath(String tablePath, String queryId) {
+    String tempFolderPath = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_TEMP_PATH);
+    if (null == tempFolderPath) {
+      tempFolderPath =
+          tablePath + "/" + CarbonCommonConstants.INDEX_SERVER_TEMP_FOLDER_NAME + "/" + queryId;
+    } else {
+      tempFolderPath =
+          tempFolderPath + "/" + CarbonCommonConstants.INDEX_SERVER_TEMP_FOLDER_NAME + "/"
+              + queryId;
+    }
+    return tempFolderPath;
+  }
+
+  public static CarbonFile createTempFolderForIndexServer(String tablePath, String queryId)
+      throws IOException {
+    final String path = getIndexServerTempPath(tablePath, queryId);
+    CarbonFile file = FileFactory.getCarbonFile(path);
+    if (!file.mkdirs(path)) {
+      LOGGER.info("Unable to create temp directory for index server");
+      return null;
+    } else {
+      LOGGER.info("Created index server temp directory: " + path);
+      return file;
+    }
+  }
 }
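
Note: a minimal sketch of the intended lifecycle of this temp folder, mirroring the Scala caller
further below; tablePath and queryId are assumed to come from the running query.

    CarbonFile tmpFolder = CarbonUtil.createTempFolderForIndexServer(tablePath, queryId);
    try {
      // executors write serialized pruning results under this path when the
      // configured serialization threshold is exceeded
    } finally {
      String tmpPath = CarbonUtil.getIndexServerTempPath(tablePath, queryId);
      if (tmpFolder != null && !tmpFolder.deleteFile(tmpPath, FileFactory.getFileType(tmpPath))) {
        // deletion failure is only logged by the caller; it does not fail the query
      }
    }
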
diff --git a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 931b41b..71c86c1 100644
--- a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -16,7 +16,9 @@
  */
 package org.apache.carbondata.hadoop;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
@@ -39,9 +41,12 @@ import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.stream.ExtendedByteArrayInputStream;
+import org.apache.carbondata.core.stream.ExtendedDataInputStream;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.internal.index.Block;
 
@@ -92,7 +97,7 @@ public class CarbonInputSplit extends FileSplit
 
   private transient int[] columnCardinality;
 
-  private transient boolean isLegacyStore;
+  private boolean isLegacyStore;
 
   private transient List<ColumnSchema> columnSchema;
 
@@ -114,6 +119,37 @@ public class CarbonInputSplit extends FileSplit
 
   private transient String blockPath;
 
+  /**
+   * used in case of the index server: fields that are required only in the
+   * executor are not deserialized here; they are kept as a byte array and,
+   * during the write method, written directly to the output stream
+   */
+  private byte[] serializeData;
+
+  /**
+   * start position of fields
+   */
+  private int offset;
+
+  /**
+   * actual length of data
+   */
+  private int actualLen;
+
+  /**
+   * in case of index server block cache there is no need to write detail info, file path,
+   * blocklet id and bucket id, which reduces the serialized data size; this flag is used to
+   * check whether it is an index server flow or not
+   */
+  private boolean writeDetailInfo = true;
+
+  /**
+   * TODO remove this code after Index server count(*) optimization
+   * only used for the index server; once the index server handles count(*) push down
+   * the row count below is not required
+   */
+  private int rowCount;
+
   public CarbonInputSplit() {
     segment = null;
     taskId = "0";
@@ -123,6 +159,58 @@ public class CarbonInputSplit extends FileSplit
     version = CarbonProperties.getInstance().getFormatVersion();
   }
 
+  /**
+   * Will be used in case of index server
+   * @param serializeLen
+   * @param in
+   * @param filePath
+   * @param allLocation
+   * @param blockletId
+   * @throws IOException
+   */
+  public CarbonInputSplit(int serializeLen, DataInput in, String filePath, String[] allLocation,
+      String blockletId) throws IOException {
+    this.filePath = filePath;
+    this.blockletId = blockletId;
+    // get the underlying stream to find the actual position of the fields which won't be
+    // deserialized here, as they are used only by the executor
+    ExtendedByteArrayInputStream underlineStream =
+        ((ExtendedDataInputStream) in).getUnderlineStream();
+    // current position
+    int currentPosition = underlineStream.getPosition();
+    // number of locations
+    short numberOfLocations = in.readShort();
+    if (numberOfLocations > 0) {
+      // used locations for this split
+      this.location = new String[numberOfLocations];
+      for (int i = 0; i < location.length; i++) {
+        location[i] = allLocation[in.readShort()];
+      }
+    }
+    // get start
+    this.start = in.readLong();
+    this.length = in.readLong();
+    this.version = ColumnarFormatVersion.valueOf(in.readShort());
+    // will be removed after count(*) optimization in case of index server
+    this.rowCount = in.readInt();
+    // after deserializing the required fields, get the start position of the fields which will
+    // only be used in the executor
+    int leftoverPosition = underlineStream.getPosition();
+    // position of next split
+    int newPosition = currentPosition + serializeLen;
+    // setting the position to next split
+    underlineStream.setPosition(newPosition);
+    this.serializeData = underlineStream.getBuffer();
+    this.offset = leftoverPosition;
+    this.actualLen = serializeLen - (leftoverPosition - currentPosition);
+    String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(this.filePath);
+    if (taskNo.contains("_")) {
+      taskNo = taskNo.split("_")[0];
+    }
+    this.taskId = taskNo;
+    this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(this.filePath);
+  }
+
   private CarbonInputSplit(String segmentId, String blockletId, String filePath, long start,
       long length, ColumnarFormatVersion version, String[] deleteDeltaFiles,
       String dataMapWritePath) {
@@ -211,6 +299,7 @@ public class CarbonInputSplit extends FileSplit
                 blockletInfos, split.getVersion(), split.getDeleteDeltaFiles());
         blockInfo.setDetailInfo(split.getDetailInfo());
         blockInfo.setDataMapWriterPath(split.dataMapWritePath);
+        blockInfo.setLegacyStore(split.isLegacyStore);
         if (split.getDetailInfo() != null) {
           blockInfo.setBlockOffset(split.getDetailInfo().getBlockFooterOffset());
         }
@@ -232,7 +321,10 @@ public class CarbonInputSplit extends FileSplit
               inputSplit.getLength(), blockletInfos, inputSplit.getVersion(),
               inputSplit.getDeleteDeltaFiles());
       blockInfo.setDetailInfo(inputSplit.getDetailInfo());
-      blockInfo.setBlockOffset(inputSplit.getDetailInfo().getBlockFooterOffset());
+      if (null != inputSplit.getDetailInfo()) {
+        blockInfo.setBlockOffset(inputSplit.getDetailInfo().getBlockFooterOffset());
+      }
+      blockInfo.setLegacyStore(inputSplit.isLegacyStore);
       return blockInfo;
     } catch (IOException e) {
       throw new RuntimeException("fail to get location of split: " + inputSplit, e);
@@ -240,6 +332,7 @@ public class CarbonInputSplit extends FileSplit
   }
 
   public String getSegmentId() {
+    derserializeField();
     if (segment != null) {
       return segment.getSegmentNo();
     } else {
@@ -248,18 +341,25 @@ public class CarbonInputSplit extends FileSplit
   }
 
   public Segment getSegment() {
+    derserializeField();
     return segment;
   }
 
 
   @Override public void readFields(DataInput in) throws IOException {
-    this.filePath = in.readUTF();
-    this.start = in.readLong();
-    this.length = in.readLong();
-    this.segment = Segment.toSegment(in.readUTF());
-    this.version = ColumnarFormatVersion.valueOf(in.readShort());
-    this.bucketId = in.readUTF();
+    // if serializeData is not null, the fields inside the below if condition were already
+    // deserialized in org.apache.carbondata.hadoop.CarbonInputSplit#CarbonInputSplit(
+    // int, java.io.DataInput, java.lang.String, java.lang.String[], java.lang.String)
+    if (null == serializeData) {
+      this.filePath = in.readUTF();
+      this.start = in.readLong();
+      this.length = in.readLong();
+      this.version = ColumnarFormatVersion.valueOf(in.readShort());
+      this.rowCount = in.readInt();
+      this.bucketId = in.readUTF();
+    }
     this.blockletId = in.readUTF();
+    this.segment = Segment.toSegment(in.readUTF());
     int numberOfDeleteDeltaFiles = in.readInt();
     deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
     for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
@@ -279,26 +379,55 @@ public class CarbonInputSplit extends FileSplit
     for (int i = 0; i < validBlockletIdCount; i++) {
       validBlockletIds.add((int) in.readShort());
     }
+    this.isLegacyStore = in.readBoolean();
   }
 
   @Override public void write(DataOutput out) throws IOException {
-    out.writeUTF(filePath);
+    // if serializeData is not null then it is an index server flow, so write the fields
+    // that are already deserialized and then write serializeData to the output stream
+    if (null != serializeData) {
+      out.writeUTF(filePath);
+      out.writeLong(start);
+      out.writeLong(length);
+      out.writeShort(version.number());
+      out.writeInt(rowCount);
+      out.writeUTF(bucketId);
+      out.writeUTF(blockletId);
+      out.write(serializeData, offset, actualLen);
+      return;
+    }
+    // please refer to the writeDetailInfo doc
+    if (null != filePath) {
+      out.writeUTF(filePath);
+    }
     out.writeLong(start);
     out.writeLong(length);
-    out.writeUTF(segment.toString());
     out.writeShort(version.number());
-    out.writeUTF(bucketId);
+    // TODO remove this code once count(*) optimization is added in case of index server
+    if (null != dataMapRow) {
+      out.writeInt(this.dataMapRow.getInt(BlockletDataMapRowIndexes.ROW_COUNT_INDEX));
+    } else if (null != detailInfo) {
+      out.writeInt(this.detailInfo.getRowCount());
+    } else {
+      out.writeInt(0);
+    }
+    if (null != bucketId) {
+      out.writeUTF(bucketId);
+    }
     out.writeUTF(blockletId);
+    out.writeUTF(segment.toString());
     out.writeInt(null != deleteDeltaFiles ? deleteDeltaFiles.length : 0);
     if (null != deleteDeltaFiles) {
       for (int i = 0; i < deleteDeltaFiles.length; i++) {
         out.writeUTF(deleteDeltaFiles[i]);
       }
     }
-    out.writeBoolean(detailInfo != null || dataMapRow != null);
-    if (detailInfo != null) {
+    // please refer to the writeDetailInfo doc
+    out.writeBoolean(writeDetailInfo && (detailInfo != null || dataMapRow != null));
+    if (writeDetailInfo && detailInfo != null) {
       detailInfo.write(out);
-    } else if (dataMapRow != null) {
+      // please refer to the writeDetailInfo doc
+    } else if (writeDetailInfo && dataMapRow != null) {
       writeBlockletDetailsInfo(out);
     }
     out.writeBoolean(dataMapWritePath != null);
@@ -309,6 +438,7 @@ public class CarbonInputSplit extends FileSplit
     for (Integer blockletId : getValidBlockletIds()) {
       out.writeShort(blockletId);
     }
+    out.writeBoolean(isLegacyStore);
   }
 
   /**
@@ -339,6 +469,8 @@ public class CarbonInputSplit extends FileSplit
       return -1;
     }
     CarbonInputSplit other = (CarbonInputSplit) o;
+    derserializeField();
+    other.derserializeField();
     int compareResult = 0;
     // get the segment id
     // converr seg ID to double.
@@ -400,6 +532,7 @@ public class CarbonInputSplit extends FileSplit
   }
 
   @Override public int hashCode() {
+    derserializeField();
     int result = taskId.hashCode();
     result = 31 * result + segment.hashCode();
     result = 31 * result + bucketId.hashCode();
@@ -498,6 +631,10 @@ public class CarbonInputSplit extends FileSplit
     this.isBlockCache = isBlockCache;
   }
 
+  public boolean isBlockCache() {
+    return this.isBlockCache;
+  }
+
   private void writeBlockletDetailsInfo(DataOutput out) throws IOException {
     out.writeInt(this.dataMapRow.getInt(BlockletDataMapRowIndexes.ROW_COUNT_INDEX));
     if (this.isBlockCache) {
@@ -535,7 +672,6 @@ public class CarbonInputSplit extends FileSplit
       out.write(blockletInfoBinary);
     }
     out.writeLong(getLength());
-    out.writeBoolean(this.isLegacyStore);
     out.writeBoolean(this.useMinMaxForPruning);
   }
 
@@ -544,6 +680,7 @@ public class CarbonInputSplit extends FileSplit
       detailInfo = new BlockletDetailInfo();
       detailInfo
           .setRowCount(this.dataMapRow.getInt(BlockletDataMapRowIndexes.ROW_COUNT_INDEX));
+      rowCount = detailInfo.getRowCount();
       detailInfo
           .setVersionNumber(this.dataMapRow.getShort(BlockletDataMapRowIndexes.VERSION_INDEX));
       detailInfo.setBlockletId(Short.parseShort(this.blockletId));
@@ -552,9 +689,10 @@ public class CarbonInputSplit extends FileSplit
           this.dataMapRow.getLong(BlockletDataMapRowIndexes.SCHEMA_UPADATED_TIME_INDEX));
       detailInfo.setBlockFooterOffset(
           this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_FOOTER_OFFSET));
+      start = detailInfo.getBlockFooterOffset();
       detailInfo
           .setBlockSize(getLength());
-      detailInfo.setLegacyStore(isLegacyStore);
+      length = detailInfo.getBlockSize();
       detailInfo.setUseMinMaxForPruning(useMinMaxForPruning);
       if (!this.isBlockCache) {
         detailInfo.setColumnSchemas(this.columnSchema);
@@ -601,8 +739,22 @@ public class CarbonInputSplit extends FileSplit
   /** The position of the first byte in the file to process. */
   public long getStart() { return start; }
 
-  @Override
-  public long getLength() {
+  /**
+   * In case of the index server the detail info won't be present,
+   * so the footer offset needs to be set correctly by updating the start position
+   *
+   */
+  public void updateFooteroffset() {
+    if (isBlockCache && start == 0) {
+      if (null != dataMapRow) {
+        start = this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_FOOTER_OFFSET);
+      } else if (null != detailInfo) {
+        start = detailInfo.getBlockFooterOffset();
+      }
+    }
+  }
+
+  public void updateBlockLength() {
     if (length == -1) {
       if (null != dataMapRow) {
         length = this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_LENGTH);
@@ -610,6 +762,11 @@ public class CarbonInputSplit extends FileSplit
         length = detailInfo.getBlockSize();
       }
     }
+  }
+
+  @Override
+  public long getLength() {
+    updateBlockLength();
     return length;
   }
 
@@ -629,4 +786,78 @@ public class CarbonInputSplit extends FileSplit
   public void setLocation(String[] location) {
     this.location = location;
   }
+
+  /**
+   * In case of index server block cache there is no need to write the detail info
+   * as it is a heavy object.
+   * @param writeDetailInfo
+   */
+  public void setWriteDetailInfo(boolean writeDetailInfo) {
+    this.writeDetailInfo = writeDetailInfo;
+  }
+
+  /**
+   * Below method will be used to serialize the input split in case of
+   * index server
+   * @param out
+   * @param uniqueLocationMap
+   * @throws IOException
+   */
+  public void serializeFields(DataOutput out, Map<String, Short> uniqueLocationMap)
+      throws IOException {
+    final String[] locations = getLocations();
+    if (null != locations) {
+      out.writeShort(locations.length);
+      // below code builds the dictionary of unique locations across all the blocks
+      for (String loc : locations) {
+        Short pos = uniqueLocationMap.get(loc);
+        if (null == pos) {
+          pos = (short) uniqueLocationMap.size();
+          uniqueLocationMap.put(loc, pos);
+        }
+        out.writeShort(pos);
+      }
+    } else {
+      out.writeShort(0);
+    }
+    write(out);
+  }
+
+  /**
+   * This method will be used to deserialize fields
+   * in case of index server
+   */
+  private void derserializeField() {
+    if (null != serializeData) {
+      DataInputStream in = null;
+      try {
+        ByteArrayInputStream bis = new ByteArrayInputStream(serializeData, offset, actualLen);
+        in = new DataInputStream(bis);
+        readFields(in);
+        serializeData = null;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } finally {
+        if (null != in) {
+          CarbonUtil.closeStreams(in);
+        }
+      }
+    }
+  }
+
+  public int getRowCount() {
+    return rowCount;
+  }
+
+  public void setStart(long start) {
+    this.start = start;
+  }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  public void setBucketId(String bucketId) {
+    this.bucketId = bucketId;
+  }
 }
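
Note: a hedged sketch of the round trip formed by serializeFields and the index-server
constructor above. It assumes the caller has nulled out filePath and bucketId on the split
before serialization (they travel out of band, which is why the constructor takes filePath as a
parameter) and records each split's serialized length itself; the ExtendedBlockletWrapper used
by the index server is what actually drives this.

    Map<String, Short> uniqueLocationMap = new HashMap<>();
    ExtendedByteArrayOutputStream bos = new ExtendedByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    int before = bos.size();
    split.serializeFields(out, uniqueLocationMap);   // locations are written as short indexes
    out.flush();
    int serializeLen = bos.size() - before;

    // invert the shared location dictionary into the array the reader side expects
    String[] allLocations = new String[uniqueLocationMap.size()];
    for (Map.Entry<String, Short> entry : uniqueLocationMap.entrySet()) {
      allLocations[entry.getValue()] = entry.getKey();
    }

    ExtendedDataInputStream in =
        new ExtendedDataInputStream(new ExtendedByteArrayInputStream(bos.getBuffer()));
    // only locations, offsets, version and row count are read eagerly; the remaining
    // bytes are kept as a byte range and materialized lazily by derserializeField()
    CarbonInputSplit copy =
        new CarbonInputSplit(serializeLen, in, filePath, allLocations, blockletId);
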
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index 7ac5bc0..3703727 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -100,7 +100,7 @@ public class CarbonMultiBlockSplit extends InputSplit implements Serializable, W
 
   public void calculateLength() {
     long total = 0;
-    if (splitList.size() > 0) {
+    if (splitList.size() > 1) {
       Map<String, Long> blockSizes = new HashMap<>();
       for (CarbonInputSplit split : splitList) {
         blockSizes.put(split.getFilePath(), split.getLength());
@@ -108,6 +108,8 @@ public class CarbonMultiBlockSplit extends InputSplit implements Serializable, W
       for (Map.Entry<String, Long> entry : blockSizes.entrySet()) {
         total += entry.getValue();
       }
+    } else if (splitList.size() == 1) {
+      total += splitList.get(0).getLength();
     }
     length = total;
   }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 1a529e3..635ab1a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -81,17 +81,25 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
     List<CarbonInputSplit> splitList;
     if (inputSplit instanceof CarbonInputSplit) {
       splitList = new ArrayList<>(1);
-      String splitPath = ((CarbonInputSplit) inputSplit).getFilePath();
+      CarbonInputSplit carbonInputSplit = ((CarbonInputSplit) inputSplit);
+      String splitPath = carbonInputSplit.getFilePath();
       // BlockFooterOffSet will be null in case of CarbonVectorizedReader as this has to be set
       // where multiple threads are able to read small set of files to calculate footer instead
       // of the main thread setting this for all the files.
-      if (((CarbonInputSplit) inputSplit).getDetailInfo().getBlockFooterOffset() == 0L) {
+      if ((null != carbonInputSplit.getDetailInfo()
+          && carbonInputSplit.getDetailInfo().getBlockFooterOffset() == 0L) || (
+          null == carbonInputSplit.getDetailInfo() && carbonInputSplit.getStart() == 0)) {
         FileReader reader = FileFactory.getFileHolder(FileFactory.getFileType(splitPath),
             context.getConfiguration());
         ByteBuffer buffer = reader
             .readByteBuffer(FileFactory.getUpdatedFilePath(splitPath), inputSplit.getLength() - 8,
                 8);
-        ((CarbonInputSplit) inputSplit).getDetailInfo().setBlockFooterOffset(buffer.getLong());
+        if (carbonInputSplit.getDetailInfo() == null) {
+          carbonInputSplit.setStart(buffer.getLong());
+        } else {
+          carbonInputSplit.getDetailInfo().setBlockFooterOffset(buffer.getLong());
+        }
+        reader.finish();
       }
       splitList.add((CarbonInputSplit) inputSplit);
     } else if (inputSplit instanceof CarbonMultiBlockSplit) {
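
Note: the footer offset of a carbondata file is stored in its last 8 bytes, which is why the
reader above falls back to reading that trailing long when neither the detail info nor the split
start carries it. A hedged sketch of that fallback in isolation (splitPath, configuration and
split are assumed, and the enclosing method is expected to declare IOException):

    FileReader reader =
        FileFactory.getFileHolder(FileFactory.getFileType(splitPath), configuration);
    try {
      // the trailing long of the file is the offset of the footer
      ByteBuffer buffer = reader.readByteBuffer(
          FileFactory.getUpdatedFilePath(splitPath), split.getLength() - 8, 8);
      long footerOffset = buffer.getLong();
      if (split.getDetailInfo() == null) {
        split.setStart(footerOffset);
      } else {
        split.getDetailInfo().setBlockFooterOffset(footerOffset);
      }
    } finally {
      reader.finish();
    }
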
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 274c7ef..6c99142 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -566,7 +566,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
           for (InputSplit extendedBlocklet : extendedBlocklets) {
             CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
             blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blocklet.getFilePath(),
-                (long) blocklet.getDetailInfo().getRowCount());
+                (long) blocklet.getRowCount());
           }
         } catch (Exception e) {
           // Check if fallback is disabled then directly throw exception otherwise try driver
@@ -618,7 +618,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
             getDistributedSplit(table, null, partitions, filteredSegment,
                 allSegments.getInvalidSegments(), new ArrayList<String>()));
         for (InputSplit extendedBlocklet : extendedBlocklets) {
-          totalRowCount += ((CarbonInputSplit) extendedBlocklet).getDetailInfo().getRowCount();
+          totalRowCount += ((CarbonInputSplit) extendedBlocklet).getRowCount();
         }
       } else {
         TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
index 99db9d3..507f6d9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
@@ -81,15 +81,22 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
     List<CarbonInputSplit> splitList;
     if (inputSplit instanceof CarbonInputSplit) {
       // Read the footer offset and set.
-      String splitPath = ((CarbonInputSplit) inputSplit).getFilePath();
-      if (((CarbonInputSplit) inputSplit).getDetailInfo().getBlockFooterOffset() == 0L) {
+      CarbonInputSplit carbonInputSplit = ((CarbonInputSplit) inputSplit);
+      String splitPath = carbonInputSplit.getFilePath();
+      if ((null != carbonInputSplit.getDetailInfo()
+          && carbonInputSplit.getDetailInfo().getBlockFooterOffset() == 0L) || (
+          null == carbonInputSplit.getDetailInfo() && carbonInputSplit.getStart() == 0)) {
         FileReader reader = FileFactory.getFileHolder(FileFactory.getFileType(splitPath),
             taskAttemptContext.getConfiguration());
         ByteBuffer buffer = reader
             .readByteBuffer(FileFactory.getUpdatedFilePath(splitPath),
-                ((CarbonInputSplit) inputSplit).getDetailInfo().getBlockSize() - 8,
+                ((CarbonInputSplit) inputSplit).getLength() - 8,
                 8);
-        ((CarbonInputSplit) inputSplit).getDetailInfo().setBlockFooterOffset(buffer.getLong());
+        if (carbonInputSplit.getDetailInfo() == null) {
+          carbonInputSplit.setStart(buffer.getLong());
+        } else {
+          carbonInputSplit.getDetailInfo().setBlockFooterOffset(buffer.getLong());
+        }
         reader.finish();
       }
       splitList = new ArrayList<>(1);
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 0be4970..a49b233 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -565,12 +565,12 @@ class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   override protected def afterAll(): Unit = {
-    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql("DROP TABLE IF EXISTS datamap_test")
-    sql("DROP TABLE IF EXISTS datamap_testFG")
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
-        CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
+//    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+//    sql("DROP TABLE IF EXISTS normal_test")
+//    sql("DROP TABLE IF EXISTS datamap_test")
+//    sql("DROP TABLE IF EXISTS datamap_testFG")
+//    CarbonProperties.getInstance()
+//      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+//        CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
   }
 }
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
index 20a2d39..0f71f8c 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
@@ -52,8 +52,7 @@ public class Util {
    */
   public static boolean isBlockWithoutBlockletInfoExists(List<CarbonInputSplit> splitList) {
     for (CarbonInputSplit inputSplit : splitList) {
-      if (null == inputSplit.getDetailInfo() || null == inputSplit.getDetailInfo()
-          .getBlockletInfo()) {
+      if (inputSplit.isBlockCache()) {
         return true;
       }
     }
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 656166d..8935b5b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -489,7 +489,7 @@ class CarbonMergerRDD[K, V](
         var dataFileFooter: DataFileFooter = null
         if (null == rangeColumn) {
           val taskNo = getTaskNo(split, partitionTaskMap, counter)
-          var sizeOfSplit = split.getDetailInfo.getBlockSize
+          var sizeOfSplit = split.getLength
           val splitList = taskIdMapping.get(taskNo)
           noOfBlocks += 1
           if (null == splitList) {
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 698dd58..4acfc33 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -17,6 +17,7 @@
 package org.apache.carbondata.indexserver
 
 import java.util
+import java.util.UUID
 
 import scala.collection.JavaConverters._
 
@@ -26,12 +27,15 @@ import org.apache.spark.util.SizeEstimator
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datamap.{AbstractDataMapJob, DistributableDataMapFormat}
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.scan.expression.BinaryExpression
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.scan.filter.resolver.{FilterResolverIntf, LogicalFilterResolverImpl, RowLevelFilterResolverImpl}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.spark.util.CarbonScalaUtil.logTime
 
 /**
@@ -47,24 +51,38 @@ class DistributedDataMapJob extends AbstractDataMapJob {
       val messageSize = SizeEstimator.estimate(dataMapFormat)
       LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
     }
+    val queryId = SparkSQLUtil.getSparkSession.sparkContext.getConf
+      .get("queryId", UUID.randomUUID().toString)
+    dataMapFormat.setQueryId(queryId)
+    val tmpFolder = CarbonUtil
+      .createTempFolderForIndexServer(dataMapFormat.getCarbonTable.getTablePath, queryId)
     val (resonse, time) = logTime {
-      val spark = SparkSQLUtil.getSparkSession
-      val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
-        case null => ""
-        case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
+      try {
+        val spark = SparkSQLUtil.getSparkSession
+        val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
+          case null => ""
+          case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
+        }
+        val taskGroupDesc = spark.sparkContext.getLocalProperty("spark.job.description") match {
+          case null => ""
+          case _ => spark.sparkContext.getLocalProperty("spark.job.description")
+        }
+        dataMapFormat.setTaskGroupId(taskGroupId)
+        dataMapFormat.setTaskGroupDesc(taskGroupDesc)
+        var filterInf = dataMapFormat.getFilterResolverIntf
+        val filterProcessor = new FilterExpressionProcessor
+        filterInf = removeSparkUnknown(filterInf,
+          dataMapFormat.getCarbonTable.getAbsoluteTableIdentifier, filterProcessor)
+        dataMapFormat.setFilterResolverIntf(filterInf)
+        IndexServer.getClient.getSplits(dataMapFormat)
+          .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, dataMapFormat.getQueryId)
+      } finally {
+        val tmpPath = CarbonUtil
+          .getIndexServerTempPath(dataMapFormat.getCarbonTable.getTablePath, queryId)
+        if (null != tmpFolder && !tmpFolder.deleteFile(tmpPath, FileFactory.getFileType(tmpPath))) {
+          LOGGER.error("Problem while deleting the temp directory:" + tmpPath)
+        }
       }
-      val taskGroupDesc = spark.sparkContext.getLocalProperty("spark.job.description") match {
-        case null => ""
-        case _ => spark.sparkContext.getLocalProperty("spark.job.description")
-      }
-      dataMapFormat.setTaskGroupId(taskGroupId)
-      dataMapFormat.setTaskGroupDesc(taskGroupDesc)
-      var filterInf = dataMapFormat.getFilterResolverIntf
-      val filterProcessor = new FilterExpressionProcessor
-      filterInf = removeSparkUnknown(filterInf,
-        dataMapFormat.getCarbonTable.getAbsoluteTableIdentifier, filterProcessor)
-      dataMapFormat.setFilterResolverIntf(filterInf)
-      IndexServer.getClient.getSplits(dataMapFormat).toList.asJava
     }
     LOGGER.info(s"Time taken to get response from server: $time ms")
     resonse
@@ -108,17 +126,12 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
 
   override def execute(dataMapFormat: DistributableDataMapFormat): util.List[ExtendedBlocklet] = {
     val spark = SparkSQLUtil.getSparkSession
-    val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
-      case null => ""
-      case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
-    }
-    val taskGroupDesc = spark.sparkContext.getLocalProperty("spark.job.description") match {
-      case null => ""
-      case _ => spark.sparkContext.getLocalProperty("spark.job.description")
-    }
-    dataMapFormat.setTaskGroupId(taskGroupId)
-    dataMapFormat.setTaskGroupDesc(taskGroupDesc)
-    IndexServer.getSplits(dataMapFormat).toList.asJava
+    val queryId = spark.sparkContext.getConf.get("queryId", UUID.randomUUID().toString)
+    dataMapFormat.setQueryId(queryId)
+    dataMapFormat.setIsWriteToFile(false)
+    dataMapFormat.setFallbackJob()
+    IndexServer.getSplits(dataMapFormat)
+      .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, dataMapFormat.getQueryId)
   }
 
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
index 607f923..2598e43 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
@@ -19,98 +19,154 @@ package org.apache.carbondata.indexserver
 
 import java.text.SimpleDateFormat
 import java.util.Date
+import java.util.concurrent.Executors
 
 import scala.collection.JavaConverters._
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
+import scala.concurrent.duration.Duration
 
-import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.commons.lang.StringUtils
+import org.apache.hadoop.mapred.{RecordReader, TaskAttemptID}
 import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SparkEnv, TaskContext, TaskKilledException}
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.hive.DistributionUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.datamap.DistributableDataMapFormat
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapStoreManager, DistributableDataMapFormat, TableDataMap}
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
+import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, ExtendedBlockletWrapper}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory}
 import org.apache.carbondata.spark.rdd.CarbonRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
-class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit)
+class DataMapRDDPartition(rddId: Int,
+    idx: Int,
+    val inputSplit: Seq[InputSplit],
+    location: Array[String])
   extends Partition {
 
   override def index: Int = idx
 
   override def hashCode(): Int = 41 * (41 + rddId) + idx
+
+  def getLocations: Array[String] = {
+    location
+  }
 }
 
 private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkSession,
     dataMapFormat: DistributableDataMapFormat)
-  extends CarbonRDD[(String, ExtendedBlocklet)](ss, Nil) {
+  extends CarbonRDD[(String, ExtendedBlockletWrapper)](ss, Nil) {
 
   @transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD]
     .getName)
-
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     formatter.format(new Date())
   }
+  var readers: scala.collection.Iterator[RecordReader[Void, ExtendedBlocklet]] = _
 
-  override protected def getPreferredLocations(split: Partition): Seq[String] = {
-    if (split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations != null) {
-      split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations.toSeq
-    } else {
-      Seq()
-    }
+  private def groupSplits(xs: Seq[InputSplit], n: Int) = {
+    val (quot, rem) = (xs.size / n, xs.size % n)
+    val (smaller, bigger) = xs.splitAt(xs.size - rem * (quot + 1))
+    (smaller.grouped(quot) ++ bigger.grouped(quot + 1)).toList
   }
 
   override def internalCompute(split: Partition,
-      context: TaskContext): Iterator[(String, ExtendedBlocklet)] = {
+      context: TaskContext): Iterator[(String, ExtendedBlockletWrapper)] = {
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
-    val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit
-    val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext)
-    reader.initialize(inputSplit, attemptContext)
-    val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
-      CacheProvider.getInstance().getCarbonCache.getCurrentSize
+    val inputSplits = split.asInstanceOf[DataMapRDDPartition].inputSplit
+    if (dataMapFormat.isJobToClearDataMaps) {
+      // if the job is to clear datamaps, clear them from the cache and return an empty iterator
+      DataMapStoreManager.getInstance().clearInvalidDataMaps(dataMapFormat.getCarbonTable,
+        inputSplits.map(_
+          .asInstanceOf[DataMapDistributableWrapper].getDistributable.getSegment.getSegmentNo)
+          .toList.asJava,
+        dataMapFormat
+          .getDataMapToClear)
+      val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
+        CacheProvider.getInstance().getCarbonCache.getCurrentSize
+      } else {
+        0L
+      }
+      val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
+        SparkEnv.get.blockManager.blockManagerId.executorId
+      }"
+      Iterator((executorIP + "_" + cacheSize, new ExtendedBlockletWrapper()))
     } else {
-      0L
-    }
-    context.addTaskCompletionListener(_ => {
-      if (reader != null) {
-        reader.close()
+      if (dataMapFormat.getInvalidSegments.size > 0) {
+        // clear the segmentMap and the executor cache when there are invalid segments
+        DataMapStoreManager.getInstance().clearInvalidSegments(dataMapFormat.getCarbonTable,
+          dataMapFormat.getInvalidSegments)
       }
-    })
-    val iter: Iterator[(String, ExtendedBlocklet)] = new Iterator[(String, ExtendedBlocklet)] {
+      val startTime = System.currentTimeMillis()
+      val numOfThreads = CarbonProperties.getInstance().getNumOfThreadsForExecutorPruning
 
-      private var havePair = false
-      private var finished = false
+      val service = Executors
+        .newFixedThreadPool(numOfThreads, new CarbonThreadFactory("IndexPruningPool", true))
+      implicit val ec: ExecutionContextExecutor = ExecutionContext
+        .fromExecutor(service)
 
-      override def hasNext: Boolean = {
-        if (context.isInterrupted) {
-          throw new TaskKilledException
+      val futures = if (inputSplits.length <= numOfThreads) {
+        inputSplits.map {
+          split => generateFuture(Seq(split), attemptContext)
         }
-        if (!finished && !havePair) {
-          finished = !reader.nextKeyValue
-          havePair = !finished
+      } else {
+        groupSplits(inputSplits, numOfThreads).map {
+          splits => generateFuture(splits, attemptContext)
         }
-        !finished
       }
+      // scalastyle:off
+      val f = Await.result(Future.sequence(futures), Duration.Inf).flatten
+      // scalastyle:on
+      service.shutdownNow()
+      val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD].getName)
+      LOGGER.info(s"Time taken to collect ${ inputSplits.size } blocklets : " +
+                  (System.currentTimeMillis() - startTime))
+      val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
+        CacheProvider.getInstance().getCarbonCache.getCurrentSize
+      } else {
+        0L
+      }
+      val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
+        SparkEnv.get.blockManager.blockManagerId.executorId
+      }"
+      val value = (executorIP + "_" + cacheSize.toString, new ExtendedBlockletWrapper(f.toList
+        .asJava, dataMapFormat.getCarbonTable.getTablePath, dataMapFormat.getQueryId,
+        dataMapFormat.isWriteToFile))
+      Iterator(value)
+    }
+  }
 
-      override def next(): (String, ExtendedBlocklet) = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
+  private def generateFuture(split: Seq[InputSplit],
+      attemptContextImpl: TaskAttemptContextImpl)
+    (implicit executionContext: ExecutionContext) = {
+    Future {
+      split.flatMap { inputSplit =>
+        val blocklets = new java.util.ArrayList[ExtendedBlocklet]()
+        val reader = dataMapFormat.createRecordReader(inputSplit, attemptContextImpl)
+        reader.initialize(inputSplit, attemptContextImpl)
+        while (reader.nextKeyValue()) {
+          blocklets.add(reader.getCurrentValue)
         }
-        havePair = false
-        val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
-          SparkEnv.get.blockManager.blockManagerId.executorId}"
-        val value = (executorIP + "_" + cacheSize.toString, reader.getCurrentValue)
-        value
+        reader.close()
+        blocklets.asScala
       }
     }
-    iter
+  }
+
+  override protected def getPreferredLocations(split: Partition): Seq[String] = {
+    if (split.asInstanceOf[DataMapRDDPartition].getLocations != null) {
+      split.asInstanceOf[DataMapRDDPartition].getLocations.toSeq
+    } else {
+      Seq()
+    }
   }
 
   override protected def internalGetPartitions: Array[Partition] = {
@@ -121,7 +177,7 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
         dataMapFormat.getCarbonTable.getTableName)
     if (!isDistributedPruningEnabled || dataMapFormat.isFallbackJob || splits.isEmpty) {
       splits.zipWithIndex.map {
-        f => new DataMapRDDPartition(id, f._2, f._1)
+        f => new DataMapRDDPartition(id, f._2, List(f._1), f._1.getLocations)
       }.toArray
     } else {
       val executorsList: Map[String, Seq[String]] = DistributionUtil
@@ -130,7 +186,7 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
         DistributedRDDUtils.getExecutors(splits.toArray, executorsList, dataMapFormat
           .getCarbonTable.getTableUniqueName, id)
       }
-      LOGGER.debug(s"Time taken to assign executors to ${splits.length} is $time ms")
+      LOGGER.debug(s"Time taken to assign executors to ${ splits.length } is $time ms")
       response.toArray
     }
   }
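For reference, the executor-side pruning added above follows a straightforward fan-out pattern: size a fixed thread pool from CarbonProperties, hand each thread a group of input splits, prune each group through its own record reader, and flatten the per-thread results once all futures complete. The sketch below is a minimal, self-contained approximation of that pattern; pruneInParallel, its grouping heuristic and the placeholder type parameters are illustrative only and do not mirror the exact groupSplits/generateFuture implementation.

    import java.util.concurrent.Executors

    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    object ParallelPruneSketch {
      def pruneInParallel[T, R](work: Seq[T], numThreads: Int)(prune: T => Seq[R]): Seq[R] = {
        val pool = Executors.newFixedThreadPool(numThreads)
        implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
        try {
          // one future per group; when there are fewer splits than threads, one split per future
          val groups: Seq[Seq[T]] =
            if (work.length <= numThreads) work.map(Seq(_))
            else work.grouped(math.ceil(work.length.toDouble / numThreads).toInt).toSeq
          val futures = groups.map(group => Future(group.flatMap(prune)))
          Await.result(Future.sequence(futures), Duration.Inf).flatten
        } finally {
          pool.shutdownNow()
        }
      }
    }

A call site would look like pruneInParallel(inputSplits, numOfThreads)(readBlockletsFor), where readBlockletsFor stands in for the reader-based pruning of a single split.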
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index c7632be..f31110b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -64,11 +64,18 @@ object DistributedRDDUtils {
     if (invalidExecutorIds.nonEmpty) {
       DistributedRDDUtils.invalidateExecutors(invalidExecutorIds.toSeq)
     }
-    (convertToPartition(legacySegments, tableUniqueName, executorsList) ++
-     convertToPartition(sortedPartitions, tableUniqueName, executorsList)).zipWithIndex.map {
-      case (dataMapDistributable, index) =>
-        new DataMapRDDPartition(rddId, index, dataMapDistributable)
-    }
+    val groupedPartitions = (convertToPartition(legacySegments, tableUniqueName, executorsList) ++
+                             convertToPartition(sortedPartitions, tableUniqueName, executorsList))
+      .groupBy {
+        partition =>
+          partition.getLocations.head
+      }
+    groupedPartitions.zipWithIndex.map {
+      case ((location, splitList), index) =>
+        new DataMapRDDPartition(rddId,
+          index, splitList,
+          Array(location))
+    }.toArray.sortBy(_.index)
   }
 
   private def convertToPartition(segments: Seq[InputSplit], tableUniqueName: String,
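The grouping change above packs every split that reports the same first preferred location into a single DataMapRDDPartition, so an executor prunes all of its locally cached segments in one task rather than one task per split. The following is a rough, self-contained illustration of that grouping; the Partition case class and the (splitName, preferredHost) pairs are stand-ins for DataMapRDDPartition and the real InputSplits.

    object LocationGroupingSketch {
      // stand-in for DataMapRDDPartition
      case class Partition(rddId: Int, index: Int, splits: Seq[String], locations: Array[String])

      def groupByLocation(rddId: Int, splits: Seq[(String, String)]): Array[Partition] = {
        // splits sharing a preferred host collapse into one partition pinned to that host
        splits.groupBy { case (_, host) => host }.zipWithIndex.map {
          case ((host, grouped), index) =>
            Partition(rddId, index, grouped.map(_._1), Array(host))
        }.toArray.sortBy(_.index)
      }
    }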
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
index 0069d86..172ea47 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
@@ -37,7 +37,7 @@ class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableName
         // create a dummy split for each executor to accumulate the cache size.
         val dummySplit = new CarbonInputSplit()
         dummySplit.setLocation(Array(executor))
-        new DataMapRDDPartition(id, idx, dummySplit)
+        new DataMapRDDPartition(id, idx, List(dummySplit), Array(executor))
     }
   }
 
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 9eee6d7..295ebe1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DistributableDataMapFormat
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet
+import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, ExtendedBlockletWrapperContainer}
 import org.apache.carbondata.core.util.CarbonProperties
 
 @ProtocolInfo(protocolName = "Server", protocolVersion = 1)
@@ -42,7 +42,7 @@ trait ServerInterface {
   /**
    * Used to prune and cache the datamaps for the table.
    */
-  def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet]
+  def getSplits(request: DistributableDataMapFormat): ExtendedBlockletWrapperContainer
 
   /**
    * Get the cache size for the specified table.
@@ -99,15 +99,17 @@ object IndexServer extends ServerInterface {
     })
   }
 
-  def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet] = doAs {
+  def getSplits(request: DistributableDataMapFormat): ExtendedBlockletWrapperContainer = doAs {
     sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
     sparkSession.sparkContext.setLocalProperty("spark.job.description", request.getTaskGroupDesc)
     val splits = new DistributedPruneRDD(sparkSession, request).collect()
-    DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
+    if (!request.isFallbackJob) {
+      DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
+    }
     if (request.isJobToClearDataMaps) {
       DistributedRDDUtils.invalidateCache(request.getCarbonTable.getTableUniqueName)
     }
-    splits.map(_._2)
+    new ExtendedBlockletWrapperContainer(splits.map(_._2), request.isFallbackJob)
   }
 
   override def invalidateSegmentCache(databaseName: String, tableName: String,
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
index bc83d2f..0c2f877 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
@@ -51,7 +51,7 @@ class InvalidateSegmentCacheRDD(@transient private val ss: SparkSession, databas
           // create a dummy split for each executor to accumulate the cache size.
           val dummySplit = new CarbonInputSplit()
           dummySplit.setLocation(Array(executor))
-          new DataMapRDDPartition(id, idx, dummySplit)
+          new DataMapRDDPartition(id, idx, List(dummySplit), Array(executor))
       }
     }
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 8cf477e..10c4bcb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -31,9 +31,9 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
@@ -147,7 +148,6 @@ public class CarbonCompactionUtil {
    */
   public static Map<String, List<DataFileFooter>> createDataFileFooterMappingForSegments(
       List<TableBlockInfo> tableBlockInfoList, boolean isSortedTable) throws IOException {
-
     Map<String, List<DataFileFooter>> segmentBlockInfoMapping = new HashMap<>();
     for (TableBlockInfo blockInfo : tableBlockInfoList) {
       List<DataFileFooter> eachSegmentBlocks = new ArrayList<>();
@@ -159,11 +159,16 @@ public class CarbonCompactionUtil {
       // in getting the schema last updated time based on which compaction flow is decided that
       // whether it will go to restructure compaction flow or normal compaction flow.
       // This decision will impact the compaction performance so it needs to be decided carefully
-      final BlockletInfo blockletInfo = blockInfo.getDetailInfo().getBlockletInfo();
-      if (null != blockInfo.getDetailInfo() && (
-          blockInfo.getDetailInfo().getSchemaUpdatedTimeStamp() == 0L || null == blockletInfo
-              || null == blockletInfo.isSorted() || !blockletInfo.isSorted())) {
+      BlockletDetailInfo blockletDetailInfo = blockInfo.getDetailInfo();
+      if (null == blockletDetailInfo || null == blockletDetailInfo.getBlockletInfo()
+          || blockletDetailInfo.getSchemaUpdatedTimeStamp() == 0L
+          || null == blockletDetailInfo.getBlockletInfo().isSorted()
+          || !blockletDetailInfo.getBlockletInfo().isSorted()) {
         dataFileMatadata = CarbonUtil.readMetadataFile(blockInfo, true);
+        if (blockletDetailInfo == null) {
+          blockletDetailInfo = QueryUtil.getBlockletDetailInfo(dataFileMatadata, blockInfo);
+          blockInfo.setDetailInfo(blockletDetailInfo);
+        }
         if (null == dataFileMatadata.isSorted()) {
           dataFileMatadata.setSorted(isSortedTable);
         }
@@ -181,7 +186,6 @@ public class CarbonCompactionUtil {
       }
     }
     return segmentBlockInfoMapping;
-
   }
 
   /**
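The compaction change above stops assuming that every TableBlockInfo arrives with a BlockletDetailInfo attached: when the detail info is missing, or cannot answer the schema-timestamp and sorted questions, the data file footer is read once and the detail info is rebuilt from it through QueryUtil. A hedged sketch of that fallback decision, using illustrative placeholder types rather than the real classes:

    object DetailInfoFallbackSketch {
      // stand-in for BlockletDetailInfo: schema timestamp plus a tri-state sorted flag
      case class DetailInfo(schemaUpdatedTime: Long, sorted: Option[Boolean])

      def resolveDetailInfo(cached: Option[DetailInfo], readFooter: () => DetailInfo): DetailInfo =
        cached match {
          // reuse the cached info only when it is complete and reports a sorted block
          case Some(info) if info.schemaUpdatedTime != 0L && info.sorted.contains(true) => info
          // otherwise pay for a single footer read and rebuild the detail info from it
          case _ => readFooter()
        }
    }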
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
index c0e4e27..0c08eab 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.channels.Channels;
 import java.util.TimeZone;
 
+import org.apache.carbondata.core.stream.ExtendedByteArrayOutputStream;
 import org.apache.carbondata.sdk.file.Schema;
 
 import org.apache.arrow.memory.BufferAllocator;