Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/02/24 05:38:43 UTC

[2/5] incubator-carbondata git commit: WIP Added code for new V3 format to optimize scan

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index 9a8b254..ae9ba8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -33,6 +34,7 @@ import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -53,11 +55,18 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
     BitSet bitSet = new BitSet(1);
     byte[][] filterValues = this.filterRangeValues;
     int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
+    boolean isScanRequired = isScanRequired(blockMinValue[columnIndex], filterValues);
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+  }
+
+  private boolean isScanRequired(byte[] blockMinValue, byte[][] filterValues) {
     boolean isScanRequired = false;
     for (int k = 0; k < filterValues.length; k++) {
       // and filter-min should be positive
-      int minCompare =
-          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue[columnIndex]);
+      int minCompare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue);
 
       // if any filter applied is not in range of min and max of block
       // then since its a less than fiter validate whether the block
@@ -67,26 +76,45 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
         break;
       }
     }
-    if (isScanRequired) {
-      bitSet.set(0);
-    }
-    return bitSet;
-
+    return isScanRequired;
   }
 
-  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+  @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder)
       throws FilterUnsupportedException, IOException {
     if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
       return super.applyFilter(blockChunkHolder);
     }
     int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
         .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
-    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
           .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
     }
-    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
-        blockChunkHolder.getDataBlock().nodeSize());
+    DimensionRawColumnChunk rawColumnChunk =
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+    BitSetGroup bitSetGroup = new BitSetGroup(rawColumnChunk.getPagesCount());
+    for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
+      if (rawColumnChunk.getMinValues() != null) {
+        if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) {
+          int compare = ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(filterRangeValues[0], rawColumnChunk.getMaxValues()[i]);
+          if (compare > 0) {
+            BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
+            bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
+            bitSetGroup.setBitSet(bitSet, i);
+          } else {
+            BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+                rawColumnChunk.getRowCount()[i]);
+            bitSetGroup.setBitSet(bitSet, i);
+          }
+        }
+      } else {
+        BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+            rawColumnChunk.getRowCount()[i]);
+        bitSetGroup.setBitSet(bitSet, i);
+      }
+    }
+    return bitSetGroup;
   }
 
   private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
@@ -156,7 +184,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
       start = CarbonUtil.nextLesserValueToTarget(start, dimensionColumnDataChunk, filterValues[i]);
       if (start < 0) {
         start = -(start + 1);
-        if (start == numerOfRows) {
+        if (start >= numerOfRows) {
           start = start - 1;
         }
         // Method will compare the tentative index value after binary search, this tentative
@@ -250,4 +278,16 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
     }
     return bitSet;
   }
+
+  @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      super.readBlocks(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+  }
 }
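
The refactor above does two things: it extracts isScanRequired() so the same min-value check can run once per page, and applyFilter() gains a fast path that marks an entire page as matching, without decoding it, when the first filter value is greater than the page's max. A small standalone sketch of that three-way decision, using int keys instead of CarbonData's byte[] comparisons (all names below are hypothetical, not the project API):

import java.util.BitSet;

// Hypothetical sketch: page-level pruning for a "column < filterValue" predicate.
public class LessThanPagePruning {

  // Mirrors the extracted isScanRequired(): a page needs scanning only if
  // some filter value is greater than the page minimum.
  static boolean isScanRequired(int pageMin, int[] filterValues) {
    for (int value : filterValues) {
      if (value > pageMin) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    int[] pageMin = { 10, 50, 100 };
    int[] pageMax = { 40, 90, 140 };
    int[] rowCount = { 4, 4, 4 };
    int[] filterValues = { 45 };            // predicate: column < 45
    for (int page = 0; page < pageMin.length; page++) {
      BitSet bitSet = new BitSet(rowCount[page]);
      if (isScanRequired(pageMin[page], filterValues)) {
        if (filterValues[0] > pageMax[page]) {
          // whole page qualifies: set all bits without decoding the page
          bitSet.flip(0, rowCount[page]);
        } else {
          // partial overlap: decode the page and run the row-level scan,
          // i.e. getFilteredIndexes(...) in the patch
        }
      }
      System.out.println("page " + page + " -> " + bitSet);
    }
  }
}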

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
index 8646301..af5568e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
@@ -91,12 +91,19 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
    *
    * @return start IndexKey
    */
-  public void getStartKey(long[] startKey,
-      SortedMap<Integer, byte[]> noDictStartKeys, List<long[]> startKeyList) {
-    FilterUtil.getStartKey(dimColEvaluatorInfoList.get(0).getDimensionResolvedFilterInstance(),
-        startKey, startKeyList);
-    FilterUtil
-        .getStartKeyForNoDictionaryDimension(dimColEvaluatorInfoList.get(0), noDictStartKeys);
+  public void getStartKey(long[] startKey, SortedMap<Integer, byte[]> noDictStartKeys,
+      List<long[]> startKeyList) {
+    switch (exp.getFilterExpressionType()) {
+      case GREATERTHAN:
+      case GREATERTHAN_EQUALTO:
+        FilterUtil.getStartKey(dimColEvaluatorInfoList.get(0).getDimensionResolvedFilterInstance(),
+            startKey, startKeyList);
+        FilterUtil
+            .getStartKeyForNoDictionaryDimension(dimColEvaluatorInfoList.get(0), noDictStartKeys);
+        break;
+      default:
+        //do nothing
+    }
   }
 
   /**
@@ -106,10 +113,17 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
    */
   @Override public void getEndKey(SegmentProperties segmentProperties, long[] endKeys,
       SortedMap<Integer, byte[]> noDicEndKeys, List<long[]> endKeyList) {
-    FilterUtil.getEndKey(dimColEvaluatorInfoList.get(0).getDimensionResolvedFilterInstance(),
-        endKeys, segmentProperties, endKeyList);
-    FilterUtil
-        .getEndKeyForNoDictionaryDimension(dimColEvaluatorInfoList.get(0), noDicEndKeys);
+    switch (exp.getFilterExpressionType()) {
+      case LESSTHAN:
+      case LESSTHAN_EQUALTO:
+        FilterUtil
+            .getEndKey(dimColEvaluatorInfoList.get(0).getDimensionResolvedFilterInstance(), endKeys,
+                segmentProperties, endKeyList);
+        FilterUtil.getEndKeyForNoDictionaryDimension(dimColEvaluatorInfoList.get(0), noDicEndKeys);
+        break;
+      default:
+        //do nothing
+    }
   }
 
   private List<byte[]> getNoDictionaryRangeValues() {
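
The resolver now builds a start key only for GREATERTHAN/GREATERTHAN_EQUALTO expressions and an end key only for LESSTHAN/LESSTHAN_EQUALTO. The reasoning: a predicate like col > v constrains only the lower bound of the scan range (the end key stays at the table maximum), and col < v constrains only the upper bound, so computing both sides for every range filter was wasted work. A minimal hypothetical sketch of the dispatch:

// Hypothetical stand-ins for the expression types used in the switches above.
enum ExpressionType { GREATERTHAN, GREATERTHAN_EQUALTO, LESSTHAN, LESSTHAN_EQUALTO }

final class RangeKeySides {
  // col > v / col >= v constrain only the lower bound of the scan range.
  static boolean buildsStartKey(ExpressionType type) {
    return type == ExpressionType.GREATERTHAN
        || type == ExpressionType.GREATERTHAN_EQUALTO;
  }

  // col < v / col <= v constrain only the upper bound of the scan range.
  static boolean buildsEndKey(ExpressionType type) {
    return type == ExpressionType.LESSTHAN
        || type == ExpressionType.LESSTHAN_EQUALTO;
  }
}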

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
index 6b0a458..f0cebf4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
@@ -18,6 +18,10 @@ package org.apache.carbondata.core.scan.processor;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
@@ -29,7 +33,6 @@ import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedResultColle
 import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedVectorResultCollector;
 import org.apache.carbondata.core.scan.collector.impl.RawBasedResultCollector;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.result.AbstractScannedResult;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
 import org.apache.carbondata.core.scan.scanner.BlockletScanner;
@@ -63,23 +66,33 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
   protected BlockletScanner blockletScanner;
 
   /**
-   * to hold the data block
-   */
-  protected BlocksChunkHolder blocksChunkHolder;
-
-  /**
    * batch size of result
    */
   protected int batchSize;
 
+  protected ExecutorService executorService;
+
+  private Future<AbstractScannedResult> future;
+
+  private Future<BlocksChunkHolder> futureIo;
+
   protected AbstractScannedResult scannedResult;
 
-  public AbstractDataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader,
-      int batchSize, QueryStatisticsModel queryStatisticsModel,
-      BlocksChunkHolder blockChunkHolder) {
+  private BlockExecutionInfo blockExecutionInfo;
+
+  private FileHolder fileReader;
+
+  private AtomicBoolean nextBlock;
+
+  private AtomicBoolean nextRead;
+
+  public AbstractDataBlockIterator(BlockExecutionInfo blockExecutionInfo,
+      FileHolder fileReader, int batchSize, QueryStatisticsModel queryStatisticsModel,
+      ExecutorService executorService) {
+    this.blockExecutionInfo = blockExecutionInfo;
+    this.fileReader = fileReader;
     dataBlockIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
         blockExecutionInfo.getNumberOfBlockToScan());
-    blocksChunkHolder = blockChunkHolder;
     if (blockExecutionInfo.getFilterExecuterTree() != null) {
       blockletScanner = new FilterScanner(blockExecutionInfo, queryStatisticsModel);
     } else {
@@ -99,13 +112,16 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
           new DictionaryBasedResultCollector(blockExecutionInfo);
     }
     this.batchSize = batchSize;
+    this.executorService = executorService;
+    this.nextBlock = new AtomicBoolean(false);
+    this.nextRead = new AtomicBoolean(false);
   }
 
   public boolean hasNext() {
     if (scannedResult != null && scannedResult.hasNext()) {
       return true;
     } else {
-      return dataBlockIterator.hasNext();
+      return dataBlockIterator.hasNext() || nextBlock.get() || nextRead.get();
     }
   }
 
@@ -121,22 +137,85 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
           }
           scannedResult = getNextScannedResult();
         }
+        nextBlock.set(false);
+        nextRead.set(false);
         return false;
       }
-    } catch (IOException | FilterUnsupportedException ex) {
+    } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
   }
 
-  private AbstractScannedResult getNextScannedResult()
-      throws IOException, FilterUnsupportedException {
-    if (dataBlockIterator.hasNext()) {
-      blocksChunkHolder.setDataBlock(dataBlockIterator.next());
-      blocksChunkHolder.reset();
-      return blockletScanner.scanBlocklet(blocksChunkHolder);
+  private AbstractScannedResult getNextScannedResult() throws Exception {
+    AbstractScannedResult result = null;
+    if (dataBlockIterator.hasNext() || nextBlock.get() || nextRead.get()) {
+      if (future == null) {
+        future = execute();
+      }
+      result = future.get();
+      nextBlock.set(false);
+      if (dataBlockIterator.hasNext() || nextRead.get()) {
+        nextBlock.set(true);
+        future = execute();
+      }
+    }
+    return result;
+  }
+
+  private BlocksChunkHolder getBlocksChunkHolder() throws IOException {
+    BlocksChunkHolder blocksChunkHolder = getBlocksChunkHolderInternal();
+    while (blocksChunkHolder == null && dataBlockIterator.hasNext()) {
+      blocksChunkHolder = getBlocksChunkHolderInternal();
+    }
+    return blocksChunkHolder;
+  }
+
+  private BlocksChunkHolder getBlocksChunkHolderInternal() throws IOException {
+    BlocksChunkHolder blocksChunkHolder =
+        new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(),
+            blockExecutionInfo.getTotalNumberOfMeasureBlock(), fileReader);
+    blocksChunkHolder.setDataBlock(dataBlockIterator.next());
+    if (blockletScanner.isScanRequired(blocksChunkHolder)) {
+      return blocksChunkHolder;
     }
     return null;
   }
 
+  private Future<AbstractScannedResult> execute() {
+    return executorService.submit(new Callable<AbstractScannedResult>() {
+      @Override public AbstractScannedResult call() throws Exception {
+        if (futureIo == null) {
+          futureIo = executeRead();
+        }
+        BlocksChunkHolder blocksChunkHolder = futureIo.get();
+        futureIo = null;
+        nextRead.set(false);
+        if (blocksChunkHolder != null) {
+          if (dataBlockIterator.hasNext()) {
+            nextRead.set(true);
+            futureIo = executeRead();
+          }
+          return blockletScanner.scanBlocklet(blocksChunkHolder);
+        }
+        return null;
+      }
+    });
+  }
+
+  private Future<BlocksChunkHolder> executeRead() {
+    return executorService.submit(new Callable<BlocksChunkHolder>() {
+      @Override public BlocksChunkHolder call() throws Exception {
+        if (dataBlockIterator.hasNext()) {
+          BlocksChunkHolder blocksChunkHolder = getBlocksChunkHolder();
+          if (blocksChunkHolder != null) {
+            blockletScanner.readBlocklet(blocksChunkHolder);
+            return blocksChunkHolder;
+          }
+        }
+        return null;
+      }
+    });
+  }
+
   public abstract void processNextBatch(CarbonColumnarBatch columnarBatch);
-}
+}
\ No newline at end of file
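
The biggest change is here: instead of holding a single shared BlocksChunkHolder, the iterator drives a two-stage pipeline on the supplied ExecutorService. futureIo reads the next blocklet's raw chunks from disk, future decodes and scans the previously read one, and the nextBlock/nextRead flags keep hasNext() accurate while work is still in flight. A condensed, self-contained sketch of the same pattern (generic stand-ins, not the CarbonData types):

import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the two-stage pipeline: an IO task reads raw bytes
// ahead of time, and a scan task decodes the previously read bytes, so the
// read, the scan and the consumer can each work on a different blocklet.
final class PipelinedScanner<R> {
  interface Scan<R> { R apply(byte[] raw) throws Exception; }

  private final ExecutorService pool = Executors.newFixedThreadPool(2);
  private final Iterator<byte[]> blocklets;   // stands in for BlockletIterator
  private final Scan<R> scan;                 // stands in for BlockletScanner
  private Future<byte[]> ioFuture;            // read-ahead, like futureIo
  private Future<R> scanFuture;               // decode+scan, like future

  PipelinedScanner(Iterator<byte[]> blocklets, Scan<R> scan) {
    this.blocklets = blocklets;
    this.scan = scan;
  }

  private Future<byte[]> read() {
    return pool.submit(() -> blocklets.hasNext() ? blocklets.next() : null);
  }

  private Future<R> scanNext() {
    return pool.submit(new Callable<R>() {
      @Override public R call() throws Exception {
        if (ioFuture == null) {
          ioFuture = read();                  // prime the IO stage
        }
        byte[] raw = ioFuture.get();          // wait for the in-flight read
        ioFuture = read();                    // immediately read ahead
        return raw == null ? null : scan.apply(raw);
      }
    });
  }

  // Returns null once the source is exhausted.
  R next() throws Exception {
    if (scanFuture == null) {
      scanFuture = scanNext();                // prime the scan stage
    }
    R result = scanFuture.get();
    scanFuture = scanNext();                  // keep the pipeline full
    return result;
  }
}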

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
index 2b1a48e..5227115 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
@@ -18,8 +18,8 @@ package org.apache.carbondata.core.scan.processor;
 
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.FileHolder;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 
 /**
  * Block chunk holder which will hold the dimension and
@@ -30,12 +30,12 @@ public class BlocksChunkHolder {
   /**
    * dimension column data chunk
    */
-  private DimensionColumnDataChunk[] dimensionDataChunk;
+  private DimensionRawColumnChunk[] dimensionRawDataChunk;
 
   /**
    * measure column data chunk
    */
-  private MeasureColumnDataChunk[] measureDataChunk;
+  private MeasureRawColumnChunk[] measureRawDataChunk;
 
   /**
    * file reader which will use to read the block from file
@@ -48,36 +48,43 @@ public class BlocksChunkHolder {
   private DataRefNode dataBlock;
 
   public BlocksChunkHolder(int numberOfDimensionBlock, int numberOfMeasureBlock) {
-    dimensionDataChunk = new DimensionColumnDataChunk[numberOfDimensionBlock];
-    measureDataChunk = new MeasureColumnDataChunk[numberOfMeasureBlock];
+    dimensionRawDataChunk = new DimensionRawColumnChunk[numberOfDimensionBlock];
+    measureRawDataChunk = new MeasureRawColumnChunk[numberOfMeasureBlock];
+  }
+
+  public BlocksChunkHolder(int numberOfDimensionBlock, int numberOfMeasureBlock,
+      FileHolder fileReader) {
+    dimensionRawDataChunk = new DimensionRawColumnChunk[numberOfDimensionBlock];
+    measureRawDataChunk = new MeasureRawColumnChunk[numberOfMeasureBlock];
+    this.fileReader = fileReader;
   }
 
   /**
-   * @return the dimensionDataChunk
+   * @return the dimensionRawDataChunk
    */
-  public DimensionColumnDataChunk[] getDimensionDataChunk() {
-    return dimensionDataChunk;
+  public DimensionRawColumnChunk[] getDimensionRawDataChunk() {
+    return dimensionRawDataChunk;
   }
 
   /**
-   * @param dimensionDataChunk the dimensionDataChunk to set
+   * @param dimensionRawDataChunk the dimensionRawDataChunk to set
    */
-  public void setDimensionDataChunk(DimensionColumnDataChunk[] dimensionDataChunk) {
-    this.dimensionDataChunk = dimensionDataChunk;
+  public void setDimensionRawDataChunk(DimensionRawColumnChunk[] dimensionRawDataChunk) {
+    this.dimensionRawDataChunk = dimensionRawDataChunk;
   }
 
   /**
-   * @return the measureDataChunk
+   * @return the measureRawDataChunk
    */
-  public MeasureColumnDataChunk[] getMeasureDataChunk() {
-    return measureDataChunk;
+  public MeasureRawColumnChunk[] getMeasureRawDataChunk() {
+    return measureRawDataChunk;
   }
 
   /**
-   * @param measureDataChunk the measureDataChunk to set
+   * @param measureRawDataChunk the measureRawDataChunk to set
    */
-  public void setMeasureDataChunk(MeasureColumnDataChunk[] measureDataChunk) {
-    this.measureDataChunk = measureDataChunk;
+  public void setMeasureRawDataChunk(MeasureRawColumnChunk[] measureRawDataChunk) {
+    this.measureRawDataChunk = measureRawDataChunk;
   }
 
   /**
@@ -113,11 +120,11 @@ public class BlocksChunkHolder {
    * array
    */
   public void reset() {
-    for (int i = 0; i < measureDataChunk.length; i++) {
-      this.measureDataChunk[i] = null;
+    for (int i = 0; i < measureRawDataChunk.length; i++) {
+      this.measureRawDataChunk[i] = null;
     }
-    for (int i = 0; i < dimensionDataChunk.length; i++) {
-      this.dimensionDataChunk[i] = null;
+    for (int i = 0; i < dimensionRawDataChunk.length; i++) {
+      this.dimensionRawDataChunk[i] = null;
     }
   }
 }
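
The holder now stores DimensionRawColumnChunk/MeasureRawColumnChunk, i.e. the compressed bytes as read from disk, instead of eagerly decoded chunks; decoding happens lazily, page by page, via convertToDimColDataChunk(i) in the filter executors. A toy illustration of the raw-versus-decoded split (hypothetical class, not the real chunk API):

// Hypothetical sketch: the holder keeps compressed pages; a page is only
// decoded when a filter or projection actually touches it.
final class RawChunk {
  private final byte[][] compressedPages;   // as read from the file, per page
  private final int[] rowCounts;

  RawChunk(byte[][] compressedPages, int[] rowCounts) {
    this.compressedPages = compressedPages;
    this.rowCounts = rowCounts;
  }

  int getPagesCount() { return compressedPages.length; }

  int[] getRowCount() { return rowCounts; }

  // Stand-in for convertToDimColDataChunk(pageNumber): decode a single page.
  byte[] decodePage(int pageNumber) {
    return decompress(compressedPages[pageNumber]);
  }

  private static byte[] decompress(byte[] page) {
    return page.clone();                    // placeholder for real codec work
  }
}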

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
index f78b75b..5eab7b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.scan.processor.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
@@ -37,8 +38,8 @@ public class DataBlockIteratorImpl extends AbstractDataBlockIterator {
    */
   public DataBlockIteratorImpl(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader,
       int batchSize, QueryStatisticsModel queryStatisticsModel,
-      BlocksChunkHolder blockChunkHolder) {
-    super(blockExecutionInfo, fileReader, batchSize, queryStatisticsModel, blockChunkHolder);
+      BlocksChunkHolder blockChunkHolder, ExecutorService executorService) {
+    super(blockExecutionInfo, fileReader, batchSize, queryStatisticsModel, executorService);
   }
 
   /**
@@ -64,9 +65,6 @@ public class DataBlockIteratorImpl extends AbstractDataBlockIterator {
   public void processNextBatch(CarbonColumnarBatch columnarBatch) {
     if (updateScanner()) {
       this.scannerResultAggregator.collectVectorBatch(scannedResult, columnarBatch);
-      while (columnarBatch.getActualSize() < columnarBatch.getBatchSize() && updateScanner()) {
-        this.scannerResultAggregator.collectVectorBatch(scannedResult, columnarBatch);
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
index 60e6f67..fbd7044 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
@@ -47,16 +48,23 @@ public abstract class AbstractScannedResult {
    * current row number
    */
   protected int currentRow = -1;
+
+  protected int pageCounter;
   /**
    * row mapping indexes
    */
-  protected int[] rowMapping;
+  protected int[][] rowMapping;
   /**
    * key size of the fixed length column
    */
   private int fixedLengthKeySize;
   /**
-   * total number of rows
+   * total number of rows per page
+   */
+  private int[] numberOfRows;
+
+  /**
+   * Total number of rows.
    */
   private int totalNumberOfRows;
   /**
@@ -66,11 +74,16 @@ public abstract class AbstractScannedResult {
   /**
    * dimension column data chunk
    */
-  protected DimensionColumnDataChunk[] dataChunks;
+  protected DimensionColumnDataChunk[][] dataChunks;
+
+  /**
+   * Raw dimension chunks.
+   */
+  protected DimensionRawColumnChunk[] rawColumnChunks;
   /**
    * measure column data chunk
    */
-  protected MeasureColumnDataChunk[] measureDataChunks;
+  protected MeasureColumnDataChunk[][] measureDataChunks;
   /**
    * dictionary column block index in file
    */
@@ -128,7 +141,7 @@ public abstract class AbstractScannedResult {
    *
    * @param dataChunks dimension chunks used in query
    */
-  public void setDimensionChunks(DimensionColumnDataChunk[] dataChunks) {
+  public void setDimensionChunks(DimensionColumnDataChunk[][] dataChunks) {
     this.dataChunks = dataChunks;
   }
 
@@ -137,10 +150,14 @@ public abstract class AbstractScannedResult {
    *
    * @param measureDataChunks measure data chunks
    */
-  public void setMeasureChunks(MeasureColumnDataChunk[] measureDataChunks) {
+  public void setMeasureChunks(MeasureColumnDataChunk[][] measureDataChunks) {
     this.measureDataChunks = measureDataChunks;
   }
 
+  public void setRawColumnChunks(DimensionRawColumnChunk[] rawColumnChunks) {
+    this.rawColumnChunks = rawColumnChunks;
+  }
+
   /**
    * Below method will be used to get the chunk based in measure ordinal
    *
@@ -148,7 +165,7 @@ public abstract class AbstractScannedResult {
    * @return measure column chunk
    */
   public MeasureColumnDataChunk getMeasureChunk(int ordinal) {
-    return measureDataChunks[ordinal];
+    return measureDataChunks[ordinal][pageCounter];
   }
 
   /**
@@ -162,7 +179,7 @@ public abstract class AbstractScannedResult {
     byte[] completeKey = new byte[fixedLengthKeySize];
     int offset = 0;
     for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
-      offset += dataChunks[dictionaryColumnBlockIndexes[i]]
+      offset += dataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
           .fillChunkData(completeKey, offset, rowId,
               columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
     }
@@ -181,7 +198,7 @@ public abstract class AbstractScannedResult {
     int[] completeKey = new int[totalDimensionsSize];
     int column = 0;
     for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
-      column = dataChunks[dictionaryColumnBlockIndexes[i]]
+      column = dataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
           .fillConvertedChunkData(rowId, column, completeKey,
               columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
     }
@@ -195,7 +212,7 @@ public abstract class AbstractScannedResult {
   public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
     int column = 0;
     for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
-      column = dataChunks[dictionaryColumnBlockIndexes[i]]
+      column = dataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
           .fillConvertedChunkData(vectorInfo, column,
               columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
     }
@@ -207,7 +224,7 @@ public abstract class AbstractScannedResult {
   public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
     int column = 0;
     for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
-      column = dataChunks[noDictionaryColumnBlockIndexes[i]]
+      column = dataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter]
           .fillConvertedChunkData(vectorInfo, column,
               columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i]));
     }
@@ -219,7 +236,7 @@ public abstract class AbstractScannedResult {
   public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) {
     for (int i = 0; i < measuresOrdinal.length; i++) {
       vectorInfo[i].measureVectorFiller
-          .fillMeasureVector(measureDataChunks[measuresOrdinal[i]], vectorInfo[i]);
+          .fillMeasureVector(measureDataChunks[measuresOrdinal[i]][pageCounter], vectorInfo[i]);
     }
   }
 
@@ -233,8 +250,9 @@ public abstract class AbstractScannedResult {
         ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
         DataOutputStream dataOutput = new DataOutputStream(byteStream);
         try {
-          vectorInfos[i].genericQueryType.parseBlocksAndReturnComplexColumnByteArray(dataChunks,
-              rowMapping == null ? j : rowMapping[j], dataOutput);
+          vectorInfos[i].genericQueryType
+              .parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks,
+                  rowMapping == null ? j : rowMapping[pageCounter][j], pageCounter, dataOutput);
           Object data = vectorInfos[i].genericQueryType
               .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
           vector.putObject(vectorOffset++, data);
@@ -257,6 +275,31 @@ public abstract class AbstractScannedResult {
   }
 
   /**
+   * Just increment the page counter and reset the remaining counters.
+   */
+  public void incrementPageCounter() {
+    rowCounter = 0;
+    currentRow = -1;
+    pageCounter++;
+  }
+
+  public int numberOfpages() {
+    return numberOfRows.length;
+  }
+
+  /**
+   * Get the total number of rows in the current page.
+   * @return row count of the current page
+   */
+  public int getCurrentPageRowCount() {
+    return numberOfRows[pageCounter];
+  }
+
+  public int getCurrentPageCounter() {
+    return pageCounter;
+  }
+
+  /**
    * increment the counter.
    */
   public void setRowCounter(int rowCounter) {
@@ -272,7 +315,7 @@ public abstract class AbstractScannedResult {
    * @return dimension data based on row id
    */
   protected byte[] getDimensionData(int dimOrdinal, int rowId) {
-    return dataChunks[dimOrdinal].getChunkData(rowId);
+    return dataChunks[dimOrdinal][pageCounter].getChunkData(rowId);
   }
 
   /**
@@ -287,7 +330,7 @@ public abstract class AbstractScannedResult {
     int position = 0;
     for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
       noDictionaryColumnsKeys[position++] =
-          dataChunks[noDictionaryColumnBlockIndexes[i]].getChunkData(rowId);
+          dataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter].getChunkData(rowId);
     }
     return noDictionaryColumnsKeys;
   }
@@ -303,8 +346,8 @@ public abstract class AbstractScannedResult {
     String[] noDictionaryColumnsKeys = new String[noDictionaryColumnBlockIndexes.length];
     int position = 0;
     for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
-      noDictionaryColumnsKeys[position++] =
-          new String(dataChunks[noDictionaryColumnBlockIndexes[i]].getChunkData(rowId));
+      noDictionaryColumnsKeys[position++] = new String(
+          dataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter].getChunkData(rowId));
     }
     return noDictionaryColumnsKeys;
   }
@@ -353,7 +396,9 @@ public abstract class AbstractScannedResult {
       ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
       DataOutputStream dataOutput = new DataOutputStream(byteStream);
       try {
-        genericQueryType.parseBlocksAndReturnComplexColumnByteArray(dataChunks, rowId, dataOutput);
+        genericQueryType
+            .parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks, rowId, pageCounter,
+                dataOutput);
         complexTypeData[i] = byteStream.toByteArray();
       } catch (IOException e) {
         LOGGER.error(e);
@@ -378,8 +423,13 @@ public abstract class AbstractScannedResult {
    * @return
    */
   public boolean hasNext() {
-    if (rowCounter < this.totalNumberOfRows) {
+    if (pageCounter < numberOfRows.length && rowCounter < this.numberOfRows[pageCounter]) {
       return true;
+    } else if (pageCounter < numberOfRows.length) {
+      pageCounter++;
+      rowCounter = 0;
+      currentRow = -1;
+      return hasNext();
     }
     return false;
   }
@@ -393,13 +443,18 @@ public abstract class AbstractScannedResult {
   public void reset() {
     rowCounter = 0;
     currentRow = -1;
+    pageCounter = 0;
   }
 
   /**
-   * @param totalNumberOfRows set total of number rows valid after scanning
+   * @param numberOfRows per-page counts of rows valid after scanning
    */
-  public void setNumberOfRows(int totalNumberOfRows) {
-    this.totalNumberOfRows = totalNumberOfRows;
+  public void setNumberOfRows(int[] numberOfRows) {
+    this.numberOfRows = numberOfRows;
+
+    for (int count: numberOfRows) {
+      totalNumberOfRows += count;
+    }
   }
 
   /**
@@ -408,7 +463,7 @@ public abstract class AbstractScannedResult {
    *
    * @param indexes
    */
-  public void setIndexes(int[] indexes) {
+  public void setIndexes(int[][] indexes) {
     this.rowMapping = indexes;
   }
 
@@ -420,7 +475,8 @@ public abstract class AbstractScannedResult {
    * @return whether it is null or not
    */
   protected boolean isNullMeasureValue(int ordinal, int rowIndex) {
-    return measureDataChunks[ordinal].getNullValueIndexHolder().getBitSet().get(rowIndex);
+    return measureDataChunks[ordinal][pageCounter].getNullValueIndexHolder().getBitSet()
+        .get(rowIndex);
   }
 
   /**
@@ -432,7 +488,8 @@ public abstract class AbstractScannedResult {
    * @return measure value of long type
    */
   protected long getLongMeasureValue(int ordinal, int rowIndex) {
-    return measureDataChunks[ordinal].getMeasureDataHolder().getReadableLongValueByIndex(rowIndex);
+    return measureDataChunks[ordinal][pageCounter].getMeasureDataHolder()
+        .getReadableLongValueByIndex(rowIndex);
   }
 
   /**
@@ -443,7 +500,7 @@ public abstract class AbstractScannedResult {
    * @return measure value of double type
    */
   protected double getDoubleMeasureValue(int ordinal, int rowIndex) {
-    return measureDataChunks[ordinal].getMeasureDataHolder()
+    return measureDataChunks[ordinal][pageCounter].getMeasureDataHolder()
         .getReadableDoubleValueByIndex(rowIndex);
   }
 
@@ -455,7 +512,7 @@ public abstract class AbstractScannedResult {
    * @return measure of big decimal type
    */
   protected BigDecimal getBigDecimalMeasureValue(int ordinal, int rowIndex) {
-    return measureDataChunks[ordinal].getMeasureDataHolder()
+    return measureDataChunks[ordinal][pageCounter].getMeasureDataHolder()
         .getReadableBigDecimalValueByIndex(rowIndex);
   }
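
AbstractScannedResult becomes page-aware: every chunk array gains a page dimension and is indexed as dataChunks[ordinal][pageCounter], and hasNext() now exhausts the rows of the current page before advancing pageCounter. The cursor logic in isolation, as a minimal hypothetical class:

// Hypothetical sketch of the page-aware cursor: rows are consumed within a
// page, and the page counter advances when a page is exhausted.
final class PagedCursor {
  private final int[] rowsPerPage;   // like numberOfRows in the patch
  private int pageCounter;
  private int rowCounter;

  PagedCursor(int[] rowsPerPage) { this.rowsPerPage = rowsPerPage; }

  boolean hasNext() {
    while (pageCounter < rowsPerPage.length) {
      if (rowCounter < rowsPerPage[pageCounter]) {
        return true;
      }
      pageCounter++;                 // page exhausted: move on, reset the row
      rowCounter = 0;
    }
    return false;
  }

  // Returns (page, rowWithinPage), the coordinates used to index the
  // two-dimensional dataChunks[ordinal][pageCounter] arrays.
  int[] next() {
    if (!hasNext()) {
      throw new java.util.NoSuchElementException();
    }
    return new int[] { pageCounter, rowCounter++ };
  }
}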
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
index 6ca0570..d1f8b7e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
@@ -37,7 +37,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
    */
   @Override public byte[] getDictionaryKeyArray() {
     ++currentRow;
-    return getDictionaryKeyArray(rowMapping[currentRow]);
+    return getDictionaryKeyArray(rowMapping[pageCounter][currentRow]);
   }
 
   /**
@@ -46,7 +46,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
    */
   @Override public int[] getDictionaryKeyIntegerArray() {
     ++currentRow;
-    return getDictionaryKeyIntegerArray(rowMapping[currentRow]);
+    return getDictionaryKeyIntegerArray(rowMapping[pageCounter][currentRow]);
   }
 
   /**
@@ -55,7 +55,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
    * @return complex type key array
    */
   @Override public byte[][] getComplexTypeKeyArray() {
-    return getComplexTypeKeyArray(rowMapping[currentRow]);
+    return getComplexTypeKeyArray(rowMapping[pageCounter][currentRow]);
   }
 
   /**
@@ -65,7 +65,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
    * @return no dictionary key array for all the no dictionary dimension
    */
   @Override public byte[][] getNoDictionaryKeyArray() {
-    return getNoDictionaryKeyArray(rowMapping[currentRow]);
+    return getNoDictionaryKeyArray(rowMapping[pageCounter][currentRow]);
   }
 
   /**
@@ -75,7 +75,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
    * @return no dictionary key array for all the no dictionary dimension
    */
   @Override public String[] getNoDictionaryKeyStringArray() {
-    return getNoDictionaryKeyStringArray(rowMapping[currentRow]);
+    return getNoDictionaryKeyStringArray(rowMapping[pageCounter][currentRow]);
   }
 
   /**
@@ -84,7 +84,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
    * @return valid row id
    */
   @Override public int getCurrenrRowId() {
-    return rowMapping[currentRow];
+    return rowMapping[pageCounter][currentRow];
   }
 
   /**
@@ -93,8 +93,8 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
   public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
     int column = 0;
     for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
-      column = dataChunks[dictionaryColumnBlockIndexes[i]]
-          .fillConvertedChunkData(rowMapping, vectorInfo, column,
+      column = dataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
+          .fillConvertedChunkData(rowMapping[pageCounter], vectorInfo, column,
               columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
     }
   }
@@ -105,8 +105,8 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
   public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
     int column = 0;
     for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
-      column = dataChunks[noDictionaryColumnBlockIndexes[i]]
-          .fillConvertedChunkData(rowMapping, vectorInfo, column,
+      column = dataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter]
+          .fillConvertedChunkData(rowMapping[pageCounter], vectorInfo, column,
               columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i]));
     }
   }
@@ -116,9 +116,8 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
    */
   public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) {
     for (int i = 0; i < measuresOrdinal.length; i++) {
-      vectorInfo[i].measureVectorFiller
-          .fillMeasureVectorForFilter(rowMapping, measureDataChunks[measuresOrdinal[i]],
-              vectorInfo[i]);
+      vectorInfo[i].measureVectorFiller.fillMeasureVectorForFilter(rowMapping[pageCounter],
+          measureDataChunks[measuresOrdinal[i]][pageCounter], vectorInfo[i]);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index b4bcba6..1176e5a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.core.scan.result.iterator;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
@@ -64,8 +65,8 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
    * file reader which will be used to execute the query
    */
   protected FileHolder fileReader;
+
   protected AbstractDataBlockIterator dataBlockIterator;
-  protected boolean nextBatch = false;
   /**
    * total time scan the blocks
    */
@@ -138,7 +139,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
   }
 
   @Override public boolean hasNext() {
-    if ((dataBlockIterator != null && dataBlockIterator.hasNext()) || nextBatch) {
+    if ((dataBlockIterator != null && dataBlockIterator.hasNext())) {
       return true;
     } else if (blockExecutionInfos.size() > 0) {
       return true;
@@ -168,10 +169,10 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
       BlockExecutionInfo executionInfo = blockExecutionInfos.get(0);
       blockExecutionInfos.remove(executionInfo);
       queryStatisticsModel.setRecorder(recorder);
-      CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
-          blocksChunkHolder.getMeasureDataChunk());
+      CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
+          blocksChunkHolder.getMeasureRawDataChunk());
       return new DataBlockIteratorImpl(executionInfo, fileReader, batchSize, queryStatisticsModel,
-          blocksChunkHolder);
+          blocksChunkHolder, execService);
     }
     return null;
   }
@@ -191,8 +192,14 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
   }
 
   @Override public void close() {
-    CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
-        blocksChunkHolder.getMeasureDataChunk());
+    try {
+      fileReader.finish();
+    } catch (IOException e) {
+      LOGGER.error(e);
+    } finally {
+      CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
+          blocksChunkHolder.getMeasureRawDataChunk());
+    }
   }
 
 }
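
close() is hardened here: the file reader is finished first, and the raw chunk memory is released in a finally block so that an IOException from finish() cannot skip the cleanup. The pattern reduced to its essentials (hypothetical helper):

// Hypothetical sketch: free dependent memory in finally so an exception
// from finishing the reader cannot leak the chunk arrays.
final class SafeClose {
  static void finishThenFree(AutoCloseable fileReader, Runnable freeMemory) {
    try {
      fileReader.close();            // like fileReader.finish() in the patch
    } catch (Exception e) {
      System.err.println(e);         // the patch logs via LOGGER.error(e)
    } finally {
      freeMemory.run();              // always runs, like CarbonUtil.freeMemory
    }
  }
}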

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
index ed4b286..1fa2f4c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
@@ -17,9 +17,7 @@
 package org.apache.carbondata.core.scan.result.iterator;
 
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.QueryModel;
@@ -33,7 +31,6 @@ import org.apache.carbondata.core.scan.result.BatchResult;
 public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<BatchResult> {
 
   private final Object lock = new Object();
-  private Future<BatchResult> future;
 
   public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
       ExecutorService execService) {
@@ -41,43 +38,20 @@ public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator
   }
 
   @Override public BatchResult next() {
-    BatchResult result;
     long startTime = System.currentTimeMillis();
-    try {
-      if (future == null) {
-        future = execute();
-      }
-      result = future.get();
-      nextBatch = false;
-      if (hasNext()) {
-        nextBatch = true;
-        future = execute();
-      } else {
-        fileReader.finish();
-      }
-      totalScanTime += System.currentTimeMillis() - startTime;
-    } catch (Exception ex) {
-      try {
-        fileReader.finish();
-      } finally {
-        throw new RuntimeException(ex);
-      }
-    }
-    return result;
+    BatchResult batchResult = getBatchResult();
+    totalScanTime += System.currentTimeMillis() - startTime;
+    return batchResult;
   }
 
-  private Future<BatchResult> execute() {
-    return execService.submit(new Callable<BatchResult>() {
-      @Override public BatchResult call() {
-        BatchResult batchResult = new BatchResult();
-        synchronized (lock) {
-          updateDataBlockIterator();
-          if (dataBlockIterator != null) {
-            batchResult.setRows(dataBlockIterator.next());
-          }
-        }
-        return batchResult;
+  private BatchResult getBatchResult() {
+    BatchResult batchResult = new BatchResult();
+    synchronized (lock) {
+      updateDataBlockIterator();
+      if (dataBlockIterator != null) {
+        batchResult.setRows(dataBlockIterator.next());
       }
-    });
+    }
+    return batchResult;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
index 258a476..341fb21 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
@@ -19,12 +19,17 @@ package org.apache.carbondata.core.scan.scanner;
 import java.io.IOException;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader;
 import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsModel;
@@ -35,51 +40,71 @@ import org.apache.carbondata.core.stats.QueryStatisticsModel;
 public abstract class AbstractBlockletScanner implements BlockletScanner {
 
   /**
-   * scanner result
-   */
-  protected AbstractScannedResult scannedResult;
-
-  /**
    * block execution info
    */
   protected BlockExecutionInfo blockExecutionInfo;
 
   public QueryStatisticsModel queryStatisticsModel;
 
+  private AbstractScannedResult emptyResult;
+
   public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) {
     this.blockExecutionInfo = tableBlockExecutionInfos;
   }
 
   @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
       throws IOException, FilterUnsupportedException {
-    fillKeyValue(blocksChunkHolder);
-    return scannedResult;
-  }
-
-  protected void fillKeyValue(BlocksChunkHolder blocksChunkHolder) throws IOException {
-
+    AbstractScannedResult scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
     QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
-            .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
+        .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
     totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
-            totalBlockletStatistic.getCount() + 1);
+        totalBlockletStatistic.getCount() + 1);
     queryStatisticsModel.getRecorder().recordStatistics(totalBlockletStatistic);
-    QueryStatistic validScannedBlockletStatistic = queryStatisticsModel
-            .getStatisticsTypeAndObjMap().get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
+    QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
     validScannedBlockletStatistic
-            .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
-                    validScannedBlockletStatistic.getCount() + 1);
+        .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
+            validScannedBlockletStatistic.getCount() + 1);
     queryStatisticsModel.getRecorder().recordStatistics(validScannedBlockletStatistic);
-    scannedResult.reset();
-    scannedResult.setNumberOfRows(blocksChunkHolder.getDataBlock().nodeSize());
     scannedResult.setBlockletId(
-              blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR
-                      + blocksChunkHolder.getDataBlock().nodeNumber());
-    scannedResult.setDimensionChunks(blocksChunkHolder.getDataBlock()
-        .getDimensionChunks(blocksChunkHolder.getFileReader(),
-            blockExecutionInfo.getAllSelectedDimensionBlocksIndexes()));
-    scannedResult.setMeasureChunks(blocksChunkHolder.getDataBlock()
-            .getMeasureChunks(blocksChunkHolder.getFileReader(),
-                blockExecutionInfo.getAllSelectedMeasureBlocksIndexes()));
+        blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
+            .getDataBlock().nodeNumber());
+    DimensionRawColumnChunk[] dimensionRawColumnChunks =
+        blocksChunkHolder.getDimensionRawDataChunk();
+    DimensionColumnDataChunk[][] dimensionColumnDataChunks =
+        new DimensionColumnDataChunk[dimensionRawColumnChunks.length][];
+    for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+      if (dimensionRawColumnChunks[i] != null) {
+        dimensionColumnDataChunks[i] = dimensionRawColumnChunks[i].convertToDimColDataChunks();
+      }
+    }
+    scannedResult.setDimensionChunks(dimensionColumnDataChunks);
+    MeasureRawColumnChunk[] measureRawColumnChunks = blocksChunkHolder.getMeasureRawDataChunk();
+    MeasureColumnDataChunk[][] measureColumnDataChunks =
+        new MeasureColumnDataChunk[measureRawColumnChunks.length][];
+    for (int i = 0; i < measureRawColumnChunks.length; i++) {
+      if (measureRawColumnChunks[i] != null) {
+        measureColumnDataChunks[i] = measureRawColumnChunks[i].convertToMeasureColDataChunks();
+      }
+    }
+    scannedResult.setMeasureChunks(measureColumnDataChunks);
+    int[] numberOfRows = new int[] { blocksChunkHolder.getDataBlock().nodeSize() };
+    if (blockExecutionInfo.getAllSelectedDimensionBlocksIndexes().length > 0) {
+      for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+        if (dimensionRawColumnChunks[i] != null) {
+          numberOfRows = dimensionRawColumnChunks[i].getRowCount();
+          break;
+        }
+      }
+    } else if (blockExecutionInfo.getAllSelectedMeasureBlocksIndexes().length > 0) {
+      for (int i = 0; i < measureRawColumnChunks.length; i++) {
+        if (measureRawColumnChunks[i] != null) {
+          numberOfRows = measureRawColumnChunks[i].getRowCount();
+          break;
+        }
+      }
+    }
+    scannedResult.setNumberOfRows(numberOfRows);
     // loading delete data cache in blockexecutioninfo instance
     DeleteDeltaCacheLoaderIntf deleteCacheLoader =
         new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(),
@@ -87,5 +112,32 @@ public abstract class AbstractBlockletScanner implements BlockletScanner {
     deleteCacheLoader.loadDeleteDeltaFileDataToCache();
     scannedResult
         .setBlockletDeleteDeltaCache(blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache());
+    scannedResult.setRawColumnChunks(dimensionRawColumnChunks);
+    return scannedResult;
+  }
+
+  @Override public void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException {
+    DimensionRawColumnChunk[] dimensionRawColumnChunks = blocksChunkHolder.getDataBlock()
+        .getDimensionChunks(blocksChunkHolder.getFileReader(),
+            blockExecutionInfo.getAllSelectedDimensionBlocksIndexes());
+    blocksChunkHolder.setDimensionRawDataChunk(dimensionRawColumnChunks);
+    MeasureRawColumnChunk[] measureRawColumnChunks = blocksChunkHolder.getDataBlock()
+        .getMeasureChunks(blocksChunkHolder.getFileReader(),
+            blockExecutionInfo.getAllSelectedMeasureBlocksIndexes());
+    blocksChunkHolder.setMeasureRawDataChunk(measureRawColumnChunks);
+  }
+
+  @Override public AbstractScannedResult createEmptyResult() {
+    if (emptyResult == null) {
+      emptyResult = new NonFilterQueryScannedResult(blockExecutionInfo);
+      emptyResult.setNumberOfRows(new int[0]);
+      emptyResult.setIndexes(new int[0][]);
+    }
+    return emptyResult;
+  }
+
+  @Override public boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException {
+    // For a non-filter query the blocklet always needs to be scanned
+    return true;
   }
 }
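
scanBlocklet() now builds the result from whatever raw chunks readBlocklet() fetched, converting each to per-page decoded chunks, and derives the per-page row counts from the first available raw chunk. The derivation, as a simplified hypothetical sketch (the patch additionally checks which dimension/measure blocks the query actually selected before consulting them):

// Hypothetical sketch of the row-count derivation in scanBlocklet above.
final class RowCounts {
  interface Raw { int[] getRowCount(); }

  static int[] resolve(Raw[] dimensionChunks, Raw[] measureChunks, int nodeSize) {
    for (Raw chunk : dimensionChunks) {
      if (chunk != null) {
        return chunk.getRowCount();      // per-page counts from a dimension
      }
    }
    for (Raw chunk : measureChunks) {
      if (chunk != null) {
        return chunk.getRowCount();      // else per-page counts from a measure
      }
    }
    return new int[] { nodeSize };       // fallback: one page, whole blocklet
  }
}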

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
index 6b8a94a..0ed0d43 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
@@ -29,6 +29,14 @@ import org.apache.carbondata.core.scan.result.AbstractScannedResult;
 public interface BlockletScanner {
 
   /**
+   * Checks whether this blocklet needs to be scanned, based on the blocklet's min/max values.
+   * @param blocksChunkHolder
+   * @return
+   * @throws IOException
+   */
+  boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException;
+
+  /**
    * Below method will used to process the block data and get the scanned result
    *
    * @param blocksChunkHolder block chunk which holds the block data
@@ -37,4 +45,16 @@ public interface BlockletScanner {
    */
   AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
       throws IOException, FilterUnsupportedException;
+
+  /**
+   * Just reads the blocklet from file, does not uncompress it.
+   * @param blocksChunkHolder
+   */
+  void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException;
+
+  /**
+   * Returns an empty result for the case where no row satisfies the filter.
+   * @return AbstractScannedResult
+   */
+  AbstractScannedResult createEmptyResult();
 }
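
Taken together, the new methods split a blocklet scan into three phases that the pipelined iterator can schedule independently: a cheap min/max check, raw IO, and decode-plus-filter. A hypothetical caller-side view of the contract (a sketch, not the real interface):

// Hypothetical sketch: prune first, read raw bytes on an IO thread, then
// decode and filter on a CPU thread.
interface BlockletScannerSketch<H, R> {
  boolean isScanRequired(H holder) throws Exception;  // min/max pruning
  void readBlocklet(H holder) throws Exception;       // IO only, no decompress
  R scanBlocklet(H holder) throws Exception;          // decode + apply filter
  R createEmptyResult();                              // when nothing matches
}

// Caller-side order, mirroring AbstractDataBlockIterator:
//   if (scanner.isScanRequired(holder)) {     // skip pruned blocklets early
//     scanner.readBlocklet(holder);           // schedule on the IO future
//     result = scanner.scanBlocklet(holder);  // schedule on the scan future
//   }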

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
index 41e4fa5..c3d86aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
@@ -24,6 +24,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader;
 import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
@@ -36,6 +38,7 @@ import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -66,7 +69,6 @@ public class FilterScanner extends AbstractBlockletScanner {
   public FilterScanner(BlockExecutionInfo blockExecutionInfo,
       QueryStatisticsModel queryStatisticsModel) {
     super(blockExecutionInfo);
-    scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
     // to check whether min max is enabled or not
     String minMaxEnableValue = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED,
@@ -87,8 +89,26 @@ public class FilterScanner extends AbstractBlockletScanner {
    */
   @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
       throws IOException, FilterUnsupportedException {
-    fillScannedResult(blocksChunkHolder);
-    return scannedResult;
+    return fillScannedResult(blocksChunkHolder);
+  }
+
+  @Override public boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException {
+    // apply min max
+    if (isMinMaxEnabled) {
+      BitSet bitSet = this.filterExecuter
+          .isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
+              blocksChunkHolder.getDataBlock().getColumnsMinValue());
+      if (bitSet.isEmpty()) {
+        CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
+            blocksChunkHolder.getMeasureRawDataChunk());
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override public void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException {
+    this.filterExecuter.readBlocks(blocksChunkHolder);
   }
 
   /**
@@ -107,35 +127,21 @@ public class FilterScanner extends AbstractBlockletScanner {
    * @param blocksChunkHolder
    * @throws FilterUnsupportedException
    */
-  private void fillScannedResult(BlocksChunkHolder blocksChunkHolder)
+  private AbstractScannedResult fillScannedResult(BlocksChunkHolder blocksChunkHolder)
       throws FilterUnsupportedException, IOException {
-    scannedResult.reset();
-    scannedResult.setBlockletId(
-        blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
-            .getDataBlock().nodeNumber());
-    // apply min max
-    if (isMinMaxEnabled) {
-      BitSet bitSet = this.filterExecuter
-          .isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
-              blocksChunkHolder.getDataBlock().getColumnsMinValue());
-      if (bitSet.isEmpty()) {
-        scannedResult.setNumberOfRows(0);
-        scannedResult.setIndexes(new int[0]);
-        CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
-            blocksChunkHolder.getMeasureDataChunk());
-        return;
-      }
-    }
     // apply filter on actual data
-    BitSet bitSet = this.filterExecuter.applyFilter(blocksChunkHolder);
+    BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(blocksChunkHolder);
     // if no page in the bit set group has any bit set then return an empty result
-    if (bitSet.isEmpty()) {
-      scannedResult.setNumberOfRows(0);
-      scannedResult.setIndexes(new int[0]);
-      CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
-          blocksChunkHolder.getMeasureDataChunk());
-      return;
+    if (bitSetGroup.isEmpty()) {
+      CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
+          blocksChunkHolder.getMeasureRawDataChunk());
+      return createEmptyResult();
     }
+
+    AbstractScannedResult scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
+    scannedResult.setBlockletId(
+        blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
+            .getDataBlock().nodeNumber());
     // valid scanned blocklet
     QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
         .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
@@ -143,11 +149,20 @@ public class FilterScanner extends AbstractBlockletScanner {
         .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
             validScannedBlockletStatistic.getCount() + 1);
     queryStatisticsModel.getRecorder().recordStatistics(validScannedBlockletStatistic);
+    int[] rowCount = new int[bitSetGroup.getNumberOfPages()];
     // get the row indexes from the bit set of each page
-    int[] indexes = new int[bitSet.cardinality()];
-    int index = 0;
-    for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
-      indexes[index++] = i;
+    int[][] indexesGroup = new int[bitSetGroup.getNumberOfPages()][];
+    for (int k = 0; k < indexesGroup.length; k++) {
+      BitSet bitSet = bitSetGroup.getBitSet(k);
+      if (bitSet != null && !bitSet.isEmpty()) {
+        int[] indexes = new int[bitSet.cardinality()];
+        int index = 0;
+        for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
+          indexes[index++] = i;
+        }
+        rowCount[k] = indexes.length;
+        indexesGroup[k] = indexes;
+      }
     }
     // loading delete data cache in blockexecutioninfo instance
     DeleteDeltaCacheLoaderIntf deleteCacheLoader =
@@ -159,42 +174,91 @@ public class FilterScanner extends AbstractBlockletScanner {
     FileHolder fileReader = blocksChunkHolder.getFileReader();
     int[][] allSelectedDimensionBlocksIndexes =
         blockExecutionInfo.getAllSelectedDimensionBlocksIndexes();
-    DimensionColumnDataChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock()
+    DimensionRawColumnChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock()
         .getDimensionChunks(fileReader, allSelectedDimensionBlocksIndexes);
 
-    DimensionColumnDataChunk[] dimensionColumnDataChunk =
-        new DimensionColumnDataChunk[blockExecutionInfo.getTotalNumberDimensionBlock()];
+    DimensionRawColumnChunk[] dimensionRawColumnChunks =
+        new DimensionRawColumnChunk[blockExecutionInfo.getTotalNumberDimensionBlock()];
     // read dimension chunk blocks from file which are not already present
-    for (int i = 0; i < dimensionColumnDataChunk.length; i++) {
-      if (null != blocksChunkHolder.getDimensionDataChunk()[i]) {
-        dimensionColumnDataChunk[i] = blocksChunkHolder.getDimensionDataChunk()[i];
+    for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+      if (null != blocksChunkHolder.getDimensionRawDataChunk()[i]) {
+        dimensionRawColumnChunks[i] = blocksChunkHolder.getDimensionRawDataChunk()[i];
       }
     }
     for (int i = 0; i < allSelectedDimensionBlocksIndexes.length; i++) {
-      System.arraycopy(projectionListDimensionChunk, allSelectedDimensionBlocksIndexes[i][0],
-          dimensionColumnDataChunk, allSelectedDimensionBlocksIndexes[i][0],
-          allSelectedDimensionBlocksIndexes[i][1] + 1 - allSelectedDimensionBlocksIndexes[i][0]);
+      for (int j = allSelectedDimensionBlocksIndexes[i][0];
+           j <= allSelectedDimensionBlocksIndexes[i][1]; j++) {
+        dimensionRawColumnChunks[j] = projectionListDimensionChunk[j];
+      }
     }
-    MeasureColumnDataChunk[] measureColumnDataChunk =
-        new MeasureColumnDataChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()];
+    /*
+     * for the projection, if a projected dimension is not yet loaded into
+     * dimensionRawColumnChunks, read it now
+     */
+    int[] projectionListDimensionIndexes = blockExecutionInfo.getProjectionListDimensionIndexes();
+    int projectionListDimensionIndexesLength = projectionListDimensionIndexes.length;
+    for (int i = 0; i < projectionListDimensionIndexesLength; i++) {
+      if (null == dimensionRawColumnChunks[projectionListDimensionIndexes[i]]) {
+        dimensionRawColumnChunks[projectionListDimensionIndexes[i]] =
+            blocksChunkHolder.getDataBlock()
+                .getDimensionChunk(fileReader, projectionListDimensionIndexes[i]);
+      }
+    }
+    MeasureRawColumnChunk[] measureRawColumnChunks =
+        new MeasureRawColumnChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()];
     int[][] allSelectedMeasureBlocksIndexes =
         blockExecutionInfo.getAllSelectedMeasureBlocksIndexes();
-    MeasureColumnDataChunk[] projectionListMeasureChunk = blocksChunkHolder.getDataBlock()
+    MeasureRawColumnChunk[] projectionListMeasureChunk = blocksChunkHolder.getDataBlock()
         .getMeasureChunks(fileReader, allSelectedMeasureBlocksIndexes);
     // read the measure chunk blocks which are not already present
-    for (int i = 0; i < measureColumnDataChunk.length; i++) {
-      if (null != blocksChunkHolder.getMeasureDataChunk()[i]) {
-        measureColumnDataChunk[i] = blocksChunkHolder.getMeasureDataChunk()[i];
+    for (int i = 0; i < measureRawColumnChunks.length; i++) {
+      if (null != blocksChunkHolder.getMeasureRawDataChunk()[i]) {
+        measureRawColumnChunks[i] = blocksChunkHolder.getMeasureRawDataChunk()[i];
       }
     }
     for (int i = 0; i < allSelectedMeasureBlocksIndexes.length; i++) {
-      System.arraycopy(projectionListMeasureChunk, allSelectedMeasureBlocksIndexes[i][0],
-          measureColumnDataChunk, allSelectedMeasureBlocksIndexes[i][0],
-          allSelectedMeasureBlocksIndexes[i][1] + 1 - allSelectedMeasureBlocksIndexes[i][0]);
+      for (int j = allSelectedMeasureBlocksIndexes[i][0];
+           j <= allSelectedMeasureBlocksIndexes[i][1]; j++) {
+        measureRawColumnChunks[j] = projectionListMeasureChunk[j];
+      }
+    }
+    /*
+     * for the projection, if a projected measure is not yet loaded into
+     * measureRawColumnChunks, read it now
+     */
+    int[] projectionListMeasureIndexes = blockExecutionInfo.getProjectionListMeasureIndexes();
+    int projectionListMeasureIndexesLength = projectionListMeasureIndexes.length;
+    for (int i = 0; i < projectionListMeasureIndexesLength; i++) {
+      if (null == measureRawColumnChunks[projectionListMeasureIndexes[i]]) {
+        measureRawColumnChunks[projectionListMeasureIndexes[i]] = blocksChunkHolder.getDataBlock()
+            .getMeasureChunk(fileReader, projectionListMeasureIndexes[i]);
+      }
     }
-    scannedResult.setDimensionChunks(dimensionColumnDataChunk);
-    scannedResult.setIndexes(indexes);
-    scannedResult.setMeasureChunks(measureColumnDataChunk);
-    scannedResult.setNumberOfRows(indexes.length);
+    DimensionColumnDataChunk[][] dimensionColumnDataChunks =
+        new DimensionColumnDataChunk[dimensionRawColumnChunks.length][indexesGroup.length];
+    MeasureColumnDataChunk[][] measureColumnDataChunks =
+        new MeasureColumnDataChunk[measureRawColumnChunks.length][indexesGroup.length];
+    for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+      for (int j = 0; j < indexesGroup.length; j++) {
+        if (dimensionRawColumnChunks[i] != null) {
+          dimensionColumnDataChunks[i][j] =
+              dimensionRawColumnChunks[i].convertToDimColDataChunk(j);
+        }
+      }
+    }
+    for (int i = 0; i < measureRawColumnChunks.length; i++) {
+      for (int j = 0; j < indexesGroup.length; j++) {
+        if (measureRawColumnChunks[i] != null) {
+          measureColumnDataChunks[i][j] =
+              measureRawColumnChunks[i].convertToMeasureColDataChunk(j);
+        }
+      }
+    }
+    scannedResult.setDimensionChunks(dimensionColumnDataChunks);
+    scannedResult.setIndexes(indexesGroup);
+    scannedResult.setMeasureChunks(measureColumnDataChunks);
+    scannedResult.setRawColumnChunks(dimensionRawColumnChunks);
+    scannedResult.setNumberOfRows(rowCount);
+    return scannedResult;
   }
 }

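The page-wise index extraction in fillScannedResult above can be read as a standalone helper; here is a hedged restatement of that loop, using the BitSetGroup class added later in this patch:

    // for each page, expand the surviving BitSet into a sorted array of
    // row indexes; rowCount is assumed to have one slot per page
    static int[][] toIndexesGroup(BitSetGroup bitSetGroup, int[] rowCount) {
      int[][] indexesGroup = new int[bitSetGroup.getNumberOfPages()][];
      for (int k = 0; k < indexesGroup.length; k++) {
        java.util.BitSet bitSet = bitSetGroup.getBitSet(k);
        if (bitSet != null && !bitSet.isEmpty()) {
          int[] indexes = new int[bitSet.cardinality()];
          int index = 0;
          for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
            indexes[index++] = i;
          }
          rowCount[k] = indexes.length;
          indexesGroup[k] = indexes;
        }
      }
      return indexesGroup;
    }
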
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
index cda39f2..1373ed5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
@@ -17,7 +17,6 @@
 package org.apache.carbondata.core.scan.scanner.impl;
 
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult;
 import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner;
 import org.apache.carbondata.core.stats.QueryStatisticsModel;
 
@@ -31,8 +30,6 @@ public class NonFilterScanner extends AbstractBlockletScanner {
   public NonFilterScanner(BlockExecutionInfo blockExecutionInfo,
                           QueryStatisticsModel queryStatisticsModel) {
     super(blockExecutionInfo);
-    // as its a non filter query creating a non filter query scanned result object
-    scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
     super.queryStatisticsModel = queryStatisticsModel;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
new file mode 100644
index 0000000..07e3487
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.util.BitSet;
+
+/**
+ * Maintains a group of bitsets, one per page of a blocklet.
+ * Each filter executor returns a BitSetGroup after filtering the data.
+ */
+public class BitSetGroup {
+
+  private BitSet[] bitSets;
+
+  public BitSetGroup(int groupSize) {
+    bitSets = new BitSet[groupSize];
+  }
+
+  public void setBitSet(BitSet bitSet, int index) {
+    assert index < bitSets.length;
+    bitSets[index] = bitSet;
+  }
+
+  public BitSet getBitSet(int index) {
+    assert index < bitSets.length;
+    return bitSets[index];
+  }
+
+  public boolean isEmpty() {
+    for (BitSet bitSet : bitSets) {
+      if (bitSet != null && !bitSet.isEmpty()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public void and(BitSetGroup group) {
+    int i = 0;
+    for (BitSet bitSet : bitSets) {
+      BitSet otherSet  = group.getBitSet(i);
+      if (bitSet != null && otherSet != null) {
+        bitSet.and(otherSet);
+      }
+      i++;
+    }
+  }
+
+  public void or(BitSetGroup group) {
+    int i = 0;
+    for (BitSet bitSet : bitSets) {
+      BitSet otherSet  = group.getBitSet(i);
+      if (bitSet != null && otherSet != null) {
+        bitSet.or(otherSet);
+      }
+      // if this set is null and the other set is not, adopt the other set.
+      if (bitSet == null && otherSet != null) {
+        bitSets[i] = otherSet;
+      }
+      i++;
+    }
+  }
+
+  public int getNumberOfPages() {
+    return bitSets.length;
+  }
+
+}

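A small usage example for the new class (values are illustrative): and() only intersects a page when both sides have a bitset for it, so a null entry acts as "no restriction", while or() adopts the other side's bitset when its own entry is null.

    import java.util.BitSet;

    import org.apache.carbondata.core.util.BitSetGroup;

    public class BitSetGroupExample {
      public static void main(String[] args) {
        BitSetGroup filterA = new BitSetGroup(2);  // blocklet with 2 pages
        BitSet pageA0 = new BitSet();
        pageA0.set(1);
        pageA0.set(3);
        filterA.setBitSet(pageA0, 0);              // rows 1 and 3 pass on page 0

        BitSetGroup filterB = new BitSetGroup(2);
        BitSet pageB1 = new BitSet();
        pageB1.set(7);
        filterB.setBitSet(pageB1, 1);              // row 7 passes on page 1

        filterA.or(filterB);   // page 1 adopts filterB's bitset
        filterA.and(filterB);  // page 0 unchanged: filterB has no bitset there
        System.out.println(filterA.isEmpty());     // prints false
      }
    }
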
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 11c8870..b9a96d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -45,9 +45,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.datastore.compression.MeasureMetaDataModel;
@@ -1233,6 +1233,18 @@ public final class CarbonUtil {
     }, offset, length);
   }
 
+  public static DataChunk2 readDataChunk(ByteBuffer dataChunkBuffer, int offset, int length)
+      throws IOException {
+    byte[] data = new byte[length];
+    dataChunkBuffer.position(offset);
+    dataChunkBuffer.get(data);
+    return (DataChunk2) read(data, new ThriftReader.TBaseCreator() {
+      @Override public TBase create() {
+        return new DataChunk2();
+      }
+    }, 0, length);
+  }
+
   /**
    * Below method will be used to convert the byte array value to thrift object for
    * data chunk
@@ -1353,19 +1365,19 @@ public final class CarbonUtil {
     return outputArray;
   }
 
-  public static void freeMemory(DimensionColumnDataChunk[] dimensionColumnDataChunk,
-      MeasureColumnDataChunk[] measureColumnDataChunks) {
-    if (null != measureColumnDataChunks) {
-      for (int i = 0; i < measureColumnDataChunks.length; i++) {
-        if (null != measureColumnDataChunks[i]) {
-          measureColumnDataChunks[i].freeMemory();
+  public static void freeMemory(DimensionRawColumnChunk[] dimensionRawColumnChunks,
+      MeasureRawColumnChunk[] measureRawColumnChunks) {
+    if (null != measureRawColumnChunks) {
+      for (int i = 0; i < measureRawColumnChunks.length; i++) {
+        if (null != measureRawColumnChunks[i]) {
+          measureRawColumnChunks[i].freeMemory();
         }
       }
     }
-    if (null != dimensionColumnDataChunk) {
-      for (int i = 0; i < dimensionColumnDataChunk.length; i++) {
-        if (null != dimensionColumnDataChunk[i]) {
-          dimensionColumnDataChunk[i].freeMemory();
+    if (null != dimensionRawColumnChunks) {
+      for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+        if (null != dimensionRawColumnChunks[i]) {
+          dimensionRawColumnChunks[i].freeMemory();
         }
       }
     }

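A hedged sketch of the new readDataChunk overload in use: in the V3 layout a raw column chunk is read into memory in one shot and each page header is then deserialized from the buffer. The offset bookkeeping below is illustrative; in the reader it would come from the chunk metadata in the file footer.

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.carbondata.core.util.CarbonUtil;
    import org.apache.carbondata.format.DataChunk2;

    // deserialize the DataChunk2 (page header) that starts at the given
    // offset inside an already-read raw column chunk
    static DataChunk2 readPageHeader(byte[] rawColumnChunk, int offset,
        int headerLength) throws IOException {
      ByteBuffer buffer = ByteBuffer.wrap(rawColumnChunk);
      return CarbonUtil.readDataChunk(buffer, offset, headerLength);
    }
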
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
index 5b89096..4882b0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -98,8 +98,17 @@ public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
             blockletInfoThrift.getColumn_data_chunks_offsets().size());
     blockletInfo.setDimensionChunkOffsets(dimensionColumnChunkOffsets);
     blockletInfo.setMeasureChunkOffsets(measureColumnChunksOffsets);
-    blockletInfo.setDimensionChunksLength(dimensionColumnChunkLength);
-    blockletInfo.setMeasureChunksLength(measureColumnChunksLength);
+
+    List<Integer> dimensionColumnChunkLengthInteger = new ArrayList<Integer>();
+    List<Integer> measureColumnChunkLengthInteger = new ArrayList<Integer>();
+    for (int i = 0; i < dimensionColumnChunkLength.size(); i++) {
+      dimensionColumnChunkLengthInteger.add(dimensionColumnChunkLength.get(i).intValue());
+    }
+    for (int i = 0; i < measureColumnChunksLength.size(); i++) {
+      measureColumnChunkLengthInteger.add(measureColumnChunksLength.get(i).intValue());
+    }
+    blockletInfo.setDimensionChunksLength(dimensionColumnChunkLengthInteger);
+    blockletInfo.setMeasureChunksLength(measureColumnChunkLengthInteger);
     blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
     return blockletInfo;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index bf19e08..5f69753 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -23,6 +23,7 @@ import java.math.RoundingMode;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
@@ -316,6 +317,87 @@ public final class DataTypeUtil {
 
   }
 
+  /**
+   * Below method will be used to convert the data passed to its actual data
+   * type
+   *
+   * @param dataInBytes    data
+   * @param actualDataType actual data type
+   * @return actual data after conversion
+   */
+  public static Object getDataBasedOnDataType(byte[] dataInBytes, DataType actualDataType) {
+    if (null == dataInBytes || Arrays
+        .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, dataInBytes)) {
+      return null;
+    }
+    try {
+      switch (actualDataType) {
+        case INT:
+          String data1 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+          if (data1.isEmpty()) {
+            return null;
+          }
+          return Integer.parseInt(data1);
+        case SHORT:
+          String data2 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+          if (data2.isEmpty()) {
+            return null;
+          }
+          return Short.parseShort(data2);
+        case DOUBLE:
+          String data3 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+          if (data3.isEmpty()) {
+            return null;
+          }
+          return Double.parseDouble(data3);
+        case LONG:
+          String data4 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+          if (data4.isEmpty()) {
+            return null;
+          }
+          return Long.parseLong(data4);
+        case DATE:
+          String data5 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+          if (data5.isEmpty()) {
+            return null;
+          }
+          try {
+            Date dateToStr = dateformatter.get().parse(data5);
+            return dateToStr.getTime() * 1000;
+          } catch (ParseException e) {
+            LOGGER.error("Cannot convert " + data5 + " to Time/Long type value: " + e.getMessage());
+            return null;
+          }
+
+        case TIMESTAMP:
+          String data6 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+          if (data6.isEmpty()) {
+            return null;
+          }
+          try {
+            Date dateToStr = timeStampformatter.get().parse(data6);
+            return dateToStr.getTime() * 1000;
+          } catch (ParseException e) {
+            LOGGER.error("Cannot convert " + data6 + " to Time/Long type value: " + e.getMessage());
+            return null;
+          }
+        case DECIMAL:
+          String data7 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+          if (data7.isEmpty()) {
+            return null;
+          }
+          java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data7);
+          return org.apache.spark.sql.types.Decimal.apply(javaDecVal);
+        default:
+          return UTF8String.fromBytes(dataInBytes);
+      }
+    } catch (NumberFormatException ex) {
+      String data = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+      LOGGER.error("Problem while converting data type for value: " + data);
+      return null;
+    }
+  }
+
   public static Object getMeasureDataBasedOnDataType(Object data, DataType dataType) {
 
     if (null == data) {

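A hedged usage example for the byte[] overload added above; the DataType import path is an assumption based on the rest of this module. Note that DECIMAL values come back as Spark Decimal objects and the default branch yields a UTF8String.

    import java.nio.charset.Charset;

    import org.apache.carbondata.core.metadata.datatype.DataType; // assumed path
    import org.apache.carbondata.core.util.DataTypeUtil;

    public class DataTypeUtilExample {
      public static void main(String[] args) {
        byte[] raw = "42".getBytes(Charset.forName("UTF-8"));
        Object value = DataTypeUtil.getDataBasedOnDataType(raw, DataType.INT);
        System.out.println(value);  // 42 (an Integer); null, empty, or
                                    // member-default bytes decode to null
      }
    }
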
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
index f2d110f..1d00199 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
@@ -95,7 +95,7 @@ public class DataTypeUtilTest {
     assertEquals(getDataBasedOnDataType("1", DataType.DECIMAL), expected);
     assertEquals(getDataBasedOnDataType("default", DataType.NULL),
         UTF8String.fromString("default"));
-    assertEquals(getDataBasedOnDataType(null, DataType.NULL), null);
+    assertEquals(getDataBasedOnDataType((String) null, DataType.NULL), null);
   }
 
   @Test public void testGetMeasureDataBasedOnDataType() throws NumberFormatException {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java b/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
index 4a9cf81..07db1ba 100644
--- a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
+++ b/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
@@ -117,7 +117,7 @@ public class FilterScannerTest {
 //    DimensionChunkAttributes dimensionChunkAttributes = new DimensionChunkAttributes();
 //    DimensionColumnDataChunk dimensionColumnDataChunk =
 //        new FixedLengthDimensionDataChunk(new byte[] { 0, 1 }, dimensionChunkAttributes);
-//    blocksChunkHolder.setDimensionDataChunk(new DimensionColumnDataChunk[]
+//    blocksChunkHolder.setDimensionRawDataChunk(new DimensionColumnDataChunk[]
 //
 //        { dimensionColumnDataChunk });
 //    MeasureColumnDataChunk measureColumnDataChunk = new MeasureColumnDataChunk();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 7333115..d7bab75 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -46,7 +46,7 @@ public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<Row> {
       }
       if (dictionaries[i] != null) {
         data[i] = DataTypeUtil
-            .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKey((int) data[i]),
+            .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKeyInBytes((int) data[i]),
                 dataTypes[i]);
         if (data[i] == null) {
           continue;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 470ca1f..3da8299 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -182,34 +182,21 @@ case class CarbonDictionaryDecoder(
           )
           new Iterator[InternalRow] {
             val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
-            var flag = true
-            var total = 0L
-
             override final def hasNext: Boolean = {
-              flag = iter.hasNext
-              if (!flag && total > 0) {
-                val queryStatistic = new QueryStatistic()
-                queryStatistic
-                  .addFixedTimeStatistic(QueryStatisticsConstants.PREPARE_RESULT, total)
-                recorder.recordStatistics(queryStatistic)
-                recorder.logStatistics()
-              }
-              flag
+              iter.hasNext
             }
 
             override final def next(): InternalRow = {
-              val startTime = System.currentTimeMillis()
               val row: InternalRow = iter.next()
               val data = row.toSeq(dataTypes).toArray
               dictIndex.foreach { index =>
                 if (data(index) != null) {
                   data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
-                    .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+                    .getDictionaryValueForKeyInBytes(data(index).asInstanceOf[Int]),
                     getDictionaryColumnIds(index)._3)
                 }
               }
               val result = unsafeProjection(new GenericMutableRow(data))
-              total += System.currentTimeMillis() - startTime
               result
             }
           }