You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/04/30 09:47:54 UTC

carbondata git commit: Compaction performance is slow as compared to data load

Repository: carbondata
Updated Branches:
  refs/heads/master 03a735bf7 -> 26607fb9c


Compaction performance is slow as compared to data load

During compaction, result filling is done in row format. Because of this, as the number of columns increases, the time taken to fill dimension and measure data also increases. This happens because with row-wise filling we cannot take advantage of OS cacheable buffers, since we continuously jump to read data for the next column. Implement a columnar-format data structure for the compaction process to fill dimension and measure data.

This closes #2210


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/26607fb9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/26607fb9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/26607fb9

Branch: refs/heads/master
Commit: 26607fb9c5507faa92bafec33f0fabab2e47935d
Parents: 03a735b
Author: manishgupta88 <to...@gmail.com>
Authored: Mon Apr 23 12:36:39 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Mon Apr 30 15:16:43 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 +-
 .../impl/AbstractScannedResultCollector.java    |  49 ++++++
 .../collector/impl/RawBasedResultCollector.java | 131 ++++++++++++----
 .../RestructureBasedRawResultCollector.java     | 153 ++++++++++---------
 .../executor/impl/AbstractQueryExecutor.java    |   9 +-
 .../scan/executor/infos/BlockExecutionInfo.java |  14 ++
 .../core/scan/model/QueryModelBuilder.java      |   3 +-
 .../core/scan/processor/BlockScan.java          |   1 +
 .../core/scan/processor/DataBlockIterator.java  |   1 +
 .../core/scan/result/BlockletScannedResult.java | 129 +++++++++++++++-
 .../result/impl/FilterQueryScannedResult.java   |  20 ++-
 .../impl/NonFilterQueryScannedResult.java       |  68 ++++++++-
 .../AbstractDetailQueryResultIterator.java      |  21 +++
 .../AbstractSearchModeResultIterator.java       |  21 +++
 .../scanner/impl/BlockletFilterScanner.java     |   3 +-
 .../scan/scanner/impl/BlockletFullScanner.java  |   5 +-
 .../core/scan/wrappers/ByteArrayWrapper.java    |  19 +++
 .../core/stats/QueryStatisticsConstants.java    |  22 +++
 .../carbondata/core/stats/TaskStatistics.java   |   6 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   5 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   3 +
 .../merger/CarbonCompactionExecutor.java        |  39 ++++-
 22 files changed, 602 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c4b0507..b902359 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1568,10 +1568,12 @@ public final class CarbonCommonConstants {
    * memory.
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String CARBON_ENABLE_PAGE_LEVEL_READER_IN_COMPACTION =
       "carbon.enable.page.level.reader.in.compaction";
 
-  public static final String CARBON_ENABLE_PAGE_LEVEL_READER_IN_COMPACTION_DEFAULT = "true";
+  // Note: If this property is set to true it can impact compaction performance as IO will increase
+  public static final String CARBON_ENABLE_PAGE_LEVEL_READER_IN_COMPACTION_DEFAULT = "false";
 
   @CarbonProperty
   public static final String IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
index 9ac5a06..a160778 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.scan.collector.impl;
 
 import java.math.BigDecimal;
 import java.math.RoundingMode;
+import java.util.List;
 
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -30,6 +31,7 @@ import org.apache.carbondata.core.scan.executor.infos.MeasureInfo;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.result.BlockletScannedResult;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
@@ -52,10 +54,17 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
    */
   DimensionInfo dimensionInfo;
 
+  /**
+   * model object to be used for collecting query statistics during normal query execution,
+   * compaction and other flows that uses the query flow
+   */
+  QueryStatisticsModel queryStatisticsModel;
+
   AbstractScannedResultCollector(BlockExecutionInfo blockExecutionInfos) {
     this.executionInfo = blockExecutionInfos;
     measureInfo = blockExecutionInfos.getMeasureInfo();
     dimensionInfo = blockExecutionInfos.getDimensionInfo();
+    this.queryStatisticsModel = blockExecutionInfos.getQueryStatisticsModel();
   }
 
   protected void fillMeasureData(Object[] msrValues, int offset,
@@ -83,6 +92,46 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
     }
   }
 
+  /**
+   * This method will be used to fill measure data column wise
+   *
+   * @param rows
+   * @param offset
+   * @param scannedResult
+   */
+  protected void fillMeasureDataBatch(List<Object[]> rows, int offset,
+      BlockletScannedResult scannedResult) {
+    int measureExistIndex = 0;
+    for (short i = 0; i < measureInfo.getMeasureDataTypes().length; i++) {
+      // if measure exists is block then pass measure column
+      // data chunk to the collector
+      if (measureInfo.getMeasureExists()[i]) {
+        ProjectionMeasure queryMeasure = executionInfo.getProjectionMeasures()[measureExistIndex];
+        ColumnPage measureChunk =
+            scannedResult.getMeasureChunk(measureInfo.getMeasureOrdinals()[measureExistIndex]);
+        for (short j = 0; j < rows.size(); j++) {
+          Object[] rowValues = rows.get(j);
+          rowValues[i + offset] =
+              getMeasureData(measureChunk, scannedResult.getValidRowIds().get(j),
+                  queryMeasure.getMeasure());
+        }
+        measureExistIndex++;
+      } else {
+        // if not then get the default value and use that value in aggregation
+        Object defaultValue = measureInfo.getDefaultValues()[i];
+        if (null != defaultValue && DataTypes.isDecimal(measureInfo.getMeasureDataTypes()[i])) {
+          // convert data type as per the computing engine
+          defaultValue =
+              DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(defaultValue);
+        }
+        for (short j = 0; j < rows.size(); j++) {
+          Object[] rowValues = rows.get(j);
+          rowValues[i + offset] = defaultValue;
+        }
+      }
+    }
+  }
+
   Object getMeasureData(ColumnPage dataChunk, int index, CarbonMeasure carbonMeasure) {
     if (!dataChunk.getNullBits().get(index)) {
       DataType dataType = carbonMeasure.getDataType();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
index 0780675..d28df0a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
@@ -16,25 +16,23 @@
  */
 package org.apache.carbondata.core.scan.collector.impl;
 
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.result.BlockletScannedResult;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 
 /**
  * It is not a collector it is just a scanned result holder.
  */
 public class RawBasedResultCollector extends AbstractScannedResultCollector {
 
-  byte[] dictionaryKeyArray;
-
-  byte[][] noDictionaryKeyArray;
-
-  private byte[][] complexTypeKeyArray;
-
   public RawBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
     super(blockExecutionInfos);
   }
@@ -44,37 +42,114 @@ public class RawBasedResultCollector extends AbstractScannedResultCollector {
    * it will keep track of how many record is processed, to handle limit scenario
    */
   @Override
-  public List<Object[]> collectResultInRow(BlockletScannedResult scannedResult, int batchSize) {
+  public List<Object[]> collectResultInRow(BlockletScannedResult scannedResult,
+      int batchSize) {
+    long startTime = System.currentTimeMillis();
     List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     ProjectionMeasure[] queryMeasures = executionInfo.getProjectionMeasures();
     // scan the record and add to list
-    int rowCounter = 0;
-    while (scannedResult.hasNext() && rowCounter < batchSize) {
-      scanResultAndGetData(scannedResult);
-      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
+    scanAndFillData(scannedResult, batchSize, listBasedResult, queryMeasures);
+    QueryStatistic resultPrepTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.RESULT_PREP_TIME);
+    resultPrepTime.addCountStatistic(QueryStatisticsConstants.RESULT_PREP_TIME,
+        resultPrepTime.getCount() + (System.currentTimeMillis() - startTime));
+    return listBasedResult;
+  }
+
+  /**
+   * This method will scan and fill dimension and measure data
+   *
+   * @param scannedResult
+   * @param batchSize
+   * @param listBasedResult
+   * @param queryMeasures
+   */
+  protected void scanAndFillData(BlockletScannedResult scannedResult, int batchSize,
+      List<Object[]> listBasedResult, ProjectionMeasure[] queryMeasures) {
+    int numberOfPages = scannedResult.numberOfpages();
+    // loop will exit once the batchSize data has been read or the pages have been exhausted
+    while (scannedResult.getCurrentPageCounter() < numberOfPages) {
+      int currentPageRowCount = scannedResult.getCurrentPageRowCount();
+      if (currentPageRowCount == 0) {
+        scannedResult.incrementPageCounter();
         continue;
       }
-      prepareRow(scannedResult, listBasedResult, queryMeasures);
-      rowCounter++;
+      int rowCounter = scannedResult.getRowCounter();
+      // getRowCounter holds total number rows processed. Calculate the
+      // Left over space through getRowCounter only.
+      int availableRows = currentPageRowCount - rowCounter;
+      // rows available in current page that can be processed from current page
+      int availableBatchRowCount = Math.min(batchSize, availableRows);
+      // this condition will be true if no data left in the current block/blocklet to be scanned
+      if (availableBatchRowCount < 1) {
+        break;
+      }
+      if (batchSize > availableRows) {
+        batchSize = batchSize - availableRows;
+      } else {
+        // this is done because in IUD cases actuals rows fetch can be less than batch size as
+        // some of the rows could have deleted. So in those cases batchSize need to be
+        // re initialized with left over value
+        batchSize = 0;
+      }
+      // fill dimension data
+      fillDimensionData(scannedResult, listBasedResult, queryMeasures, availableBatchRowCount);
+      fillMeasureData(scannedResult, listBasedResult);
+      // increment the number of rows scanned in scanned result statistics
+      incrementScannedResultRowCounter(scannedResult, availableBatchRowCount);
+      // assign the left over rows to batch size if the number of rows fetched are lesser
+      // than batchSize
+      if (listBasedResult.size() < availableBatchRowCount) {
+        batchSize += availableBatchRowCount - listBasedResult.size();
+      }
     }
-    return listBasedResult;
   }
 
-  void prepareRow(BlockletScannedResult scannedResult, List<Object[]> listBasedResult,
-      ProjectionMeasure[] queryMeasures) {
-    Object[] row = new Object[1 + queryMeasures.length];
-    ByteArrayWrapper wrapper = new ByteArrayWrapper();
-    wrapper.setDictionaryKey(dictionaryKeyArray);
-    wrapper.setNoDictionaryKeys(noDictionaryKeyArray);
-    wrapper.setComplexTypesKeys(complexTypeKeyArray);
-    row[0] = wrapper;
-    fillMeasureData(row, 1, scannedResult);
-    listBasedResult.add(row);
+  private void fillDimensionData(BlockletScannedResult scannedResult,
+      List<Object[]> listBasedResult, ProjectionMeasure[] queryMeasures, int batchSize) {
+    long startTime = System.currentTimeMillis();
+    List<byte[]> dictionaryKeyArrayBatch = scannedResult.getDictionaryKeyArrayBatch(batchSize);
+    List<byte[][]> noDictionaryKeyArrayBatch =
+        scannedResult.getNoDictionaryKeyArrayBatch(batchSize);
+    List<byte[][]> complexTypeKeyArrayBatch = scannedResult.getComplexTypeKeyArrayBatch(batchSize);
+    // it will same for one blocklet so can be computed only once
+    byte[] implicitColumnByteArray = scannedResult.getBlockletId()
+        .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+    // Note: size check in for loop is for dictionaryKeyArrayBatch as this size can be lesser than
+    // batch size in case of IUD scenarios
+    for (int i = 0; i < dictionaryKeyArrayBatch.size(); i++) {
+      // 1 for ByteArrayWrapper object which will contain dictionary and no dictionary data
+      Object[] row = new Object[1 + queryMeasures.length];
+      ByteArrayWrapper wrapper = new ByteArrayWrapper();
+      wrapper.setDictionaryKey(dictionaryKeyArrayBatch.get(i));
+      wrapper.setNoDictionaryKeys(noDictionaryKeyArrayBatch.get(i));
+      wrapper.setComplexTypesKeys(complexTypeKeyArrayBatch.get(i));
+      wrapper.setImplicitColumnByteArray(implicitColumnByteArray);
+      row[0] = wrapper;
+      listBasedResult.add(row);
+    }
+    QueryStatistic keyColumnFillingTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME);
+    keyColumnFillingTime.addCountStatistic(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME,
+        keyColumnFillingTime.getCount() + (System.currentTimeMillis() - startTime));
+  }
+
+  private void fillMeasureData(BlockletScannedResult scannedResult,
+      List<Object[]> listBasedResult) {
+    long startTime = System.currentTimeMillis();
+    // if list is not empty after filling the dimension data then only fill the measure data
+    if (!listBasedResult.isEmpty()) {
+      fillMeasureDataBatch(listBasedResult, 1, scannedResult);
+    }
+    QueryStatistic measureFillingTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.MEASURE_FILLING_TIME);
+    measureFillingTime.addCountStatistic(QueryStatisticsConstants.MEASURE_FILLING_TIME,
+        measureFillingTime.getCount() + (System.currentTimeMillis() - startTime));
   }
 
-  void scanResultAndGetData(BlockletScannedResult scannedResult) {
-    dictionaryKeyArray = scannedResult.getDictionaryKeyArray();
-    noDictionaryKeyArray = scannedResult.getNoDictionaryKeyArray();
-    complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
+  private void incrementScannedResultRowCounter(BlockletScannedResult scannedResult,
+      int batchSize) {
+    // increment row counter by batch size as those many number of rows have been processed at once
+    scannedResult.incrementCounter(batchSize);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index d776b5e..1c440cf 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -33,6 +33,9 @@ import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -151,106 +154,112 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
    */
   @Override
   public List<Object[]> collectResultInRow(BlockletScannedResult scannedResult, int batchSize) {
+    long startTime = System.currentTimeMillis();
     List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     ProjectionMeasure[] queryMeasures = executionInfo.getActualQueryMeasures();
     // scan the record and add to list
-    int rowCounter = 0;
-    while (scannedResult.hasNext() && rowCounter < batchSize) {
-      scanResultAndGetData(scannedResult);
-      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
-        continue;
-      }
-      // re-fill dictionary and no dictionary key arrays for the newly added columns
-      if (dimensionInfo.isDictionaryColumnAdded()) {
-        dictionaryKeyArray = fillDictionaryKeyArrayWithLatestSchema(dictionaryKeyArray);
-      }
-      if (dimensionInfo.isNoDictionaryColumnAdded()) {
-        noDictionaryKeyArray = fillNoDictionaryKeyArrayWithLatestSchema(noDictionaryKeyArray);
-      }
-      prepareRow(scannedResult, listBasedResult, queryMeasures);
-      rowCounter++;
+    scanAndFillData(scannedResult, batchSize, listBasedResult, queryMeasures);
+    // re-fill dictionary and no dictionary key arrays for the newly added columns
+    if (dimensionInfo.isDictionaryColumnAdded()) {
+      fillDictionaryKeyArrayBatchWithLatestSchema(listBasedResult);
+    }
+    if (dimensionInfo.isNoDictionaryColumnAdded()) {
+      fillNoDictionaryKeyArrayBatchWithLatestSchema(listBasedResult);
     }
+    QueryStatistic resultPrepTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.RESULT_PREP_TIME);
+    resultPrepTime.addCountStatistic(QueryStatisticsConstants.RESULT_PREP_TIME,
+        resultPrepTime.getCount() + (System.currentTimeMillis() - startTime));
     return listBasedResult;
   }
 
   /**
    * This method will fill the dictionary key array with newly added dictionary columns if any
    *
-   * @param dictionaryKeyArray
+   * @param rows
    * @return
    */
-  private byte[] fillDictionaryKeyArrayWithLatestSchema(byte[] dictionaryKeyArray) {
-    ProjectionDimension[] actualQueryDimensions = executionInfo.getActualQueryDimensions();
-    int newKeyArrayLength = dimensionInfo.getNewDictionaryColumnCount();
-    long[] keyArray = null;
-    if (null != updatedCurrentBlockKeyGenerator) {
-      keyArray = updatedCurrentBlockKeyGenerator.getKeyArray(dictionaryKeyArray);
-      newKeyArrayLength += keyArray.length;
-    }
-    long[] keyArrayWithNewAddedColumns = new long[newKeyArrayLength];
-    int existingColumnKeyArrayIndex = 0;
-    int newKeyArrayIndex = 0;
-    for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
-      if (CarbonUtil
-          .hasEncoding(actualQueryDimensions[i].getDimension().getEncoder(), Encoding.DICTIONARY)) {
-        // if dimension exists then add the key array value else add the default value
-        if (dimensionInfo.getDimensionExists()[i]) {
-          keyArrayWithNewAddedColumns[newKeyArrayIndex++] = keyArray[existingColumnKeyArrayIndex++];
-        } else {
-          long defaultValueAsLong;
-          Object defaultValue = dimensionInfo.getDefaultValues()[i];
-          if (null != defaultValue) {
-            defaultValueAsLong = ((Integer) defaultValue).longValue();
+  private void fillDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
+    for (Object[] row : rows) {
+      ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
+      byte[] dictKeyArray = byteArrayWrapper.getDictionaryKey();
+      ProjectionDimension[] actualQueryDimensions = executionInfo.getActualQueryDimensions();
+      int newKeyArrayLength = dimensionInfo.getNewDictionaryColumnCount();
+      long[] keyArray = null;
+      if (null != updatedCurrentBlockKeyGenerator) {
+        keyArray = updatedCurrentBlockKeyGenerator.getKeyArray(dictKeyArray);
+        newKeyArrayLength += keyArray.length;
+      }
+      long[] keyArrayWithNewAddedColumns = new long[newKeyArrayLength];
+      int existingColumnKeyArrayIndex = 0;
+      int newKeyArrayIndex = 0;
+      for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
+        if (CarbonUtil.hasEncoding(actualQueryDimensions[i].getDimension().getEncoder(),
+            Encoding.DICTIONARY)) {
+          // if dimension exists then add the key array value else add the default value
+          if (dimensionInfo.getDimensionExists()[i]) {
+            keyArrayWithNewAddedColumns[newKeyArrayIndex++] =
+                keyArray[existingColumnKeyArrayIndex++];
           } else {
-            defaultValueAsLong = (long)CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+            long defaultValueAsLong;
+            Object defaultValue = dimensionInfo.getDefaultValues()[i];
+            if (null != defaultValue) {
+              defaultValueAsLong = ((Integer) defaultValue).longValue();
+            } else {
+              defaultValueAsLong = (long) CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+            }
+            keyArrayWithNewAddedColumns[newKeyArrayIndex++] = defaultValueAsLong;
           }
-          keyArrayWithNewAddedColumns[newKeyArrayIndex++] = defaultValueAsLong;
         }
       }
+      try {
+        dictKeyArray = restructuredKeyGenerator.generateKey(keyArrayWithNewAddedColumns);
+        byteArrayWrapper.setDictionaryKey(dictKeyArray);
+      } catch (KeyGenException e) {
+        throw new RuntimeException(e);
+      }
     }
-    try {
-      dictionaryKeyArray = restructuredKeyGenerator.generateKey(keyArrayWithNewAddedColumns);
-    } catch (KeyGenException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    return dictionaryKeyArray;
   }
 
   /**
    * This method will fill the no dictionary byte array with newly added no dictionary columns
    *
-   * @param noDictionaryKeyArray
+   * @param rows
    * @return
    */
-  private byte[][] fillNoDictionaryKeyArrayWithLatestSchema(byte[][] noDictionaryKeyArray) {
-    ProjectionDimension[] actualQueryDimensions = executionInfo.getActualQueryDimensions();
-    byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
-        new byte[noDictionaryKeyArray.length + dimensionInfo.getNewNoDictionaryColumnCount()][];
-    int existingColumnValueIndex = 0;
-    int newKeyArrayIndex = 0;
-    for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
-      if (!actualQueryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)
-          && !actualQueryDimensions[i].getDimension().hasEncoding(Encoding.IMPLICIT)) {
-        // if dimension exists then add the byte array value else add the default value
-        if (dimensionInfo.getDimensionExists()[i]) {
-          noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
-              noDictionaryKeyArray[existingColumnValueIndex++];
-        } else {
-          byte[] newColumnDefaultValue = null;
-          Object defaultValue = dimensionInfo.getDefaultValues()[i];
-          if (null != defaultValue) {
-            newColumnDefaultValue = (byte[]) defaultValue;
-          } else if (actualQueryDimensions[i].getDimension().getDataType() == DataTypes.STRING) {
-            newColumnDefaultValue =
-                DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
-                    CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+  private void fillNoDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
+    for (Object[] row : rows) {
+      ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
+      byte[][] noDictKeyArray = byteArrayWrapper.getNoDictionaryKeys();
+      ProjectionDimension[] actualQueryDimensions = executionInfo.getActualQueryDimensions();
+      byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
+          new byte[noDictKeyArray.length + dimensionInfo.getNewNoDictionaryColumnCount()][];
+      int existingColumnValueIndex = 0;
+      int newKeyArrayIndex = 0;
+      for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
+        if (!actualQueryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)
+            && !actualQueryDimensions[i].getDimension().hasEncoding(Encoding.IMPLICIT)) {
+          // if dimension exists then add the byte array value else add the default value
+          if (dimensionInfo.getDimensionExists()[i]) {
+            noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
+                noDictKeyArray[existingColumnValueIndex++];
           } else {
-            newColumnDefaultValue = CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+            byte[] newColumnDefaultValue = null;
+            Object defaultValue = dimensionInfo.getDefaultValues()[i];
+            if (null != defaultValue) {
+              newColumnDefaultValue = (byte[]) defaultValue;
+            } else if (actualQueryDimensions[i].getDimension().getDataType() == DataTypes.STRING) {
+              newColumnDefaultValue =
+                  DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
+                      CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+            } else {
+              newColumnDefaultValue = CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+            }
+            noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] = newColumnDefaultValue;
           }
-          noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] = newColumnDefaultValue;
         }
       }
+      byteArrayWrapper.setNoDictionaryKeys(noDictionaryKeyArrayWithNewlyAddedColumns);
     }
-    return noDictionaryKeyArrayWithNewlyAddedColumns;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index bc40be8..ed0350f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -116,9 +116,12 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         .getCarbonTableIdentifier().getTableName());
     // Initializing statistics list to record the query statistics
     // creating copy on write to handle concurrent scenario
-    queryProperties.queryStatisticsRecorder =
-        CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId());
-    queryModel.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
+    queryProperties.queryStatisticsRecorder = queryModel.getStatisticsRecorder();
+    if (null == queryProperties.queryStatisticsRecorder) {
+      queryProperties.queryStatisticsRecorder =
+          CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId());
+      queryModel.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
+    }
     QueryStatistic queryStatistic = new QueryStatistic();
     // sort the block info
     // so block will be loaded in sorted order this will be required for

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
index 6633195..3165bfd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
 
 /**
  * Below class will have all the properties which needed during query execution
@@ -216,6 +217,11 @@ public class BlockExecutionInfo {
   private boolean requiredRowId;
 
   /**
+   * model for collecting query stats
+   */
+  private QueryStatisticsModel queryStatisticsModel;
+
+  /**
    * @param blockIndex the tableBlock to set
    */
   public void setDataBlock(AbstractIndex blockIndex) {
@@ -631,4 +637,12 @@ public class BlockExecutionInfo {
   public void setRequiredRowId(boolean requiredRowId) {
     this.requiredRowId = requiredRowId;
   }
+
+  public QueryStatisticsModel getQueryStatisticsModel() {
+    return queryStatisticsModel;
+  }
+
+  public void setQueryStatisticsModel(QueryStatisticsModel queryStatisticsModel) {
+    this.queryStatisticsModel = queryStatisticsModel;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
index f7b828e..31f5d0b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
@@ -96,9 +96,8 @@ public class QueryModelBuilder {
     return this;
   }
 
-  public QueryModelBuilder enableReadPageByPage() {
+  public void enableReadPageByPage() {
     this.readPageByPage = true;
-    return this;
   }
 
   public QueryModel build() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java
index 4d46b3b..88f1503 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java
@@ -45,6 +45,7 @@ public class BlockScan {
   public BlockScan(BlockExecutionInfo blockExecutionInfo, FileReader fileReader,
       QueryStatisticsModel queryStatisticsModel) {
     this.blockExecutionInfo = blockExecutionInfo;
+    this.blockExecutionInfo.setQueryStatisticsModel(queryStatisticsModel);
     this.fileReader = fileReader;
     this.blockletIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
         blockExecutionInfo.getNumberOfBlockToScan());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
index fde4e55..1eeb579 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
@@ -85,6 +85,7 @@ public class DataBlockIterator extends CarbonIterator<List<Object[]>> {
   public DataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileReader fileReader,
       int batchSize, QueryStatisticsModel queryStatisticsModel, ExecutorService executorService) {
     this.blockExecutionInfo = blockExecutionInfo;
+    this.blockExecutionInfo.setQueryStatisticsModel(queryStatisticsModel);
     this.fileReader = fileReader;
     blockletIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
         blockExecutionInfo.getNumberOfBlockToScan());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index 33caa98..35d4f51 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -21,6 +21,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -39,6 +41,9 @@ import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -62,7 +67,7 @@ public abstract class BlockletScannedResult {
   /**
    * key size of the fixed length column
    */
-  private int fixedLengthKeySize;
+  protected int fixedLengthKeySize;
   /**
    * total number of filtered rows for each page
    */
@@ -142,7 +147,12 @@ public abstract class BlockletScannedResult {
    */
   private String blockletNumber;
 
-  public BlockletScannedResult(BlockExecutionInfo blockExecutionInfo) {
+  protected List<Integer> validRowIds;
+
+  protected QueryStatisticsModel queryStatisticsModel;
+
+  public BlockletScannedResult(BlockExecutionInfo blockExecutionInfo,
+      QueryStatisticsModel queryStatisticsModel) {
     this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize();
     this.noDictionaryColumnChunkIndexes = blockExecutionInfo.getNoDictionaryColumnChunkIndexes();
     this.dictionaryColumnChunkIndexes = blockExecutionInfo.getDictionaryColumnChunkIndex();
@@ -151,6 +161,8 @@ public abstract class BlockletScannedResult {
     this.complexParentBlockIndexes = blockExecutionInfo.getComplexColumnParentBlockIndexes();
     this.totalDimensionsSize = blockExecutionInfo.getProjectionDimensions().length;
     this.deletedRecordMap = blockExecutionInfo.getDeletedRecordsMap();
+    this.queryStatisticsModel = queryStatisticsModel;
+    validRowIds = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   }
 
   /**
@@ -324,6 +336,16 @@ public abstract class BlockletScannedResult {
   }
 
   /**
+   * This method will add the delta to row counter
+   *
+   * @param delta
+   */
+  public void incrementCounter(int delta) {
+    rowCounter += delta;
+    currentRow += delta;
+  }
+
+  /**
    * Just increment the page counter and reset the remaining counters.
    */
   public void incrementPageCounter() {
@@ -344,6 +366,7 @@ public abstract class BlockletScannedResult {
     if (pageCounter >= pageFilteredRowCount.length) {
       return;
     }
+    long startTime = System.currentTimeMillis();
     for (int i = 0; i < dimensionColumnPages.length; i++) {
       if (dimensionColumnPages[i][pageCounter] == null && dimRawColumnChunks[i] != null) {
         dimensionColumnPages[i][pageCounter] =
@@ -357,6 +380,10 @@ public abstract class BlockletScannedResult {
             msrRawColumnChunks[i].convertToColumnPageWithOutCache(pageCounter);
       }
     }
+    QueryStatistic pageUncompressTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME);
+    pageUncompressTime.addCountStatistic(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME,
+        pageUncompressTime.getCount() + (System.currentTimeMillis() - startTime));
   }
 
   // free the memory for the last page chunk
@@ -373,6 +400,7 @@ public abstract class BlockletScannedResult {
         measureColumnPages[i][pageCounter - 1] = null;
       }
     }
+    clearValidRowIdList();
   }
 
   public int numberOfpages() {
@@ -417,6 +445,75 @@ public abstract class BlockletScannedResult {
   }
 
   /**
+   * This method will fill the list of valid (non-deleted) row ids to be scanned
+   *
+   * @param rowId
+   * @param batchSize
+   * @return
+   */
+  protected void fillValidRowIdsBatchFilling(int rowId, int batchSize) {
+    // row id will be different for every batch so clear it before filling
+    clearValidRowIdList();
+    int startPosition = rowId;
+    for (int i = 0; i < batchSize; i++) {
+      if (!containsDeletedRow(startPosition)) {
+        validRowIds.add(startPosition);
+      }
+      startPosition++;
+    }
+  }
+
+  private void clearValidRowIdList() {
+    if (null != validRowIds && !validRowIds.isEmpty()) {
+      validRowIds.clear();
+    }
+  }
+
+  public List<Integer> getValidRowIds() {
+    return validRowIds;
+  }
+
+  /**
+   * Below method will be used to get the complex type keys array based
+   * on row id for all the complex type dimension selected in query.
+   * This method will be used to fill the data column wise
+   *
+   * @return complex type key array for all the complex dimension selected in query
+   */
+  protected List<byte[][]> getComplexTypeKeyArrayBatch() {
+    List<byte[][]> complexTypeArrayList = new ArrayList<>(validRowIds.size());
+    byte[][] complexTypeData = null;
+    // a new array is created every time because in case of prefetch the data can be modified
+    for (int i = 0; i < validRowIds.size(); i++) {
+      complexTypeData = new byte[complexParentBlockIndexes.length][];
+      complexTypeArrayList.add(complexTypeData);
+    }
+    for (int i = 0; i < complexParentBlockIndexes.length; i++) {
+      // get the genericQueryType for the current complex parent column
+      GenericQueryType genericQueryType =
+          complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]);
+      for (int j = 0; j < validRowIds.size(); j++) {
+        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+        DataOutputStream dataOutput = new DataOutputStream(byteStream);
+        try {
+          genericQueryType
+              .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, validRowIds.get(j),
+                  pageCounter, dataOutput);
+          // get the key array in columnar way
+          byte[][] complexKeyArray = complexTypeArrayList.get(j);
+          complexKeyArray[i] = byteStream.toByteArray();
+        } catch (IOException e) {
+          LOGGER.error(e);
+        } finally {
+          CarbonUtil.closeStreams(dataOutput);
+          CarbonUtil.closeStreams(byteStream);
+        }
+      }
+    }
+    return complexTypeArrayList;
+  }
+
+  /**
    * @return blockletId
    */
   public String getBlockletId() {
@@ -527,6 +624,8 @@ public abstract class BlockletScannedResult {
         }
       }
     }
+    clearValidRowIdList();
+    validRowIds = null;
   }
 
   /**
@@ -568,6 +667,14 @@ public abstract class BlockletScannedResult {
   public abstract int[] getDictionaryKeyIntegerArray();
 
   /**
+   * Method to fill each dictionary column data column wise
+   *
+   * @param batchSize
+   * @return
+   */
+  public abstract List<byte[]> getDictionaryKeyArrayBatch(int batchSize);
+
+  /**
    * Below method will be used to get the complex type key array
    *
    * @return complex type key array
@@ -575,6 +682,15 @@ public abstract class BlockletScannedResult {
   public abstract byte[][] getComplexTypeKeyArray();
 
   /**
+   * Below method will be used to get the complex type key array
+   * This method will fill the data column wise for the given batch size
+   *
+   * @param batchSize
+   * @return complex type key array
+   */
+  public abstract List<byte[][]> getComplexTypeKeyArrayBatch(int batchSize);
+
+  /**
    * Below method will be used to get the no dictionary key
    * array for all the no dictionary dimension selected in query
    *
@@ -583,6 +699,15 @@ public abstract class BlockletScannedResult {
   public abstract byte[][] getNoDictionaryKeyArray();
 
   /**
+   * Below method will be used to get the dimension key array
+   * for all the no dictionary dimension present in the query
+   * This method will fill the data column wise for the given batch size
+   *
+   * @return no dictionary keys for all no dictionary dimension
+   */
+  public abstract List<byte[][]> getNoDictionaryKeyArrayBatch(int batchSize);
+
+  /**
    * Mark the filtered rows in columnar batch. These rows will not be added to vector batches later.
    * @param columnarBatch
    * @param startRow

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
index bcc5634..7de3e71 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
@@ -16,9 +16,12 @@
  */
 package org.apache.carbondata.core.scan.result.impl;
 
+import java.util.List;
+
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.result.BlockletScannedResult;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
 
 /**
  * Result provider class in case of filter query
@@ -27,8 +30,9 @@ import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
  */
 public class FilterQueryScannedResult extends BlockletScannedResult {
 
-  public FilterQueryScannedResult(BlockExecutionInfo tableBlockExecutionInfos) {
-    super(tableBlockExecutionInfos);
+  public FilterQueryScannedResult(BlockExecutionInfo tableBlockExecutionInfos,
+      QueryStatisticsModel queryStatisticsModel) {
+    super(tableBlockExecutionInfos, queryStatisticsModel);
   }
 
   /**
@@ -49,6 +53,10 @@ public class FilterQueryScannedResult extends BlockletScannedResult {
     return getDictionaryKeyIntegerArray(pageFilteredRowId[pageCounter][currentRow]);
   }
 
+  @Override public List<byte[]> getDictionaryKeyArrayBatch(int batchSize) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   /**
    * Below method will be used to get the complex type key array
    *
@@ -58,6 +66,10 @@ public class FilterQueryScannedResult extends BlockletScannedResult {
     return getComplexTypeKeyArray(pageFilteredRowId[pageCounter][currentRow]);
   }
 
+  @Override public List<byte[][]> getComplexTypeKeyArrayBatch(int batchSize) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   /**
    * Below method will be used to get the no dictionary key
    * array for all the no dictionary dimension selected in query
@@ -68,6 +80,10 @@ public class FilterQueryScannedResult extends BlockletScannedResult {
     return getNoDictionaryKeyArray(pageFilteredRowId[pageCounter][currentRow]);
   }
 
+  @Override public List<byte[][]> getNoDictionaryKeyArrayBatch(int batchSize) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   /**
    * will return the current valid row id
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
index 06687c2..5956595 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
@@ -16,8 +16,12 @@
  */
 package org.apache.carbondata.core.scan.result.impl;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
 
 /**
  * Result provide class for non filter query
@@ -26,8 +30,9 @@ import org.apache.carbondata.core.scan.result.BlockletScannedResult;
  */
 public class NonFilterQueryScannedResult extends BlockletScannedResult {
 
-  public NonFilterQueryScannedResult(BlockExecutionInfo blockExecutionInfo) {
-    super(blockExecutionInfo);
+  public NonFilterQueryScannedResult(BlockExecutionInfo blockExecutionInfo,
+      QueryStatisticsModel queryStatisticsModel) {
+    super(blockExecutionInfo, queryStatisticsModel);
   }
 
   /**
@@ -48,6 +53,32 @@ public class NonFilterQueryScannedResult extends BlockletScannedResult {
     return getDictionaryKeyIntegerArray(currentRow);
   }
 
+  @Override public List<byte[]> getDictionaryKeyArrayBatch(int batchSize) {
+    // rowId from where computing need to start
+    int startRowId = currentRow + 1;
+    fillValidRowIdsBatchFilling(startRowId, batchSize);
+    List<byte[]> dictionaryKeyArrayList = new ArrayList<>(validRowIds.size());
+    int[] columnDataOffsets = null;
+    byte[] completeKey = null;
+    // a new array is created every time because in case of prefetch the data can be modified
+    for (int i = 0; i < validRowIds.size(); i++) {
+      completeKey = new byte[fixedLengthKeySize];
+      dictionaryKeyArrayList.add(completeKey);
+    }
+    // initialize offset array only if data is present
+    if (this.dictionaryColumnChunkIndexes.length > 0) {
+      columnDataOffsets = new int[validRowIds.size()];
+    }
+    for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
+      for (int j = 0; j < validRowIds.size(); j++) {
+        columnDataOffsets[j] += dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter]
+            .fillRawData(validRowIds.get(j), columnDataOffsets[j], dictionaryKeyArrayList.get(j),
+                columnGroupKeyStructureInfo.get(dictionaryColumnChunkIndexes[i]));
+      }
+    }
+    return dictionaryKeyArrayList;
+  }
+
   /**
    * Below method will be used to get the complex type key array
    *
@@ -57,6 +88,10 @@ public class NonFilterQueryScannedResult extends BlockletScannedResult {
     return getComplexTypeKeyArray(currentRow);
   }
 
+  @Override public List<byte[][]> getComplexTypeKeyArrayBatch(int batchSize) {
+    return getComplexTypeKeyArrayBatch();
+  }
+
   /**
    * Below method will be used to get the no dictionary key array for all the
    * no dictionary dimension selected in query
@@ -68,6 +103,35 @@ public class NonFilterQueryScannedResult extends BlockletScannedResult {
   }
 
   /**
+   * Below method will be used to get the dimension key array
+   * for all the no dictionary dimension present in the query
+   * This method will fill the data column wise for the given batch size
+   *
+   * @return no dictionary keys for all no dictionary dimension
+   */
+  @Override public List<byte[][]> getNoDictionaryKeyArrayBatch(int batchSize) {
+    List<byte[][]> noDictionaryKeyArrayList = new ArrayList<>(validRowIds.size());
+    byte[][] noDictionaryColumnsKeys = null;
+    // a new array is created every time because in case of prefetch the data can be modified
+    for (int i = 0; i < validRowIds.size(); i++) {
+      noDictionaryColumnsKeys = new byte[noDictionaryColumnChunkIndexes.length][];
+      noDictionaryKeyArrayList.add(noDictionaryColumnsKeys);
+    }
+    int columnPosition = 0;
+    for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
+      for (int j = 0; j < validRowIds.size(); j++) {
+        byte[][] noDictionaryArray = noDictionaryKeyArrayList.get(j);
+        noDictionaryArray[columnPosition] =
+            dimensionColumnPages[noDictionaryColumnChunkIndexes[i]][pageCounter]
+                .getChunkData(validRowIds.get(j));
+      }
+      columnPosition++;
+    }
+    return noDictionaryKeyArrayList;
+  }
+
+
+  /**
    * will return the current valid row id
    *
    * @return valid row id

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index bb23ff3..d975c20 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -294,6 +294,27 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
     queryStatisticsModel.getStatisticsTypeAndObjMap()
         .put(QueryStatisticsConstants.READ_BLOCKlET_TIME, readTime);
     queryStatisticsModel.getRecorder().recordStatistics(readTime);
+
+    // dimension filling time
+    QueryStatistic keyColumnFilingTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME, keyColumnFilingTime);
+    queryStatisticsModel.getRecorder().recordStatistics(keyColumnFilingTime);
+    // measure filling time
+    QueryStatistic measureFilingTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.MEASURE_FILLING_TIME, measureFilingTime);
+    queryStatisticsModel.getRecorder().recordStatistics(measureFilingTime);
+    // page uncompress time
+    QueryStatistic pageUncompressTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME, pageUncompressTime);
+    queryStatisticsModel.getRecorder().recordStatistics(pageUncompressTime);
+    // result preparation time
+    QueryStatistic resultPreparationTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.RESULT_PREP_TIME, resultPreparationTime);
+    queryStatisticsModel.getRecorder().recordStatistics(resultPreparationTime);
   }
 
   public void processNextBatch(CarbonColumnarBatch columnarBatch) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java
index 1b52e55..c7f5c51 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java
@@ -113,6 +113,27 @@ public abstract class AbstractSearchModeResultIterator
     queryStatisticsModel.getStatisticsTypeAndObjMap()
         .put(QueryStatisticsConstants.READ_BLOCKlET_TIME, readTime);
     queryStatisticsModel.getRecorder().recordStatistics(readTime);
+
+    // dimension filling time
+    QueryStatistic keyColumnFilingTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME, keyColumnFilingTime);
+    queryStatisticsModel.getRecorder().recordStatistics(keyColumnFilingTime);
+    // measure filling time
+    QueryStatistic measureFilingTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.MEASURE_FILLING_TIME, measureFilingTime);
+    queryStatisticsModel.getRecorder().recordStatistics(measureFilingTime);
+    // page uncompress time
+    QueryStatistic pageUncompressTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME, pageUncompressTime);
+    queryStatisticsModel.getRecorder().recordStatistics(pageUncompressTime);
+    // result preparation time
+    QueryStatistic resultPreparationTime = new QueryStatistic();
+    queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .put(QueryStatisticsConstants.RESULT_PREP_TIME, resultPreparationTime);
+    queryStatisticsModel.getRecorder().recordStatistics(resultPreparationTime);
     return queryStatisticsModel;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
index ec49c77..37df0e5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
@@ -185,7 +185,8 @@ public class BlockletFilterScanner extends BlockletFullScanner {
       return createEmptyResult();
     }
 
-    BlockletScannedResult scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
+    BlockletScannedResult scannedResult =
+        new FilterQueryScannedResult(blockExecutionInfo, queryStatisticsModel);
     scannedResult.setBlockletId(
         blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR +
             rawBlockletColumnChunks.getDataBlock().blockletIndex());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
index 0cb4059..a48804c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
@@ -61,7 +61,8 @@ public class BlockletFullScanner implements BlockletScanner {
       RawBlockletColumnChunks rawBlockletColumnChunks)
       throws IOException, FilterUnsupportedException {
     long startTime = System.currentTimeMillis();
-    BlockletScannedResult scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
+    BlockletScannedResult scannedResult =
+        new NonFilterQueryScannedResult(blockExecutionInfo, queryStatisticsModel);
     QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
         .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
     totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
@@ -163,7 +164,7 @@ public class BlockletFullScanner implements BlockletScanner {
 
   BlockletScannedResult createEmptyResult() {
     if (emptyResult == null) {
-      emptyResult = new NonFilterQueryScannedResult(blockExecutionInfo);
+      emptyResult = new NonFilterQueryScannedResult(blockExecutionInfo, queryStatisticsModel);
       emptyResult.setPageFilteredRowCount(new int[0]);
       emptyResult.setPageFilteredRowId(new int[0][]);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
index 6faae03..3d62d9e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
@@ -42,6 +42,11 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa
    */
   private byte[][] noDictionaryKeys;
 
+  /**
+   * contains value of implicit columns in byte array format
+   */
+  private byte[] implicitColumnByteArray;
+
   public ByteArrayWrapper() {
   }
 
@@ -192,4 +197,18 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa
     this.complexTypesKeys = complexTypesKeys;
   }
 
+  /**
+   * @return
+   */
+  public byte[] getImplicitColumnByteArray() {
+    return implicitColumnByteArray;
+  }
+
+  /**
+   * @param implicitColumnByteArray
+   */
+  public void setImplicitColumnByteArray(byte[] implicitColumnByteArray) {
+    this.implicitColumnByteArray = implicitColumnByteArray;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java
index c2cda7c..8a52294 100644
--- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java
@@ -58,6 +58,28 @@ public interface QueryStatisticsConstants {
 
   String PAGE_SCANNED = "The number of page scanned";
 
+  /**
+   * measure filling time includes time taken for reading all measures data from a given offset
+   * and adding each column data to an array. Includes total time for 1 query result iterator.
+   */
+  String MEASURE_FILLING_TIME = "measure filling time";
+
+  /**
+   * key column filling time includes time taken for reading all dimensions data from a given offset
+   * and filling each column data to byte array. Includes total time for 1 query result iterator.
+   */
+  String KEY_COLUMN_FILLING_TIME = "key column filling time";
+
+  /**
+   * Time taken to uncompress a page data and decode dimensions and measures data in that page
+   */
+  String PAGE_UNCOMPRESS_TIME = "page uncompress time";
+
+  /**
+   * total of measure filling time, dimension filling time and page uncompressing time
+   */
+  String RESULT_PREP_TIME = "result preparation time";
+
   // clear no-use statistics timeout
   long CLEAR_STATISTICS_TIMEOUT = 60 * 1000 * 1000000L;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java b/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java
index 25e5542..9197196 100644
--- a/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java
+++ b/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java
@@ -45,7 +45,11 @@ public class TaskStatistics implements Serializable {
       new Column("total_pages", QueryStatisticsConstants.TOTAL_PAGE_SCANNED),
       new Column("scanned_pages", QueryStatisticsConstants.PAGE_SCANNED),
       new Column("valid_pages", QueryStatisticsConstants.VALID_PAGE_SCANNED),
-      new Column("result_size", QueryStatisticsConstants.RESULT_SIZE)
+      new Column("result_size", QueryStatisticsConstants.RESULT_SIZE),
+      new Column("key_column_filling_time", QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME),
+      new Column("measure_filling_time", QueryStatisticsConstants.MEASURE_FILLING_TIME),
+      new Column("page_uncompress_time", QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME),
+      new Column("result_preparation_time", QueryStatisticsConstants.RESULT_PREP_TIME)
   };
 
   private static final int numOfColumns = columns.length;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index ee40aa9..d29284f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -80,6 +80,7 @@ class CarbonMergerRDD[K, V](
   val tableId = carbonMergerMapping.tableId
 
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val queryStartTime = System.currentTimeMillis()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -102,8 +103,6 @@ class CarbonMergerRDD[K, V](
       var processor: AbstractResultProcessor = null
       var rawResultIteratorList: java.util.List[RawResultIterator] = null
       try {
-
-
         // sorting the table block info List.
         val splitList = carbonSparkPartition.split.value.getAllSplits
         val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
@@ -234,7 +233,7 @@ class CarbonMergerRDD[K, V](
         // close all the query executor service and clean up memory acquired during query processing
         if (null != exec) {
           LOGGER.info("Cleaning up query resources acquired during compaction")
-          exec.close(rawResultIteratorList)
+          exec.close(rawResultIteratorList, queryStartTime)
         }
         // clean up the resources for processor
         if (null != processor) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index c668f7d..0d960d0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -465,6 +465,9 @@ class CarbonScanRDD[T: ClassTag](
         close()
         logStatistics(executionId, taskId, queryStartTime, model.getStatisticsRecorder, split)
       }
+      // create a statistics recorder
+      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId())
+      model.setStatisticsRecorder(recorder)
       // initialize the reader
       reader.initialize(inputSplit, attemptContext)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26607fb9/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index 20103b1..a347313 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -42,7 +42,11 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeConverter;
 
@@ -58,6 +62,8 @@ public class CarbonCompactionExecutor {
   private final SegmentProperties destinationSegProperties;
   private final Map<String, TaskBlockInfo> segmentMapping;
   private List<QueryExecutor> queryExecutorList;
+  private List<QueryStatisticsRecorder> queryStatisticsRecorders =
+      new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   private CarbonTable carbonTable;
   private QueryModel queryModel;
 
@@ -120,10 +126,13 @@ public class CarbonCompactionExecutor {
       for (String task : taskBlockListMapping) {
         list = taskBlockInfo.getTableBlockInfoList(task);
         Collections.sort(list);
-        LOGGER.info("for task -" + task + "-block size is -" + list.size());
+        LOGGER.info(
+            "for task -" + task + "- in segment id -" + segmentId + "- block size is -" + list
+                .size());
         queryModel.setTableBlockInfos(list);
-        resultList.add(new RawResultIterator(executeBlockList(list), sourceSegProperties,
-            destinationSegProperties, false));
+        resultList.add(
+            new RawResultIterator(executeBlockList(list, segmentId, task), sourceSegProperties,
+                destinationSegProperties, false));
       }
     }
     return resultList;
@@ -164,9 +173,14 @@ public class CarbonCompactionExecutor {
    * @param blockList
    * @return
    */
-  private CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList)
+  private CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList,
+      String segmentId, String taskId)
       throws QueryExecutionException, IOException {
     queryModel.setTableBlockInfos(blockList);
+    QueryStatisticsRecorder executorRecorder = CarbonTimeStatisticsFactory
+        .createExecutorRecorder(queryModel.getQueryId() + "_" + segmentId + "_" + taskId);
+    queryStatisticsRecorders.add(executorRecorder);
+    queryModel.setStatisticsRecorder(executorRecorder);
     QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
     queryExecutorList.add(queryExecutor);
     return queryExecutor.execute(queryModel);
@@ -176,7 +190,7 @@ public class CarbonCompactionExecutor {
    * Below method will be used
    * for cleanup
    */
-  public void close(List<RawResultIterator> rawResultIteratorList) {
+  public void close(List<RawResultIterator> rawResultIteratorList, long queryStartTime) {
     try {
       // close all the iterators. Iterators might not closed in case of compaction failure
       // or if process is killed
@@ -188,12 +202,26 @@ public class CarbonCompactionExecutor {
       for (QueryExecutor queryExecutor : queryExecutorList) {
         queryExecutor.finish();
       }
+      logStatistics(queryStartTime);
     } catch (QueryExecutionException e) {
       LOGGER.error(e, "Problem while close. Ignoring the exception");
     }
     clearDictionaryFromQueryModel();
   }
 
+  private void logStatistics(long queryStartTime) {
+    if (!queryStatisticsRecorders.isEmpty()) {
+      QueryStatistic queryStatistic = new QueryStatistic();
+      queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+          System.currentTimeMillis() - queryStartTime);
+      for (QueryStatisticsRecorder recorder : queryStatisticsRecorders) {
+        recorder.recordStatistics(queryStatistic);
+        // print executor query statistics for each task_id
+        recorder.logStatistics();
+      }
+    }
+  }
+
   /**
    * This method will clear the dictionary access count after its usage is complete so
    * that column can be deleted form LRU cache whenever memory reaches threshold
@@ -223,6 +251,7 @@ public class CarbonCompactionExecutor {
       enablePageReader = Boolean.parseBoolean(
           CarbonCommonConstants.CARBON_ENABLE_PAGE_LEVEL_READER_IN_COMPACTION_DEFAULT);
     }
+    LOGGER.info("Page level reader is set to: " + enablePageReader);
     return enablePageReader;
   }