Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/02/24 05:38:42 UTC

[1/5] incubator-carbondata git commit: WIP Added code for new V3 format to optimize scan

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 766671c79 -> 3e36cdf54


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 0c80a36..32279ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -181,19 +181,16 @@ case class CarbonDictionaryDecoder(
             override final def hasNext: Boolean = iter.hasNext
 
             override final def next(): InternalRow = {
-              val startTime = System.currentTimeMillis()
               val row: InternalRow = iter.next()
               val data = row.toSeq(dataTypes).toArray
               dictIndex.foreach { index =>
                 if (data(index) != null) {
                   data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
-                    .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+                    .getDictionaryValueForKeyInBytes(data(index).asInstanceOf[Int]),
                     getDictionaryColumnIds(index)._3)
                 }
               }
-              val result = unsafeProjection(new GenericInternalRow(data))
-              total += System.currentTimeMillis() - startTime
-              result
+              unsafeProjection(new GenericInternalRow(data))
             }
           }
         }
@@ -342,7 +339,7 @@ class CarbonDecoderRDD(
         dictIndex.foreach { index =>
           if (data(index) != null) {
             data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
-              .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+                .getDictionaryValueForKeyInBytes(data(index).asInstanceOf[Int]),
               getDictionaryColumnIds(index)._3)
           }
         }
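
The decoder above now asks the dictionary for raw bytes (getDictionaryValueForKeyInBytes) instead of a String (getDictionaryValueForKey), so the value can be converted to the target data type without materializing an intermediate String per row. A minimal sketch of the idea, in Java to match the core module; SketchDictionary and its backing map are illustrative stand-ins, not CarbonData's dictionary API:

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative stand-in for a surrogate-key -> value dictionary.
    class SketchDictionary {
      private final Map<Integer, byte[]> store = new HashMap<>();

      void put(int surrogateKey, String value) {
        store.put(surrogateKey, value.getBytes(StandardCharsets.UTF_8));
      }

      // Old style: allocates a String for every decoded cell.
      String getDictionaryValueForKey(int surrogateKey) {
        byte[] raw = store.get(surrogateKey);
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
      }

      // New style: returns the stored bytes untouched; the caller parses the
      // target type (int, long, timestamp, ...) directly from the bytes.
      byte[] getDictionaryValueForKeyInBytes(int surrogateKey) {
        return store.get(surrogateKey);
      }
    }

In a hot decode loop the byte form saves one String allocation and copy per dictionary-encoded cell.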

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 5a50614..afbbae1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -195,6 +195,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
           agg.aggregateExpressions.map {
             case attr: AttributeReference =>
             case a@Alias(attr: AttributeReference, name) =>
+            case Alias(AggregateExpression(Count(Seq(attr: AttributeReference)), _, _, _), _) =>
             case aggExp: AggregateExpression =>
               aggExp.transform {
                 case aggExp: AggregateExpression =>


[2/5] incubator-carbondata git commit: WIP Added code for new V3 format to optimize scan

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index 9a8b254..ae9ba8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -33,6 +34,7 @@ import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -53,11 +55,18 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
     BitSet bitSet = new BitSet(1);
     byte[][] filterValues = this.filterRangeValues;
     int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
+    boolean isScanRequired = isScanRequired(blockMinValue[columnIndex], filterValues);
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+  }
+
+  private boolean isScanRequired(byte[] blockMinValue, byte[][] filterValues) {
     boolean isScanRequired = false;
     for (int k = 0; k < filterValues.length; k++) {
       // and filter-min should be positive
-      int minCompare =
-          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue[columnIndex]);
+      int minCompare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue);
 
       // if any filter applied is not in range of min and max of block
       // then since its a less than fiter validate whether the block
@@ -67,26 +76,45 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
         break;
       }
     }
-    if (isScanRequired) {
-      bitSet.set(0);
-    }
-    return bitSet;
-
+    return isScanRequired;
   }
 
-  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+  @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder)
       throws FilterUnsupportedException, IOException {
     if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
       return super.applyFilter(blockChunkHolder);
     }
     int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
         .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
-    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
           .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
     }
-    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
-        blockChunkHolder.getDataBlock().nodeSize());
+    DimensionRawColumnChunk rawColumnChunk =
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+    BitSetGroup bitSetGroup = new BitSetGroup(rawColumnChunk.getPagesCount());
+    for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
+      if (rawColumnChunk.getMinValues() != null) {
+        if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) {
+          int compare = ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(filterRangeValues[0], rawColumnChunk.getMaxValues()[i]);
+          if (compare > 0) {
+            BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
+            bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
+            bitSetGroup.setBitSet(bitSet, i);
+          } else {
+            BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+                rawColumnChunk.getRowCount()[i]);
+            bitSetGroup.setBitSet(bitSet, i);
+          }
+        }
+      } else {
+        BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+            rawColumnChunk.getRowCount()[i]);
+        bitSetGroup.setBitSet(bitSet, i);
+      }
+    }
+    return bitSetGroup;
   }
 
   private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
@@ -156,7 +184,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
       start = CarbonUtil.nextLesserValueToTarget(start, dimensionColumnDataChunk, filterValues[i]);
       if (start < 0) {
         start = -(start + 1);
-        if (start == numerOfRows) {
+        if (start >= numerOfRows) {
           start = start - 1;
         }
         // Method will compare the tentative index value after binary search, this tentative
@@ -250,4 +278,16 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
     }
     return bitSet;
   }
+
+  @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      super.readBlocks(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+  }
 }
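
The reworked applyFilter evaluates the less-than predicate page by page: a page whose min value is not below the filter is skipped outright, and a page whose max value is below the filter qualifies wholesale without decoding. A simplified sketch of that decision, with a plain lexicographic comparator standing in for ByteUtil.UnsafeComparer and without the real chunk types:

    import java.util.BitSet;

    class LessThanPagePruner {
      // Unsigned lexicographic byte comparison, like ByteUtil.UnsafeComparer.
      static int compare(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++) {
          int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
          if (cmp != 0) {
            return cmp;
          }
        }
        return a.length - b.length;
      }

      // A page can contain a matching row only if filter > page min.
      static boolean isScanRequired(byte[] pageMin, byte[] filter) {
        return compare(filter, pageMin) > 0;
      }

      static BitSet filterPage(byte[] pageMin, byte[] pageMax, byte[] filter,
          int rowCount) {
        if (!isScanRequired(pageMin, filter)) {
          return new BitSet(rowCount);      // no row can match: empty bitset
        }
        if (compare(filter, pageMax) > 0) { // every row is below the filter
          BitSet all = new BitSet(rowCount);
          all.flip(0, rowCount);
          return all;                       // whole page matches, no decode
        }
        // otherwise the page must be decoded and tested row by row
        throw new UnsupportedOperationException("row-level scan elided");
      }
    }

One BitSet per page is then collected into a BitSetGroup, as the diff does with setBitSet(bitSet, i), so downstream code can keep skipping pages that produced no matches.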

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
index 8646301..af5568e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
@@ -91,12 +91,19 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
    *
    * @return start IndexKey
    */
-  public void getStartKey(long[] startKey,
-      SortedMap<Integer, byte[]> noDictStartKeys, List<long[]> startKeyList) {
-    FilterUtil.getStartKey(dimColEvaluatorInfoList.get(0).getDimensionResolvedFilterInstance(),
-        startKey, startKeyList);
-    FilterUtil
-        .getStartKeyForNoDictionaryDimension(dimColEvaluatorInfoList.get(0), noDictStartKeys);
+  public void getStartKey(long[] startKey, SortedMap<Integer, byte[]> noDictStartKeys,
+      List<long[]> startKeyList) {
+    switch (exp.getFilterExpressionType()) {
+      case GREATERTHAN:
+      case GREATERTHAN_EQUALTO:
+        FilterUtil.getStartKey(dimColEvaluatorInfoList.get(0).getDimensionResolvedFilterInstance(),
+            startKey, startKeyList);
+        FilterUtil
+            .getStartKeyForNoDictionaryDimension(dimColEvaluatorInfoList.get(0), noDictStartKeys);
+        break;
+      default:
+        //do nothing
+    }
   }
 
   /**
@@ -106,10 +113,17 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
    */
   @Override public void getEndKey(SegmentProperties segmentProperties, long[] endKeys,
       SortedMap<Integer, byte[]> noDicEndKeys, List<long[]> endKeyList) {
-    FilterUtil.getEndKey(dimColEvaluatorInfoList.get(0).getDimensionResolvedFilterInstance(),
-        endKeys, segmentProperties, endKeyList);
-    FilterUtil
-        .getEndKeyForNoDictionaryDimension(dimColEvaluatorInfoList.get(0), noDicEndKeys);
+    switch (exp.getFilterExpressionType()) {
+      case LESSTHAN:
+      case LESSTHAN_EQUALTO:
+        FilterUtil
+            .getEndKey(dimColEvaluatorInfoList.get(0).getDimensionResolvedFilterInstance(), endKeys,
+                segmentProperties, endKeyList);
+        FilterUtil.getEndKeyForNoDictionaryDimension(dimColEvaluatorInfoList.get(0), noDicEndKeys);
+        break;
+      default:
+        //do nothing
+    }
   }
 
   private List<byte[]> getNoDictionaryRangeValues() {
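
The resolver now contributes a start key only for GREATERTHAN/GREATERTHAN_EQUALTO and an end key only for LESSTHAN/LESSTHAN_EQUALTO: a predicate such as col < 10 bounds only the end of the scan range, so computing a start key for it adds nothing. The same logic over plain longs, purely for illustration (not CarbonData's IndexKey types):

    class RangeSketch {
      enum FilterType { GREATERTHAN, GREATERTHAN_EQUALTO, LESSTHAN, LESSTHAN_EQUALTO }

      long start = Long.MIN_VALUE;  // default: scan from the beginning
      long end = Long.MAX_VALUE;    // default: scan to the end

      void apply(FilterType type, long key) {
        switch (type) {
          case GREATERTHAN:
          case GREATERTHAN_EQUALTO:
            start = Math.max(start, key);  // only the start key moves
            break;
          case LESSTHAN:
          case LESSTHAN_EQUALTO:
            end = Math.min(end, key);      // only the end key moves
            break;
          default:
            // do nothing, mirroring the resolver's default branch
        }
      }
    }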

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
index 6b0a458..f0cebf4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
@@ -18,6 +18,10 @@ package org.apache.carbondata.core.scan.processor;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
@@ -29,7 +33,6 @@ import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedResultColle
 import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedVectorResultCollector;
 import org.apache.carbondata.core.scan.collector.impl.RawBasedResultCollector;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.result.AbstractScannedResult;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
 import org.apache.carbondata.core.scan.scanner.BlockletScanner;
@@ -63,23 +66,33 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
   protected BlockletScanner blockletScanner;
 
   /**
-   * to hold the data block
-   */
-  protected BlocksChunkHolder blocksChunkHolder;
-
-  /**
    * batch size of result
    */
   protected int batchSize;
 
+  protected ExecutorService executorService;
+
+  private Future<AbstractScannedResult> future;
+
+  private Future<BlocksChunkHolder> futureIo;
+
   protected AbstractScannedResult scannedResult;
 
-  public AbstractDataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader,
-      int batchSize, QueryStatisticsModel queryStatisticsModel,
-      BlocksChunkHolder blockChunkHolder) {
+  private BlockExecutionInfo blockExecutionInfo;
+
+  private FileHolder fileReader;
+
+  private AtomicBoolean nextBlock;
+
+  private AtomicBoolean nextRead;
+
+  public AbstractDataBlockIterator(BlockExecutionInfo blockExecutionInfo,
+      FileHolder fileReader, int batchSize, QueryStatisticsModel queryStatisticsModel,
+      ExecutorService executorService) {
+    this.blockExecutionInfo = blockExecutionInfo;
+    this.fileReader = fileReader;
     dataBlockIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
         blockExecutionInfo.getNumberOfBlockToScan());
-    blocksChunkHolder = blockChunkHolder;
     if (blockExecutionInfo.getFilterExecuterTree() != null) {
       blockletScanner = new FilterScanner(blockExecutionInfo, queryStatisticsModel);
     } else {
@@ -99,13 +112,16 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
           new DictionaryBasedResultCollector(blockExecutionInfo);
     }
     this.batchSize = batchSize;
+    this.executorService = executorService;
+    this.nextBlock = new AtomicBoolean(false);
+    this.nextRead = new AtomicBoolean(false);
   }
 
   public boolean hasNext() {
     if (scannedResult != null && scannedResult.hasNext()) {
       return true;
     } else {
-      return dataBlockIterator.hasNext();
+      return dataBlockIterator.hasNext() || nextBlock.get() || nextRead.get();
     }
   }
 
@@ -121,22 +137,85 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
           }
           scannedResult = getNextScannedResult();
         }
+        nextBlock.set(false);
+        nextRead.set(false);
         return false;
       }
-    } catch (IOException | FilterUnsupportedException ex) {
+    } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
   }
 
-  private AbstractScannedResult getNextScannedResult()
-      throws IOException, FilterUnsupportedException {
-    if (dataBlockIterator.hasNext()) {
-      blocksChunkHolder.setDataBlock(dataBlockIterator.next());
-      blocksChunkHolder.reset();
-      return blockletScanner.scanBlocklet(blocksChunkHolder);
+  private AbstractScannedResult getNextScannedResult() throws Exception {
+    AbstractScannedResult result = null;
+    if (dataBlockIterator.hasNext() || nextBlock.get() || nextRead.get()) {
+      if (future == null) {
+        future = execute();
+      }
+      result = future.get();
+      nextBlock.set(false);
+      if (dataBlockIterator.hasNext() || nextRead.get()) {
+        nextBlock.set(true);
+        future = execute();
+      }
+    }
+    return result;
+  }
+
+  private BlocksChunkHolder getBlocksChunkHolder() throws IOException {
+    BlocksChunkHolder blocksChunkHolder = getBlocksChunkHolderInternal();
+    while (blocksChunkHolder == null && dataBlockIterator.hasNext()) {
+      blocksChunkHolder = getBlocksChunkHolderInternal();
+    }
+    return blocksChunkHolder;
+  }
+
+  private BlocksChunkHolder getBlocksChunkHolderInternal() throws IOException {
+    BlocksChunkHolder blocksChunkHolder =
+        new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(),
+            blockExecutionInfo.getTotalNumberOfMeasureBlock(), fileReader);
+    blocksChunkHolder.setDataBlock(dataBlockIterator.next());
+    if (blockletScanner.isScanRequired(blocksChunkHolder)) {
+      return blocksChunkHolder;
     }
     return null;
   }
 
+  private Future<AbstractScannedResult> execute() {
+    return executorService.submit(new Callable<AbstractScannedResult>() {
+      @Override public AbstractScannedResult call() throws Exception {
+        if (futureIo == null) {
+          futureIo = executeRead();
+        }
+        BlocksChunkHolder blocksChunkHolder = futureIo.get();
+        futureIo = null;
+        nextRead.set(false);
+        if (blocksChunkHolder != null) {
+          if (dataBlockIterator.hasNext()) {
+            nextRead.set(true);
+            futureIo = executeRead();
+          }
+          return blockletScanner.scanBlocklet(blocksChunkHolder);
+        }
+        return null;
+      }
+    });
+  }
+
+  private Future<BlocksChunkHolder> executeRead() {
+    return executorService.submit(new Callable<BlocksChunkHolder>() {
+      @Override public BlocksChunkHolder call() throws Exception {
+        if (dataBlockIterator.hasNext()) {
+          BlocksChunkHolder blocksChunkHolder = getBlocksChunkHolder();
+          if (blocksChunkHolder != null) {
+            blockletScanner.readBlocklet(blocksChunkHolder);
+            return blocksChunkHolder;
+          }
+        }
+        return null;
+      }
+    });
+  }
+
   public abstract void processNextBatch(CarbonColumnarBatch columnarBatch);
-}
+}
\ No newline at end of file
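
The iterator now keeps futures in flight so that the IO for the next blocklet (futureIo, via readBlocklet) overlaps the scan of the current one instead of alternating with it. A self-contained sketch of the same pipeline shape, with one read-ahead future and the scan on the calling thread; the type parameters and Function fields are stand-ins for BlocksChunkHolder, AbstractScannedResult, readBlocklet and scanBlocklet:

    import java.util.Iterator;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.function.Function;

    // RAW stands in for BlocksChunkHolder, RESULT for AbstractScannedResult.
    class PipelinedScanner<RAW, RESULT> {
      private final Iterator<RAW> blocklets;       // unread blocklets
      private final Function<RAW, RAW> readIo;     // IO only, no decompression
      private final Function<RAW, RESULT> scan;    // decompress + filter
      private final ExecutorService pool;
      private Future<RAW> ioFuture;                // the read-ahead in flight

      PipelinedScanner(Iterator<RAW> blocklets, Function<RAW, RAW> readIo,
          Function<RAW, RESULT> scan, ExecutorService pool) {
        this.blocklets = blocklets;
        this.readIo = readIo;
        this.scan = scan;
        this.pool = pool;
      }

      private void submitRead() {
        final RAW raw = blocklets.next();
        Callable<RAW> task = () -> readIo.apply(raw);
        ioFuture = pool.submit(task);
      }

      RESULT next() throws Exception {
        if (ioFuture == null) {
          if (!blocklets.hasNext()) {
            return null;                           // pipeline drained
          }
          submitRead();                            // prime the pipeline
        }
        RAW current = ioFuture.get();              // wait for this blocklet's IO
        ioFuture = null;
        if (blocklets.hasNext()) {
          submitRead();                            // next IO overlaps this scan
        }
        return scan.apply(current);                // CPU-heavy part
      }
    }

The production code above additionally runs the scan itself on a future and tracks the nextBlock/nextRead flags so hasNext() stays accurate while work is still in flight; the sketch keeps only the IO read-ahead.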

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
index 2b1a48e..5227115 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
@@ -18,8 +18,8 @@ package org.apache.carbondata.core.scan.processor;
 
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.FileHolder;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 
 /**
  * Block chunk holder which will hold the dimension and
@@ -30,12 +30,12 @@ public class BlocksChunkHolder {
   /**
    * dimension column data chunk
    */
-  private DimensionColumnDataChunk[] dimensionDataChunk;
+  private DimensionRawColumnChunk[] dimensionRawDataChunk;
 
   /**
    * measure column data chunk
    */
-  private MeasureColumnDataChunk[] measureDataChunk;
+  private MeasureRawColumnChunk[] measureRawDataChunk;
 
   /**
    * file reader which will use to read the block from file
@@ -48,36 +48,43 @@ public class BlocksChunkHolder {
   private DataRefNode dataBlock;
 
   public BlocksChunkHolder(int numberOfDimensionBlock, int numberOfMeasureBlock) {
-    dimensionDataChunk = new DimensionColumnDataChunk[numberOfDimensionBlock];
-    measureDataChunk = new MeasureColumnDataChunk[numberOfMeasureBlock];
+    dimensionRawDataChunk = new DimensionRawColumnChunk[numberOfDimensionBlock];
+    measureRawDataChunk = new MeasureRawColumnChunk[numberOfMeasureBlock];
+  }
+
+  public BlocksChunkHolder(int numberOfDimensionBlock, int numberOfMeasureBlock,
+      FileHolder fileReader) {
+    dimensionRawDataChunk = new DimensionRawColumnChunk[numberOfDimensionBlock];
+    measureRawDataChunk = new MeasureRawColumnChunk[numberOfMeasureBlock];
+    this.fileReader = fileReader;
   }
 
   /**
-   * @return the dimensionDataChunk
+   * @return the dimensionRawDataChunk
    */
-  public DimensionColumnDataChunk[] getDimensionDataChunk() {
-    return dimensionDataChunk;
+  public DimensionRawColumnChunk[] getDimensionRawDataChunk() {
+    return dimensionRawDataChunk;
   }
 
   /**
-   * @param dimensionDataChunk the dimensionDataChunk to set
+   * @param dimensionRawDataChunk the dimensionRawDataChunk to set
    */
-  public void setDimensionDataChunk(DimensionColumnDataChunk[] dimensionDataChunk) {
-    this.dimensionDataChunk = dimensionDataChunk;
+  public void setDimensionRawDataChunk(DimensionRawColumnChunk[] dimensionRawDataChunk) {
+    this.dimensionRawDataChunk = dimensionRawDataChunk;
   }
 
   /**
-   * @return the measureDataChunk
+   * @return the measureRawDataChunk
    */
-  public MeasureColumnDataChunk[] getMeasureDataChunk() {
-    return measureDataChunk;
+  public MeasureRawColumnChunk[] getMeasureRawDataChunk() {
+    return measureRawDataChunk;
   }
 
   /**
-   * @param measureDataChunk the measureDataChunk to set
+   * @param measureRawDataChunk the measureRawDataChunk to set
    */
-  public void setMeasureDataChunk(MeasureColumnDataChunk[] measureDataChunk) {
-    this.measureDataChunk = measureDataChunk;
+  public void setMeasureRawDataChunk(MeasureRawColumnChunk[] measureRawDataChunk) {
+    this.measureRawDataChunk = measureRawDataChunk;
   }
 
   /**
@@ -113,11 +120,11 @@ public class BlocksChunkHolder {
    * array
    */
   public void reset() {
-    for (int i = 0; i < measureDataChunk.length; i++) {
-      this.measureDataChunk[i] = null;
+    for (int i = 0; i < measureRawDataChunk.length; i++) {
+      this.measureRawDataChunk[i] = null;
     }
-    for (int i = 0; i < dimensionDataChunk.length; i++) {
-      this.dimensionDataChunk[i] = null;
+    for (int i = 0; i < dimensionRawDataChunk.length; i++) {
+      this.dimensionRawDataChunk[i] = null;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
index f78b75b..5eab7b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.scan.processor.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
@@ -37,8 +38,8 @@ public class DataBlockIteratorImpl extends AbstractDataBlockIterator {
    */
   public DataBlockIteratorImpl(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader,
       int batchSize, QueryStatisticsModel queryStatisticsModel,
-      BlocksChunkHolder blockChunkHolder) {
-    super(blockExecutionInfo, fileReader, batchSize, queryStatisticsModel, blockChunkHolder);
+      BlocksChunkHolder blockChunkHolder, ExecutorService executorService) {
+    super(blockExecutionInfo, fileReader, batchSize, queryStatisticsModel, executorService);
   }
 
   /**
@@ -64,9 +65,6 @@ public class DataBlockIteratorImpl extends AbstractDataBlockIterator {
   public void processNextBatch(CarbonColumnarBatch columnarBatch) {
     if (updateScanner()) {
       this.scannerResultAggregator.collectVectorBatch(scannedResult, columnarBatch);
-      while (columnarBatch.getActualSize() < columnarBatch.getBatchSize() && updateScanner()) {
-        this.scannerResultAggregator.collectVectorBatch(scannedResult, columnarBatch);
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
index 60e6f67..fbd7044 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
@@ -47,16 +48,23 @@ public abstract class AbstractScannedResult {
    * current row number
    */
   protected int currentRow = -1;
+
+  protected int pageCounter;
   /**
    * row mapping indexes
    */
-  protected int[] rowMapping;
+  protected int[][] rowMapping;
   /**
    * key size of the fixed length column
    */
   private int fixedLengthKeySize;
   /**
-   * total number of rows
+   * total number of rows per page
+   */
+  private int[] numberOfRows;
+
+  /**
+   * Total number of rows.
    */
   private int totalNumberOfRows;
   /**
@@ -66,11 +74,16 @@ public abstract class AbstractScannedResult {
   /**
    * dimension column data chunk
    */
-  protected DimensionColumnDataChunk[] dataChunks;
+  protected DimensionColumnDataChunk[][] dataChunks;
+
+  /**
+   * Raw dimension chunks;
+   */
+  protected DimensionRawColumnChunk[] rawColumnChunks;
   /**
    * measure column data chunk
    */
-  protected MeasureColumnDataChunk[] measureDataChunks;
+  protected MeasureColumnDataChunk[][] measureDataChunks;
   /**
    * dictionary column block index in file
    */
@@ -128,7 +141,7 @@ public abstract class AbstractScannedResult {
    *
    * @param dataChunks dimension chunks used in query
    */
-  public void setDimensionChunks(DimensionColumnDataChunk[] dataChunks) {
+  public void setDimensionChunks(DimensionColumnDataChunk[][] dataChunks) {
     this.dataChunks = dataChunks;
   }
 
@@ -137,10 +150,14 @@ public abstract class AbstractScannedResult {
    *
    * @param measureDataChunks measure data chunks
    */
-  public void setMeasureChunks(MeasureColumnDataChunk[] measureDataChunks) {
+  public void setMeasureChunks(MeasureColumnDataChunk[][] measureDataChunks) {
     this.measureDataChunks = measureDataChunks;
   }
 
+  public void setRawColumnChunks(DimensionRawColumnChunk[] rawColumnChunks) {
+    this.rawColumnChunks = rawColumnChunks;
+  }
+
   /**
    * Below method will be used to get the chunk based in measure ordinal
    *
@@ -148,7 +165,7 @@ public abstract class AbstractScannedResult {
    * @return measure column chunk
    */
   public MeasureColumnDataChunk getMeasureChunk(int ordinal) {
-    return measureDataChunks[ordinal];
+    return measureDataChunks[ordinal][pageCounter];
   }
 
   /**
@@ -162,7 +179,7 @@ public abstract class AbstractScannedResult {
     byte[] completeKey = new byte[fixedLengthKeySize];
     int offset = 0;
     for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
-      offset += dataChunks[dictionaryColumnBlockIndexes[i]]
+      offset += dataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
           .fillChunkData(completeKey, offset, rowId,
               columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
     }
@@ -181,7 +198,7 @@ public abstract class AbstractScannedResult {
     int[] completeKey = new int[totalDimensionsSize];
     int column = 0;
     for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
-      column = dataChunks[dictionaryColumnBlockIndexes[i]]
+      column = dataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
           .fillConvertedChunkData(rowId, column, completeKey,
               columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
     }
@@ -195,7 +212,7 @@ public abstract class AbstractScannedResult {
   public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
     int column = 0;
     for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
-      column = dataChunks[dictionaryColumnBlockIndexes[i]]
+      column = dataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
           .fillConvertedChunkData(vectorInfo, column,
               columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
     }
@@ -207,7 +224,7 @@ public abstract class AbstractScannedResult {
   public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
     int column = 0;
     for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
-      column = dataChunks[noDictionaryColumnBlockIndexes[i]]
+      column = dataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter]
           .fillConvertedChunkData(vectorInfo, column,
               columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i]));
     }
@@ -219,7 +236,7 @@ public abstract class AbstractScannedResult {
   public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) {
     for (int i = 0; i < measuresOrdinal.length; i++) {
       vectorInfo[i].measureVectorFiller
-          .fillMeasureVector(measureDataChunks[measuresOrdinal[i]], vectorInfo[i]);
+          .fillMeasureVector(measureDataChunks[measuresOrdinal[i]][pageCounter], vectorInfo[i]);
     }
   }
 
@@ -233,8 +250,9 @@ public abstract class AbstractScannedResult {
         ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
         DataOutputStream dataOutput = new DataOutputStream(byteStream);
         try {
-          vectorInfos[i].genericQueryType.parseBlocksAndReturnComplexColumnByteArray(dataChunks,
-              rowMapping == null ? j : rowMapping[j], dataOutput);
+          vectorInfos[i].genericQueryType
+              .parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks,
+                  rowMapping == null ? j : rowMapping[pageCounter][j], pageCounter, dataOutput);
           Object data = vectorInfos[i].genericQueryType
               .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
           vector.putObject(vectorOffset++, data);
@@ -257,6 +275,31 @@ public abstract class AbstractScannedResult {
   }
 
   /**
+   * Just increment the page counter and reset the remaining counters.
+   */
+  public void incrementPageCounter() {
+    rowCounter = 0;
+    currentRow = -1;
+    pageCounter++;
+  }
+
+  public int numberOfpages() {
+    return numberOfRows.length;
+  }
+
+  /**
+   * Get total rows in the current page
+   * @return
+   */
+  public int getCurrentPageRowCount() {
+    return numberOfRows[pageCounter];
+  }
+
+  public int getCurrentPageCounter() {
+    return pageCounter;
+  }
+
+  /**
    * increment the counter.
    */
   public void setRowCounter(int rowCounter) {
@@ -272,7 +315,7 @@ public abstract class AbstractScannedResult {
    * @return dimension data based on row id
    */
   protected byte[] getDimensionData(int dimOrdinal, int rowId) {
-    return dataChunks[dimOrdinal].getChunkData(rowId);
+    return dataChunks[dimOrdinal][pageCounter].getChunkData(rowId);
   }
 
   /**
@@ -287,7 +330,7 @@ public abstract class AbstractScannedResult {
     int position = 0;
     for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
       noDictionaryColumnsKeys[position++] =
-          dataChunks[noDictionaryColumnBlockIndexes[i]].getChunkData(rowId);
+          dataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter].getChunkData(rowId);
     }
     return noDictionaryColumnsKeys;
   }
@@ -303,8 +346,8 @@ public abstract class AbstractScannedResult {
     String[] noDictionaryColumnsKeys = new String[noDictionaryColumnBlockIndexes.length];
     int position = 0;
     for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
-      noDictionaryColumnsKeys[position++] =
-          new String(dataChunks[noDictionaryColumnBlockIndexes[i]].getChunkData(rowId));
+      noDictionaryColumnsKeys[position++] = new String(
+          dataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter].getChunkData(rowId));
     }
     return noDictionaryColumnsKeys;
   }
@@ -353,7 +396,9 @@ public abstract class AbstractScannedResult {
       ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
       DataOutputStream dataOutput = new DataOutputStream(byteStream);
       try {
-        genericQueryType.parseBlocksAndReturnComplexColumnByteArray(dataChunks, rowId, dataOutput);
+        genericQueryType
+            .parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks, rowId, pageCounter,
+                dataOutput);
         complexTypeData[i] = byteStream.toByteArray();
       } catch (IOException e) {
         LOGGER.error(e);
@@ -378,8 +423,13 @@ public abstract class AbstractScannedResult {
    * @return
    */
   public boolean hasNext() {
-    if (rowCounter < this.totalNumberOfRows) {
+    if (pageCounter < numberOfRows.length && rowCounter < this.numberOfRows[pageCounter]) {
       return true;
+    } else if (pageCounter < numberOfRows.length) {
+      pageCounter++;
+      rowCounter = 0;
+      currentRow = -1;
+      return hasNext();
     }
     return false;
   }
@@ -393,13 +443,18 @@ public abstract class AbstractScannedResult {
   public void reset() {
     rowCounter = 0;
     currentRow = -1;
+    pageCounter = 0;
   }
 
   /**
-   * @param totalNumberOfRows set total of number rows valid after scanning
+   * @param numberOfRows set total of number rows valid after scanning
    */
-  public void setNumberOfRows(int totalNumberOfRows) {
-    this.totalNumberOfRows = totalNumberOfRows;
+  public void setNumberOfRows(int[] numberOfRows) {
+    this.numberOfRows = numberOfRows;
+
+    for (int count: numberOfRows) {
+      totalNumberOfRows += count;
+    }
   }
 
   /**
@@ -408,7 +463,7 @@ public abstract class AbstractScannedResult {
    *
    * @param indexes
    */
-  public void setIndexes(int[] indexes) {
+  public void setIndexes(int[][] indexes) {
     this.rowMapping = indexes;
   }
 
@@ -420,7 +475,8 @@ public abstract class AbstractScannedResult {
    * @return whether it is null or not
    */
   protected boolean isNullMeasureValue(int ordinal, int rowIndex) {
-    return measureDataChunks[ordinal].getNullValueIndexHolder().getBitSet().get(rowIndex);
+    return measureDataChunks[ordinal][pageCounter].getNullValueIndexHolder().getBitSet()
+        .get(rowIndex);
   }
 
   /**
@@ -432,7 +488,8 @@ public abstract class AbstractScannedResult {
    * @return measure value of long type
    */
   protected long getLongMeasureValue(int ordinal, int rowIndex) {
-    return measureDataChunks[ordinal].getMeasureDataHolder().getReadableLongValueByIndex(rowIndex);
+    return measureDataChunks[ordinal][pageCounter].getMeasureDataHolder()
+        .getReadableLongValueByIndex(rowIndex);
   }
 
   /**
@@ -443,7 +500,7 @@ public abstract class AbstractScannedResult {
    * @return measure value of double type
    */
   protected double getDoubleMeasureValue(int ordinal, int rowIndex) {
-    return measureDataChunks[ordinal].getMeasureDataHolder()
+    return measureDataChunks[ordinal][pageCounter].getMeasureDataHolder()
         .getReadableDoubleValueByIndex(rowIndex);
   }
 
@@ -455,7 +512,7 @@ public abstract class AbstractScannedResult {
    * @return measure of big decimal type
    */
   protected BigDecimal getBigDecimalMeasureValue(int ordinal, int rowIndex) {
-    return measureDataChunks[ordinal].getMeasureDataHolder()
+    return measureDataChunks[ordinal][pageCounter].getMeasureDataHolder()
         .getReadableBigDecimalValueByIndex(rowIndex);
   }
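
hasNext() above now walks rows within a page and rolls over to the next page when the current one is exhausted. The same cursor logic isolated into a runnable sketch, with a main() that walks 2 + 0 + 3 rows across three pages:

    // rowsPerPage plays the role of the numberOfRows array set by setNumberOfRows.
    class PagedCursor {
      private final int[] rowsPerPage;
      private int pageCounter;
      private int rowCounter;

      PagedCursor(int[] rowsPerPage) {
        this.rowsPerPage = rowsPerPage;
      }

      boolean hasNext() {
        if (pageCounter < rowsPerPage.length && rowCounter < rowsPerPage[pageCounter]) {
          return true;
        } else if (pageCounter < rowsPerPage.length) {
          pageCounter++;               // current page exhausted: roll over
          rowCounter = 0;
          return hasNext();            // re-check, skipping any empty pages
        }
        return false;
      }

      int next() {
        return rowCounter++;           // row id is local to the current page
      }

      public static void main(String[] args) {
        PagedCursor cursor = new PagedCursor(new int[] { 2, 0, 3 });
        int total = 0;
        while (cursor.hasNext()) {
          cursor.next();
          total++;
        }
        System.out.println(total);     // prints 5: the empty page is skipped
      }
    }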
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
index 6ca0570..d1f8b7e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
@@ -37,7 +37,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
    */
   @Override public byte[] getDictionaryKeyArray() {
     ++currentRow;
-    return getDictionaryKeyArray(rowMapping[currentRow]);
+    return getDictionaryKeyArray(rowMapping[pageCounter][currentRow]);
   }
 
   /**
@@ -46,7 +46,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
    */
   @Override public int[] getDictionaryKeyIntegerArray() {
     ++currentRow;
-    return getDictionaryKeyIntegerArray(rowMapping[currentRow]);
+    return getDictionaryKeyIntegerArray(rowMapping[pageCounter][currentRow]);
   }
 
   /**
@@ -55,7 +55,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
    * @return complex type key array
    */
   @Override public byte[][] getComplexTypeKeyArray() {
-    return getComplexTypeKeyArray(rowMapping[currentRow]);
+    return getComplexTypeKeyArray(rowMapping[pageCounter][currentRow]);
   }
 
   /**
@@ -65,7 +65,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
    * @return no dictionary key array for all the no dictionary dimension
    */
   @Override public byte[][] getNoDictionaryKeyArray() {
-    return getNoDictionaryKeyArray(rowMapping[currentRow]);
+    return getNoDictionaryKeyArray(rowMapping[pageCounter][currentRow]);
   }
 
   /**
@@ -75,7 +75,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
    * @return no dictionary key array for all the no dictionary dimension
    */
   @Override public String[] getNoDictionaryKeyStringArray() {
-    return getNoDictionaryKeyStringArray(rowMapping[currentRow]);
+    return getNoDictionaryKeyStringArray(rowMapping[pageCounter][currentRow]);
   }
 
   /**
@@ -84,7 +84,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
    * @return valid row id
    */
   @Override public int getCurrenrRowId() {
-    return rowMapping[currentRow];
+    return rowMapping[pageCounter][currentRow];
   }
 
   /**
@@ -93,8 +93,8 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
   public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
     int column = 0;
     for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
-      column = dataChunks[dictionaryColumnBlockIndexes[i]]
-          .fillConvertedChunkData(rowMapping, vectorInfo, column,
+      column = dataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
+          .fillConvertedChunkData(rowMapping[pageCounter], vectorInfo, column,
               columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
     }
   }
@@ -105,8 +105,8 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
   public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
     int column = 0;
     for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
-      column = dataChunks[noDictionaryColumnBlockIndexes[i]]
-          .fillConvertedChunkData(rowMapping, vectorInfo, column,
+      column = dataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter]
+          .fillConvertedChunkData(rowMapping[pageCounter], vectorInfo, column,
               columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i]));
     }
   }
@@ -116,9 +116,8 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
    */
   public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) {
     for (int i = 0; i < measuresOrdinal.length; i++) {
-      vectorInfo[i].measureVectorFiller
-          .fillMeasureVectorForFilter(rowMapping, measureDataChunks[measuresOrdinal[i]],
-              vectorInfo[i]);
+      vectorInfo[i].measureVectorFiller.fillMeasureVectorForFilter(rowMapping[pageCounter],
+          measureDataChunks[measuresOrdinal[i]][pageCounter], vectorInfo[i]);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index b4bcba6..1176e5a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.core.scan.result.iterator;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
@@ -64,8 +65,8 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
    * file reader which will be used to execute the query
    */
   protected FileHolder fileReader;
+
   protected AbstractDataBlockIterator dataBlockIterator;
-  protected boolean nextBatch = false;
   /**
    * total time scan the blocks
    */
@@ -138,7 +139,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
   }
 
   @Override public boolean hasNext() {
-    if ((dataBlockIterator != null && dataBlockIterator.hasNext()) || nextBatch) {
+    if ((dataBlockIterator != null && dataBlockIterator.hasNext())) {
       return true;
     } else if (blockExecutionInfos.size() > 0) {
       return true;
@@ -168,10 +169,10 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
       BlockExecutionInfo executionInfo = blockExecutionInfos.get(0);
       blockExecutionInfos.remove(executionInfo);
       queryStatisticsModel.setRecorder(recorder);
-      CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
-          blocksChunkHolder.getMeasureDataChunk());
+      CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
+          blocksChunkHolder.getMeasureRawDataChunk());
       return new DataBlockIteratorImpl(executionInfo, fileReader, batchSize, queryStatisticsModel,
-          blocksChunkHolder);
+          blocksChunkHolder, execService);
     }
     return null;
   }
@@ -191,8 +192,14 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
   }
 
   @Override public void close() {
-    CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
-        blocksChunkHolder.getMeasureDataChunk());
+    try {
+      fileReader.finish();
+    } catch (IOException e) {
+      LOGGER.error(e);
+    } finally {
+      CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
+          blocksChunkHolder.getMeasureRawDataChunk());
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
index ed4b286..1fa2f4c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
@@ -17,9 +17,7 @@
 package org.apache.carbondata.core.scan.result.iterator;
 
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.QueryModel;
@@ -33,7 +31,6 @@ import org.apache.carbondata.core.scan.result.BatchResult;
 public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<BatchResult> {
 
   private final Object lock = new Object();
-  private Future<BatchResult> future;
 
   public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
       ExecutorService execService) {
@@ -41,43 +38,20 @@ public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator
   }
 
   @Override public BatchResult next() {
-    BatchResult result;
     long startTime = System.currentTimeMillis();
-    try {
-      if (future == null) {
-        future = execute();
-      }
-      result = future.get();
-      nextBatch = false;
-      if (hasNext()) {
-        nextBatch = true;
-        future = execute();
-      } else {
-        fileReader.finish();
-      }
-      totalScanTime += System.currentTimeMillis() - startTime;
-    } catch (Exception ex) {
-      try {
-        fileReader.finish();
-      } finally {
-        throw new RuntimeException(ex);
-      }
-    }
-    return result;
+    BatchResult batchResult = getBatchResult();
+    totalScanTime += System.currentTimeMillis() - startTime;
+    return batchResult;
   }
 
-  private Future<BatchResult> execute() {
-    return execService.submit(new Callable<BatchResult>() {
-      @Override public BatchResult call() {
-        BatchResult batchResult = new BatchResult();
-        synchronized (lock) {
-          updateDataBlockIterator();
-          if (dataBlockIterator != null) {
-            batchResult.setRows(dataBlockIterator.next());
-          }
-        }
-        return batchResult;
+  private BatchResult getBatchResult() {
+    BatchResult batchResult = new BatchResult();
+    synchronized (lock) {
+      updateDataBlockIterator();
+      if (dataBlockIterator != null) {
+        batchResult.setRows(dataBlockIterator.next());
       }
-    });
+    }
+    return batchResult;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
index 258a476..341fb21 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
@@ -19,12 +19,17 @@ package org.apache.carbondata.core.scan.scanner;
 import java.io.IOException;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader;
 import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsModel;
@@ -35,51 +40,71 @@ import org.apache.carbondata.core.stats.QueryStatisticsModel;
 public abstract class AbstractBlockletScanner implements BlockletScanner {
 
   /**
-   * scanner result
-   */
-  protected AbstractScannedResult scannedResult;
-
-  /**
    * block execution info
    */
   protected BlockExecutionInfo blockExecutionInfo;
 
   public QueryStatisticsModel queryStatisticsModel;
 
+  private AbstractScannedResult emptyResult;
+
   public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) {
     this.blockExecutionInfo = tableBlockExecutionInfos;
   }
 
   @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
       throws IOException, FilterUnsupportedException {
-    fillKeyValue(blocksChunkHolder);
-    return scannedResult;
-  }
-
-  protected void fillKeyValue(BlocksChunkHolder blocksChunkHolder) throws IOException {
-
+    AbstractScannedResult scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
     QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
-            .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
+        .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
     totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
-            totalBlockletStatistic.getCount() + 1);
+        totalBlockletStatistic.getCount() + 1);
     queryStatisticsModel.getRecorder().recordStatistics(totalBlockletStatistic);
-    QueryStatistic validScannedBlockletStatistic = queryStatisticsModel
-            .getStatisticsTypeAndObjMap().get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
+    QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
     validScannedBlockletStatistic
-            .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
-                    validScannedBlockletStatistic.getCount() + 1);
+        .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
+            validScannedBlockletStatistic.getCount() + 1);
     queryStatisticsModel.getRecorder().recordStatistics(validScannedBlockletStatistic);
-    scannedResult.reset();
-    scannedResult.setNumberOfRows(blocksChunkHolder.getDataBlock().nodeSize());
     scannedResult.setBlockletId(
-              blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR
-                      + blocksChunkHolder.getDataBlock().nodeNumber());
-    scannedResult.setDimensionChunks(blocksChunkHolder.getDataBlock()
-        .getDimensionChunks(blocksChunkHolder.getFileReader(),
-            blockExecutionInfo.getAllSelectedDimensionBlocksIndexes()));
-    scannedResult.setMeasureChunks(blocksChunkHolder.getDataBlock()
-            .getMeasureChunks(blocksChunkHolder.getFileReader(),
-                blockExecutionInfo.getAllSelectedMeasureBlocksIndexes()));
+        blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
+            .getDataBlock().nodeNumber());
+    DimensionRawColumnChunk[] dimensionRawColumnChunks =
+        blocksChunkHolder.getDimensionRawDataChunk();
+    DimensionColumnDataChunk[][] dimensionColumnDataChunks =
+        new DimensionColumnDataChunk[dimensionRawColumnChunks.length][];
+    for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+      if (dimensionRawColumnChunks[i] != null) {
+        dimensionColumnDataChunks[i] = dimensionRawColumnChunks[i].convertToDimColDataChunks();
+      }
+    }
+    scannedResult.setDimensionChunks(dimensionColumnDataChunks);
+    MeasureRawColumnChunk[] measureRawColumnChunks = blocksChunkHolder.getMeasureRawDataChunk();
+    MeasureColumnDataChunk[][] measureColumnDataChunks =
+        new MeasureColumnDataChunk[measureRawColumnChunks.length][];
+    for (int i = 0; i < measureRawColumnChunks.length; i++) {
+      if (measureRawColumnChunks[i] != null) {
+        measureColumnDataChunks[i] = measureRawColumnChunks[i].convertToMeasureColDataChunks();
+      }
+    }
+    scannedResult.setMeasureChunks(measureColumnDataChunks);
+    int[] numberOfRows = new int[] { blocksChunkHolder.getDataBlock().nodeSize() };
+    if (blockExecutionInfo.getAllSelectedDimensionBlocksIndexes().length > 0) {
+      for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+        if (dimensionRawColumnChunks[i] != null) {
+          numberOfRows = dimensionRawColumnChunks[i].getRowCount();
+          break;
+        }
+      }
+    } else if (blockExecutionInfo.getAllSelectedMeasureBlocksIndexes().length > 0) {
+      for (int i = 0; i < measureRawColumnChunks.length; i++) {
+        if (measureRawColumnChunks[i] != null) {
+          numberOfRows = measureRawColumnChunks[i].getRowCount();
+          break;
+        }
+      }
+    }
+    scannedResult.setNumberOfRows(numberOfRows);
     // loading delete data cache in blockexecutioninfo instance
     DeleteDeltaCacheLoaderIntf deleteCacheLoader =
         new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(),
@@ -87,5 +112,32 @@ public abstract class AbstractBlockletScanner implements BlockletScanner {
     deleteCacheLoader.loadDeleteDeltaFileDataToCache();
     scannedResult
         .setBlockletDeleteDeltaCache(blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache());
+    scannedResult.setRawColumnChunks(dimensionRawColumnChunks);
+    return scannedResult;
+  }
+
+  @Override public void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException {
+    DimensionRawColumnChunk[] dimensionRawColumnChunks = blocksChunkHolder.getDataBlock()
+        .getDimensionChunks(blocksChunkHolder.getFileReader(),
+            blockExecutionInfo.getAllSelectedDimensionBlocksIndexes());
+    blocksChunkHolder.setDimensionRawDataChunk(dimensionRawColumnChunks);
+    MeasureRawColumnChunk[] measureRawColumnChunks = blocksChunkHolder.getDataBlock()
+        .getMeasureChunks(blocksChunkHolder.getFileReader(),
+            blockExecutionInfo.getAllSelectedMeasureBlocksIndexes());
+    blocksChunkHolder.setMeasureRawDataChunk(measureRawColumnChunks);
+  }
+
+  @Override public AbstractScannedResult createEmptyResult() {
+    if (emptyResult == null) {
+      emptyResult = new NonFilterQueryScannedResult(blockExecutionInfo);
+      emptyResult.setNumberOfRows(new int[0]);
+      emptyResult.setIndexes(new int[0][]);
+    }
+    return emptyResult;
+  }
+
+  @Override public boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException {
+    // For non filter it is always true
+    return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
index 6b8a94a..0ed0d43 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
@@ -29,6 +29,14 @@ import org.apache.carbondata.core.scan.result.AbstractScannedResult;
 public interface BlockletScanner {
 
   /**
+   * Checks whether this blocklet needs to be scanned or not, based on the min/max
+   * values of each blocklet.
+   * @param blocksChunkHolder holder of the blocklet's raw column chunks
+   * @return true if the blocklet has to be scanned
+   * @throws IOException
+   */
+  boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException;
+
+  /**
   * Below method will be used to process the block data and get the scanned result
    *
    * @param blocksChunkHolder block chunk which holds the block data
@@ -37,4 +45,16 @@ public interface BlockletScanner {
    */
   AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
       throws IOException, FilterUnsupportedException;
+
+  /**
+   * Just reads the blocklet from file; it does not uncompress it.
+   * @param blocksChunkHolder holder of the blocklet's raw column chunks
+   */
+  void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException;
+
+  /**
+   * Returns an empty result, used when no row satisfies the filter.
+   * @return an empty AbstractScannedResult
+   */
+  AbstractScannedResult createEmptyResult();
 }
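
For readers skimming the diff, the new contract separates I/O from decoding. A minimal
sketch of the intended call order (a hypothetical helper, not part of this patch;
exception imports elided):

    static AbstractScannedResult scanOneBlocklet(BlockletScanner scanner,
        BlocksChunkHolder holder) throws IOException, FilterUnsupportedException {
      scanner.readBlocklet(holder);           // I/O only: raw chunks stay compressed
      if (!scanner.isScanRequired(holder)) {  // min/max pruning for filter queries
        return scanner.createEmptyResult();   // nothing can match, skip decoding
      }
      return scanner.scanBlocklet(holder);    // decode pages and produce the result
    }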

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
index 41e4fa5..c3d86aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
@@ -24,6 +24,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader;
 import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
@@ -36,6 +38,7 @@ import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -66,7 +69,6 @@ public class FilterScanner extends AbstractBlockletScanner {
   public FilterScanner(BlockExecutionInfo blockExecutionInfo,
       QueryStatisticsModel queryStatisticsModel) {
     super(blockExecutionInfo);
-    scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
     // to check whether min max is enabled or not
     String minMaxEnableValue = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED,
@@ -87,8 +89,26 @@ public class FilterScanner extends AbstractBlockletScanner {
    */
   @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
       throws IOException, FilterUnsupportedException {
-    fillScannedResult(blocksChunkHolder);
-    return scannedResult;
+    return fillScannedResult(blocksChunkHolder);
+  }
+
+  @Override public boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException {
+    // apply min max
+    if (isMinMaxEnabled) {
+      BitSet bitSet = this.filterExecuter
+          .isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
+              blocksChunkHolder.getDataBlock().getColumnsMinValue());
+      if (bitSet.isEmpty()) {
+        CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
+            blocksChunkHolder.getMeasureRawDataChunk());
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override public void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException {
+    this.filterExecuter.readBlocks(blocksChunkHolder);
   }
 
   /**
@@ -107,35 +127,21 @@ public class FilterScanner extends AbstractBlockletScanner {
    * @param blocksChunkHolder
    * @throws FilterUnsupportedException
    */
-  private void fillScannedResult(BlocksChunkHolder blocksChunkHolder)
+  private AbstractScannedResult fillScannedResult(BlocksChunkHolder blocksChunkHolder)
       throws FilterUnsupportedException, IOException {
-    scannedResult.reset();
-    scannedResult.setBlockletId(
-        blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
-            .getDataBlock().nodeNumber());
-    // apply min max
-    if (isMinMaxEnabled) {
-      BitSet bitSet = this.filterExecuter
-          .isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
-              blocksChunkHolder.getDataBlock().getColumnsMinValue());
-      if (bitSet.isEmpty()) {
-        scannedResult.setNumberOfRows(0);
-        scannedResult.setIndexes(new int[0]);
-        CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
-            blocksChunkHolder.getMeasureDataChunk());
-        return;
-      }
-    }
     // apply filter on actual data
-    BitSet bitSet = this.filterExecuter.applyFilter(blocksChunkHolder);
+    BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(blocksChunkHolder);
     // if indexes is empty then return with empty result
-    if (bitSet.isEmpty()) {
-      scannedResult.setNumberOfRows(0);
-      scannedResult.setIndexes(new int[0]);
-      CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
-          blocksChunkHolder.getMeasureDataChunk());
-      return;
+    if (bitSetGroup.isEmpty()) {
+      CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
+          blocksChunkHolder.getMeasureRawDataChunk());
+      return createEmptyResult();
     }
+
+    AbstractScannedResult scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
+    scannedResult.setBlockletId(
+        blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
+            .getDataBlock().nodeNumber());
     // valid scanned blocklet
     QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
         .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
@@ -143,11 +149,20 @@ public class FilterScanner extends AbstractBlockletScanner {
         .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
             validScannedBlockletStatistic.getCount() + 1);
     queryStatisticsModel.getRecorder().recordStatistics(validScannedBlockletStatistic);
+    int[] rowCount = new int[bitSetGroup.getNumberOfPages()];
     // get the row indexes from the bit sets
-    int[] indexes = new int[bitSet.cardinality()];
-    int index = 0;
-    for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
-      indexes[index++] = i;
+    int[][] indexesGroup = new int[bitSetGroup.getNumberOfPages()][];
+    for (int k = 0; k < indexesGroup.length; k++) {
+      BitSet bitSet = bitSetGroup.getBitSet(k);
+      if (bitSet != null && !bitSet.isEmpty()) {
+        int[] indexes = new int[bitSet.cardinality()];
+        int index = 0;
+        for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
+          indexes[index++] = i;
+        }
+        rowCount[k] = indexes.length;
+        indexesGroup[k] = indexes;
+      }
     }
     // loading delete data cache in blockexecutioninfo instance
     DeleteDeltaCacheLoaderIntf deleteCacheLoader =
@@ -159,42 +174,91 @@ public class FilterScanner extends AbstractBlockletScanner {
     FileHolder fileReader = blocksChunkHolder.getFileReader();
     int[][] allSelectedDimensionBlocksIndexes =
         blockExecutionInfo.getAllSelectedDimensionBlocksIndexes();
-    DimensionColumnDataChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock()
+    DimensionRawColumnChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock()
         .getDimensionChunks(fileReader, allSelectedDimensionBlocksIndexes);
 
-    DimensionColumnDataChunk[] dimensionColumnDataChunk =
-        new DimensionColumnDataChunk[blockExecutionInfo.getTotalNumberDimensionBlock()];
+    DimensionRawColumnChunk[] dimensionRawColumnChunks =
+        new DimensionRawColumnChunk[blockExecutionInfo.getTotalNumberDimensionBlock()];
     // read dimension chunk blocks from file which is not present
-    for (int i = 0; i < dimensionColumnDataChunk.length; i++) {
-      if (null != blocksChunkHolder.getDimensionDataChunk()[i]) {
-        dimensionColumnDataChunk[i] = blocksChunkHolder.getDimensionDataChunk()[i];
+    for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+      if (null != blocksChunkHolder.getDimensionRawDataChunk()[i]) {
+        dimensionRawColumnChunks[i] = blocksChunkHolder.getDimensionRawDataChunk()[i];
       }
     }
     for (int i = 0; i < allSelectedDimensionBlocksIndexes.length; i++) {
-      System.arraycopy(projectionListDimensionChunk, allSelectedDimensionBlocksIndexes[i][0],
-          dimensionColumnDataChunk, allSelectedDimensionBlocksIndexes[i][0],
-          allSelectedDimensionBlocksIndexes[i][1] + 1 - allSelectedDimensionBlocksIndexes[i][0]);
+      for (int j = allSelectedDimensionBlocksIndexes[i][0];
+           j <= allSelectedDimensionBlocksIndexes[i][1]; j++) {
+        dimensionRawColumnChunks[j] = projectionListDimensionChunk[j];
+      }
     }
-    MeasureColumnDataChunk[] measureColumnDataChunk =
-        new MeasureColumnDataChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()];
+    /**
+     * In the projection case, if the projected dimensions are not yet loaded in
+     * dimensionRawColumnChunks, load them here.
+     */
+    int[] projectionListDimensionIndexes = blockExecutionInfo.getProjectionListDimensionIndexes();
+    int projectionListDimensionIndexesLength = projectionListDimensionIndexes.length;
+    for (int i = 0; i < projectionListDimensionIndexesLength; i++) {
+      if (null == dimensionRawColumnChunks[projectionListDimensionIndexes[i]]) {
+        dimensionRawColumnChunks[projectionListDimensionIndexes[i]] =
+            blocksChunkHolder.getDataBlock()
+                .getDimensionChunk(fileReader, projectionListDimensionIndexes[i]);
+      }
+    }
+    MeasureRawColumnChunk[] measureRawColumnChunks =
+        new MeasureRawColumnChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()];
     int[][] allSelectedMeasureBlocksIndexes =
         blockExecutionInfo.getAllSelectedMeasureBlocksIndexes();
-    MeasureColumnDataChunk[] projectionListMeasureChunk = blocksChunkHolder.getDataBlock()
+    MeasureRawColumnChunk[] projectionListMeasureChunk = blocksChunkHolder.getDataBlock()
         .getMeasureChunks(fileReader, allSelectedMeasureBlocksIndexes);
     // read the measure chunk blocks which is not present
-    for (int i = 0; i < measureColumnDataChunk.length; i++) {
-      if (null != blocksChunkHolder.getMeasureDataChunk()[i]) {
-        measureColumnDataChunk[i] = blocksChunkHolder.getMeasureDataChunk()[i];
+    for (int i = 0; i < measureRawColumnChunks.length; i++) {
+      if (null != blocksChunkHolder.getMeasureRawDataChunk()[i]) {
+        measureRawColumnChunks[i] = blocksChunkHolder.getMeasureRawDataChunk()[i];
       }
     }
     for (int i = 0; i < allSelectedMeasureBlocksIndexes.length; i++) {
-      System.arraycopy(projectionListMeasureChunk, allSelectedMeasureBlocksIndexes[i][0],
-          measureColumnDataChunk, allSelectedMeasureBlocksIndexes[i][0],
-          allSelectedMeasureBlocksIndexes[i][1] + 1 - allSelectedMeasureBlocksIndexes[i][0]);
+      for (int j = allSelectedMeasureBlocksIndexes[i][0];
+           j <= allSelectedMeasureBlocksIndexes[i][1]; j++) {
+        measureRawColumnChunks[j] = projectionListMeasureChunk[j];
+      }
+    }
+    /**
+     * In the projection case, if the projected measures are not yet loaded in
+     * measureRawColumnChunks, load them here.
+     */
+    int[] projectionListMeasureIndexes = blockExecutionInfo.getProjectionListMeasureIndexes();
+    int projectionListMeasureIndexesLength = projectionListMeasureIndexes.length;
+    for (int i = 0; i < projectionListMeasureIndexesLength; i++) {
+      if (null == measureRawColumnChunks[projectionListMeasureIndexes[i]]) {
+        measureRawColumnChunks[projectionListMeasureIndexes[i]] = blocksChunkHolder.getDataBlock()
+            .getMeasureChunk(fileReader, projectionListMeasureIndexes[i]);
+      }
     }
-    scannedResult.setDimensionChunks(dimensionColumnDataChunk);
-    scannedResult.setIndexes(indexes);
-    scannedResult.setMeasureChunks(measureColumnDataChunk);
-    scannedResult.setNumberOfRows(indexes.length);
+    DimensionColumnDataChunk[][] dimensionColumnDataChunks =
+        new DimensionColumnDataChunk[dimensionRawColumnChunks.length][indexesGroup.length];
+    MeasureColumnDataChunk[][] measureColumnDataChunks =
+        new MeasureColumnDataChunk[measureRawColumnChunks.length][indexesGroup.length];
+    for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+      for (int j = 0; j < indexesGroup.length; j++) {
+        if (dimensionRawColumnChunks[i] != null) {
+          dimensionColumnDataChunks[i][j] =
+              dimensionRawColumnChunks[i].convertToDimColDataChunk(j);
+        }
+      }
+    }
+    for (int i = 0; i < measureRawColumnChunks.length; i++) {
+      for (int j = 0; j < indexesGroup.length; j++) {
+        if (measureRawColumnChunks[i] != null) {
+          measureColumnDataChunks[i][j] =
+              measureRawColumnChunks[i].convertToMeasureColDataChunk(j);
+        }
+      }
+    }
+    scannedResult.setDimensionChunks(dimensionColumnDataChunks);
+    scannedResult.setIndexes(indexesGroup);
+    scannedResult.setMeasureChunks(measureColumnDataChunks);
+    scannedResult.setRawColumnChunks(dimensionRawColumnChunks);
+    scannedResult.setNumberOfRows(rowCount);
+    return scannedResult;
   }
 }
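
Note that applyFilter now returns one BitSet per page rather than a single BitSet for
the whole blocklet. A rough sketch of how an executer might populate it (matchesFilter,
numberOfPages and rowsInPage are hypothetical; real executers evaluate column chunks):

    BitSetGroup bitSetGroup = new BitSetGroup(numberOfPages);
    for (int page = 0; page < numberOfPages; page++) {
      BitSet pageBits = new BitSet(rowsInPage[page]);
      for (int row = 0; row < rowsInPage[page]; row++) {
        if (matchesFilter(page, row)) {   // hypothetical predicate
          pageBits.set(row);
        }
      }
      bitSetGroup.setBitSet(pageBits, page);
    }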

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
index cda39f2..1373ed5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
@@ -17,7 +17,6 @@
 package org.apache.carbondata.core.scan.scanner.impl;
 
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult;
 import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner;
 import org.apache.carbondata.core.stats.QueryStatisticsModel;
 
@@ -31,8 +30,6 @@ public class NonFilterScanner extends AbstractBlockletScanner {
   public NonFilterScanner(BlockExecutionInfo blockExecutionInfo,
                           QueryStatisticsModel queryStatisticsModel) {
     super(blockExecutionInfo);
-    // as its a non filter query creating a non filter query scanned result object
-    scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
     super.queryStatisticsModel = queryStatisticsModel;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
new file mode 100644
index 0000000..07e3487
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.util.BitSet;
+
+/**
+ * Maintains the group of bitsets.
+ * Each filter executor returns BitSetGroup after filtering the data.
+ */
+public class BitSetGroup {
+
+  private BitSet[] bitSets;
+
+  public BitSetGroup(int groupSize) {
+    bitSets = new BitSet[groupSize];
+  }
+
+  public void setBitSet(BitSet bitSet, int index) {
+    assert index < bitSets.length;
+    bitSets[index] = bitSet;
+  }
+
+  public BitSet getBitSet(int index) {
+    assert index < bitSets.length;
+    return bitSets[index];
+  }
+
+  public boolean isEmpty() {
+    for (BitSet bitSet : bitSets) {
+      if (bitSet != null && !bitSet.isEmpty()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public void and(BitSetGroup group) {
+    int i = 0;
+    for (BitSet bitSet : bitSets) {
+      BitSet otherSet  = group.getBitSet(i);
+      if (bitSet != null && otherSet != null) {
+        bitSet.and(otherSet);
+      }
+      i++;
+    }
+  }
+
+  public void or(BitSetGroup group) {
+    int i = 0;
+    for (BitSet bitSet : bitSets) {
+      BitSet otherSet  = group.getBitSet(i);
+      if (bitSet != null && otherSet != null) {
+        bitSet.or(otherSet);
+      }
+      // if it is null and other set is not null then replace it.
+      if (bitSet == null && otherSet != null) {
+        bitSets[i] = otherSet;
+      }
+      i++;
+    }
+  }
+
+  public int getNumberOfPages() {
+    return bitSets.length;
+  }
+
+}
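
Since BitSetGroup is new in this patch, a small self-contained example of its page-wise
AND/OR semantics (java.util.BitSet import elided; values arbitrary):

    BitSetGroup left = new BitSetGroup(2);
    BitSet l0 = new BitSet();
    l0.set(1);
    l0.set(3);
    left.setBitSet(l0, 0);              // page 1 of 'left' is left null

    BitSetGroup right = new BitSetGroup(2);
    BitSet r0 = new BitSet();
    r0.set(3);
    right.setBitSet(r0, 0);
    BitSet r1 = new BitSet();
    r1.set(0);
    right.setBitSet(r1, 1);

    left.and(right);                    // page 0 becomes {3}; null page 1 is skipped
    left.or(right);                     // page 0 stays {3}; page 1 adopts right's bitset
    System.out.println(left.isEmpty()); // false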

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 11c8870..b9a96d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -45,9 +45,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.datastore.compression.MeasureMetaDataModel;
@@ -1233,6 +1233,18 @@ public final class CarbonUtil {
     }, offset, length);
   }
 
+  public static DataChunk2 readDataChunk(ByteBuffer dataChunkBuffer, int offset, int length)
+      throws IOException {
+    byte[] data = new byte[length];
+    dataChunkBuffer.position(offset);
+    dataChunkBuffer.get(data);
+    return (DataChunk2) read(data, new ThriftReader.TBaseCreator() {
+      @Override public TBase create() {
+        return new DataChunk2();
+      }
+    }, 0, length);
+  }
+
   /**
    * Below method will be used to convert the byte array value to thrift object for
    * data chunk
@@ -1353,19 +1365,19 @@ public final class CarbonUtil {
     return outputArray;
   }
 
-  public static void freeMemory(DimensionColumnDataChunk[] dimensionColumnDataChunk,
-      MeasureColumnDataChunk[] measureColumnDataChunks) {
-    if (null != measureColumnDataChunks) {
-      for (int i = 0; i < measureColumnDataChunks.length; i++) {
-        if (null != measureColumnDataChunks[i]) {
-          measureColumnDataChunks[i].freeMemory();
+  public static void freeMemory(DimensionRawColumnChunk[] dimensionRawColumnChunks,
+      MeasureRawColumnChunk[] measureRawColumnChunks) {
+    if (null != measureRawColumnChunks) {
+      for (int i = 0; i < measureRawColumnChunks.length; i++) {
+        if (null != measureRawColumnChunks[i]) {
+          measureRawColumnChunks[i].freeMemory();
         }
       }
     }
-    if (null != dimensionColumnDataChunk) {
-      for (int i = 0; i < dimensionColumnDataChunk.length; i++) {
-        if (null != dimensionColumnDataChunk[i]) {
-          dimensionColumnDataChunk[i].freeMemory();
+    if (null != dimensionRawColumnChunks) {
+      for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+        if (null != dimensionRawColumnChunks[i]) {
+          dimensionRawColumnChunks[i].freeMemory();
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
index 5b89096..4882b0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -98,8 +98,17 @@ public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
             blockletInfoThrift.getColumn_data_chunks_offsets().size());
     blockletInfo.setDimensionChunkOffsets(dimensionColumnChunkOffsets);
     blockletInfo.setMeasureChunkOffsets(measureColumnChunksOffsets);
-    blockletInfo.setDimensionChunksLength(dimensionColumnChunkLength);
-    blockletInfo.setMeasureChunksLength(measureColumnChunksLength);
+
+    List<Integer> dimensionColumnChunkLengthInteger = new ArrayList<Integer>();
+    List<Integer> measureColumnChunkLengthInteger = new ArrayList<Integer>();
+    for (int i = 0; i < dimensionColumnChunkLength.size(); i++) {
+      dimensionColumnChunkLengthInteger.add(dimensionColumnChunkLength.get(i).intValue());
+    }
+    for (int i = 0; i < measureColumnChunksLength.size(); i++) {
+      measureColumnChunkLengthInteger.add(measureColumnChunksLength.get(i).intValue());
+    }
+    blockletInfo.setDimensionChunksLength(dimensionColumnChunkLengthInteger);
+    blockletInfo.setMeasureChunksLength(measureColumnChunkLengthInteger);
     blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
     return blockletInfo;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index bf19e08..5f69753 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -23,6 +23,7 @@ import java.math.RoundingMode;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
@@ -316,6 +317,87 @@ public final class DataTypeUtil {
 
   }
 
+  /**
+   * Below method will be used to convert the data passed to its actual data
+   * type
+   *
+   * @param dataInBytes    data
+   * @param actualDataType actual data type
+   * @return actual data after conversion
+   */
+  public static Object getDataBasedOnDataType(byte[] dataInBytes, DataType actualDataType) {
+    if (null == dataInBytes || Arrays
+        .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, dataInBytes)) {
+      return null;
+    }
+    try {
+      switch (actualDataType) {
+        case INT:
+          String data1 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+          if (data1.isEmpty()) {
+            return null;
+          }
+          return Integer.parseInt(data1);
+        case SHORT:
+          String data2 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+          if (data2.isEmpty()) {
+            return null;
+          }
+          return Short.parseShort(data2);
+        case DOUBLE:
+          String data3 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+          if (data3.isEmpty()) {
+            return null;
+          }
+          return Double.parseDouble(data3);
+        case LONG:
+          String data4 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+          if (data4.isEmpty()) {
+            return null;
+          }
+          return Long.parseLong(data4);
+        case DATE:
+          String data5 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+          if (data5.isEmpty()) {
+            return null;
+          }
+          try {
+            Date dateToStr = dateformatter.get().parse(data5);
+            return dateToStr.getTime() * 1000;
+          } catch (ParseException e) {
+            LOGGER.error("Cannot convert" + data5 + " to Time/Long type value" + e.getMessage());
+            return null;
+          }
+
+        case TIMESTAMP:
+          String data6 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+          if (data6.isEmpty()) {
+            return null;
+          }
+          try {
+            Date dateToStr = timeStampformatter.get().parse(data6);
+            return dateToStr.getTime() * 1000;
+          } catch (ParseException e) {
+            LOGGER.error("Cannot convert" + data6 + " to Time/Long type value" + e.getMessage());
+            return null;
+          }
+        case DECIMAL:
+          String data7 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+          if (data7.isEmpty()) {
+            return null;
+          }
+          java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data7);
+          return org.apache.spark.sql.types.Decimal.apply(javaDecVal);
+        default:
+          return UTF8String.fromBytes(dataInBytes);
+      }
+    } catch (NumberFormatException ex) {
+      String data = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+      LOGGER.error("Problem while converting data type" + data);
+      return null;
+    }
+  }
+
   public static Object getMeasureDataBasedOnDataType(Object data, DataType dataType) {
 
     if (null == data) {
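
A small usage sketch of the new byte[] overload (values arbitrary; DataType import
abbreviated). For string columns the default case wraps the bytes as UTF8String without
materializing an intermediate java String:

    byte[] raw = "42".getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
    Object v = DataTypeUtil.getDataBasedOnDataType(raw, DataType.INT);
    // v is Integer.valueOf(42); null input or the member-default marker yields null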

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
index f2d110f..1d00199 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
@@ -95,7 +95,7 @@ public class DataTypeUtilTest {
     assertEquals(getDataBasedOnDataType("1", DataType.DECIMAL), expected);
     assertEquals(getDataBasedOnDataType("default", DataType.NULL),
         UTF8String.fromString("default"));
-    assertEquals(getDataBasedOnDataType(null, DataType.NULL), null);
+    assertEquals(getDataBasedOnDataType((String) null, DataType.NULL), null);
   }
 
   @Test public void testGetMeasureDataBasedOnDataType() throws NumberFormatException {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java b/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
index 4a9cf81..07db1ba 100644
--- a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
+++ b/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
@@ -117,7 +117,7 @@ public class FilterScannerTest {
 //    DimensionChunkAttributes dimensionChunkAttributes = new DimensionChunkAttributes();
 //    DimensionColumnDataChunk dimensionColumnDataChunk =
 //        new FixedLengthDimensionDataChunk(new byte[] { 0, 1 }, dimensionChunkAttributes);
-//    blocksChunkHolder.setDimensionDataChunk(new DimensionColumnDataChunk[]
+//    blocksChunkHolder.setDimensionRawDataChunk(new DimensionColumnDataChunk[]
 //
 //        { dimensionColumnDataChunk });
 //    MeasureColumnDataChunk measureColumnDataChunk = new MeasureColumnDataChunk();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 7333115..d7bab75 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -46,7 +46,7 @@ public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<Row> {
       }
       if (dictionaries[i] != null) {
         data[i] = DataTypeUtil
-            .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKey((int) data[i]),
+            .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKeyInBytes((int) data[i]),
                 dataTypes[i]);
         if (data[i] == null) {
           continue;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 470ca1f..3da8299 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -182,34 +182,21 @@ case class CarbonDictionaryDecoder(
           )
           new Iterator[InternalRow] {
             val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
-            var flag = true
-            var total = 0L
-
             override final def hasNext: Boolean = {
-              flag = iter.hasNext
-              if (!flag && total > 0) {
-                val queryStatistic = new QueryStatistic()
-                queryStatistic
-                  .addFixedTimeStatistic(QueryStatisticsConstants.PREPARE_RESULT, total)
-                recorder.recordStatistics(queryStatistic)
-                recorder.logStatistics()
-              }
-              flag
+              iter.hasNext
             }
 
             override final def next(): InternalRow = {
-              val startTime = System.currentTimeMillis()
               val row: InternalRow = iter.next()
               val data = row.toSeq(dataTypes).toArray
               dictIndex.foreach { index =>
                 if (data(index) != null) {
                   data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
-                    .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+                    .getDictionaryValueForKeyInBytes(data(index).asInstanceOf[Int]),
                     getDictionaryColumnIds(index)._3)
                 }
               }
               val result = unsafeProjection(new GenericMutableRow(data))
-              total += System.currentTimeMillis() - startTime
               result
             }
           }



[4/5] incubator-carbondata git commit: WIP Added code for new V3 format to optimize scan

Posted by ja...@apache.org.
WIP Added code for new V3 format to optimize scan

Fixed testcases

Fixed style

Fixed issue

Added read-ahead blocklet PR to it

fixed style

Refactored code

Added read-ahead blocklet

Optimized decoder

Updated code of V3 format interfaces

Optimized greater than and less than filters

Fixed col group queries

Refactored V1 format with new interface

Fixed complex query

Fixed comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/72cb415a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/72cb415a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/72cb415a

Branch: refs/heads/master
Commit: 72cb415a1e1126882c38ecfead01dc6b7bb4cc07
Parents: 766671c
Author: ravipesala <ra...@gmail.com>
Authored: Fri Feb 3 16:11:06 2017 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Fri Feb 24 13:34:40 2017 +0800

----------------------------------------------------------------------
 .../AbstractColumnDictionaryInfo.java           |  18 +-
 .../core/cache/dictionary/Dictionary.java       |  13 ++
 .../cache/dictionary/ForwardDictionary.java     |  13 ++
 .../cache/dictionary/ReverseDictionary.java     |  13 ++
 .../core/constants/CarbonCommonConstants.java   |  12 +-
 .../carbondata/core/datastore/DataRefNode.java  |  12 +-
 .../carbondata/core/datastore/FileHolder.java   |  14 ++
 .../datastore/chunk/AbstractRawColumnChunk.java | 124 +++++++++++
 .../chunk/impl/DimensionRawColumnChunk.java     | 105 +++++++++
 .../chunk/impl/MeasureRawColumnChunk.java       | 107 +++++++++
 .../reader/DimensionColumnChunkReader.java      |  20 +-
 .../chunk/reader/MeasureColumnChunkReader.java  |  20 +-
 ...mpressedDimensionChunkFileBasedReaderV1.java |  92 +++++---
 ...mpressedDimensionChunkFileBasedReaderV2.java | 215 +++++++++----------
 ...CompressedMeasureChunkFileBasedReaderV1.java |  49 +++--
 ...CompressedMeasureChunkFileBasedReaderV2.java | 169 +++++++--------
 .../core/datastore/columnar/UnBlockIndexer.java |   7 +-
 .../core/datastore/impl/DFSFileHolderImpl.java  |   9 +
 .../core/datastore/impl/FileHolderImpl.java     |   8 +
 .../impl/btree/AbstractBTreeLeafNode.java       |  12 +-
 .../datastore/impl/btree/BTreeNonLeafNode.java  |  12 +-
 .../impl/btree/BlockletBTreeLeafNode.java       |  20 +-
 .../core/metadata/blocklet/BlockletInfo.java    |  32 ++-
 .../DictionaryBasedVectorResultCollector.java   |  54 +++--
 .../core/scan/complextypes/ArrayQueryType.java  |  16 +-
 .../scan/complextypes/ComplexQueryType.java     |  13 +-
 .../scan/complextypes/PrimitiveQueryType.java   |  12 +-
 .../core/scan/complextypes/StructQueryType.java |  10 +-
 .../executor/impl/AbstractQueryExecutor.java    |  19 +-
 .../scan/executor/infos/BlockExecutionInfo.java |  27 +++
 .../core/scan/executor/util/QueryUtil.java      |  13 +-
 .../carbondata/core/scan/filter/FilterUtil.java |   5 -
 .../core/scan/filter/GenericQueryType.java      |   6 +-
 .../filter/executer/AndFilterExecuterImpl.java  |  12 +-
 .../executer/ExcludeFilterExecuterImpl.java     |  32 ++-
 .../scan/filter/executer/FilterExecuter.java    |   9 +-
 .../executer/IncludeFilterExecuterImpl.java     |  54 ++++-
 .../filter/executer/OrFilterExecuterImpl.java   |  11 +-
 .../executer/RestructureFilterExecuterImpl.java |  52 -----
 .../executer/RowLevelFilterExecuterImpl.java    | 192 +++++++++--------
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |  69 ++++--
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |  65 ++++--
 ...velRangeLessThanEqualFilterExecuterImpl.java |  67 ++++--
 .../RowLevelRangeLessThanFiterExecuterImpl.java |  66 ++++--
 .../RowLevelRangeFilterResolverImpl.java        |  34 ++-
 .../processor/AbstractDataBlockIterator.java    | 117 ++++++++--
 .../core/scan/processor/BlocksChunkHolder.java  |  51 +++--
 .../processor/impl/DataBlockIteratorImpl.java   |   8 +-
 .../core/scan/result/AbstractScannedResult.java | 113 +++++++---
 .../result/impl/FilterQueryScannedResult.java   |  25 ++-
 .../AbstractDetailQueryResultIterator.java      |  21 +-
 .../iterator/DetailQueryResultIterator.java     |  48 +----
 .../scan/scanner/AbstractBlockletScanner.java   | 106 ++++++---
 .../core/scan/scanner/BlockletScanner.java      |  20 ++
 .../core/scan/scanner/impl/FilterScanner.java   | 172 ++++++++++-----
 .../scan/scanner/impl/NonFilterScanner.java     |   3 -
 .../carbondata/core/util/BitSetGroup.java       |  82 +++++++
 .../apache/carbondata/core/util/CarbonUtil.java |  36 ++--
 .../core/util/DataFileFooterConverter2.java     |  13 +-
 .../carbondata/core/util/DataTypeUtil.java      |  82 +++++++
 .../carbondata/core/util/DataTypeUtilTest.java  |   2 +-
 .../scanner/impl/FilterScannerTest.java         |   2 +-
 .../readsupport/SparkRowReadSupportImpl.java    |   2 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  17 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   9 +-
 .../sql/optimizer/CarbonLateDecodeRule.scala    |   1 +
 66 files changed, 2025 insertions(+), 839 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
index f02e6b5..18f4885 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
@@ -240,12 +240,28 @@ public abstract class AbstractColumnDictionaryInfo implements DictionaryInfo {
     byte[] dictionaryValueInBytes = getDictionaryBytesFromSurrogate(surrogateKey);
     if (null != dictionaryValueInBytes) {
       dictionaryValue = new String(dictionaryValueInBytes,
-          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+          CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
     }
     return dictionaryValue;
   }
 
   /**
+   * This method will find and return the dictionary value for a given surrogate key,
+   * as bytes.
+   * Applicable scenarios:
+   * 1. Query final result preparation: while converting the surrogate keys in the
+   * final result back to the original dictionary values, this method will be used
+   *
+   * @param surrogateKey a unique ID for a dictionary value
+   * @return value if found else null
+   */
+  @Override public byte[] getDictionaryValueForKeyInBytes(int surrogateKey) {
+    if (surrogateKey < MINIMUM_SURROGATE_KEY) {
+      return null;
+    }
+    return getDictionaryBytesFromSurrogate(surrogateKey);
+  }
+
+  /**
    * This method will find and return the dictionary value as byte array for a
    * given surrogate key
    *

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/Dictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/Dictionary.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/Dictionary.java
index 8a74040..7302de2 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/Dictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/Dictionary.java
@@ -59,6 +59,19 @@ public interface Dictionary {
   String getDictionaryValueForKey(int surrogateKey);
 
   /**
+   * This method will find and return the dictionary value for a given surrogate key in bytes.
+   * It is the same as getDictionaryValueForKey but does not convert the bytes to a
+   * String; it returns the bytes directly. Callers can convert them with new String(bytes).
+   * Applicable scenarios:
+   * 1. Query final result preparation: while converting the surrogate keys in the
+   * final result back to the original dictionary values, this method will be used
+   *
+   * @param surrogateKey a unique ID for a dictionary value
+   * @return value if found else null
+   */
+  byte[] getDictionaryValueForKeyInBytes(int surrogateKey);
+
+  /**
    * This method will find and return the sort index for a given dictionary id.
    * Applicable scenarios:
    * 1. Used in case of order by queries when data sorting is required
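
Combined with the byte[] overload of DataTypeUtil.getDataBasedOnDataType shown earlier,
decoders can go from surrogate key to typed value without an intermediate String, as the
SparkRowReadSupportImpl change above does. A sketch ('dictionary', 'surrogateKey' and
'dataType' are assumed to be resolved elsewhere):

    byte[] bytes = dictionary.getDictionaryValueForKeyInBytes(surrogateKey);
    Object value = DataTypeUtil.getDataBasedOnDataType(bytes, dataType);
    // both calls return null for an invalid surrogate key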

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionary.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionary.java
index 92fe522..abc95e8 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionary.java
@@ -82,6 +82,19 @@ public class ForwardDictionary implements Dictionary {
   }
 
   /**
+   * This method will find and return the dictionary value for a given surrogate key in bytes.
+   * Applicable scenarios:
+   * 1. Query final result preparation: while converting the surrogate keys in the
+   * final result back to the original dictionary values, this method will be used
+   *
+   * @param surrogateKey a unique ID for a dictionary value
+   * @return value if found else null
+   */
+  @Override public byte[] getDictionaryValueForKeyInBytes(int surrogateKey) {
+    return columnDictionaryInfo.getDictionaryValueForKeyInBytes(surrogateKey);
+  }
+
+  /**
    * This method will find and return the sort index for a given dictionary id.
    * Applicable scenarios:
    * 1. Used in case of order by queries when data sorting is required

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionary.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionary.java
index 97736ba..ff0e687 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionary.java
@@ -76,6 +76,19 @@ public class ReverseDictionary implements Dictionary {
   }
 
   /**
+   * This method will find and return the dictionary value for a given surrogate key in bytes.
+   * Applicable scenarios:
+   * 1. Query final result preparation: while converting the surrogate keys in the
+   * final result back to the original dictionary values, this method will be used
+   *
+   * @param surrogateKey a unique ID for a dictionary value
+   * @return value if found else null
+   */
+  @Override public byte[] getDictionaryValueForKeyInBytes(int surrogateKey) {
+    return columnReverseDictionaryInfo.getDictionaryValueForKeyInBytes(surrogateKey);
+  }
+
+  /**
    * This method will find and return the sort index for a given dictionary id.
    * Applicable scenarios:
    * 1. Used in case of order by queries when data sorting is required

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index d4347f1..1142c4e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.constants;
 
+import java.nio.charset.Charset;
+
 public final class CarbonCommonConstants {
   /**
    * integer size in bytes
@@ -586,10 +588,11 @@ public final class CarbonCommonConstants {
    */
   public static final String TABLEUPDATESTATUS_FILENAME = "tableupdatestatus";
   /**
-   * INMEMORY_REOCRD_SIZE
+   * The batch size of records returned to the client.
    */
   public static final String DETAIL_QUERY_BATCH_SIZE = "carbon.detail.batch.size";
-  public static final int DETAIL_QUERY_BATCH_SIZE_DEFAULT = 10000;
+
+  public static final int DETAIL_QUERY_BATCH_SIZE_DEFAULT = 100;
   /**
    * SPILL_OVER_DISK_PATH
    */
@@ -711,6 +714,11 @@ public final class CarbonCommonConstants {
   public static final String DEFAULT_CHARSET = "UTF-8";
 
   /**
+   * default charset class to be used for reading and writing
+   */
+  public static final Charset DEFAULT_CHARSET_CLASS = Charset.forName(DEFAULT_CHARSET);
+
+  /**
    * surrogate key that will be sent whenever in the dictionary chunks
    * a valid surrogate key is not found for a given dictionary value
    */
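
Caching the Charset avoids a Charset.forName lookup per call and, unlike the
String-named variant, new String(bytes, Charset) declares no checked exception.
For example:

    String s = new String(bytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
    byte[] b = s.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);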

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
index 6dcc2b8..456710a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
@@ -19,8 +19,8 @@ package org.apache.carbondata.core.datastore;
 import java.io.IOException;
 
 import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 
 /**
  * Interface data block reference
@@ -77,7 +77,7 @@ public interface DataRefNode {
    *                     data in On IO
    * @return dimension data chunks
    */
-  DimensionColumnDataChunk[] getDimensionChunks(FileHolder fileReader, int[][] blockIndexes)
+  DimensionRawColumnChunk[] getDimensionChunks(FileHolder fileReader, int[][] blockIndexes)
       throws IOException;
 
   /**
@@ -86,7 +86,7 @@ public interface DataRefNode {
    * @param fileReader file reader to read the chunk from file
    * @return dimension data chunk
    */
-  DimensionColumnDataChunk getDimensionChunk(FileHolder fileReader, int blockIndexes)
+  DimensionRawColumnChunk getDimensionChunk(FileHolder fileReader, int blockIndexes)
       throws IOException;
 
   /**
@@ -101,7 +101,7 @@ public interface DataRefNode {
    *                     data in On IO
    * @return measure column data chunk
    */
-  MeasureColumnDataChunk[] getMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
+  MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
       throws IOException;
 
   /**
@@ -111,7 +111,7 @@ public interface DataRefNode {
    * @param blockIndex block index to be read from file
    * @return measure data chunk
    */
-  MeasureColumnDataChunk getMeasureChunk(FileHolder fileReader, int blockIndex) throws IOException;
+  MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, int blockIndex) throws IOException;
 
   /**
    * @param deleteDeltaDataCache

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/FileHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/FileHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/FileHolder.java
index 12e525e..b1eb1ee 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/FileHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/FileHolder.java
@@ -18,8 +18,22 @@
 package org.apache.carbondata.core.datastore;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 public interface FileHolder {
+
+  /**
+   * This method will be used to read data into the byteBuffer from file, based on the
+   * offset and length (number of bytes) to be read
+   *
+   * @param filePath fully qualified file path
+   * @param byteBuffer buffer into which the data is read
+   * @param offset reading start position
+   * @param length number of bytes to be read
+   * @throws IOException
+   */
+  void readByteBuffer(String filePath, ByteBuffer byteBuffer, long offset, int length)
+      throws IOException;
   /**
    * This method will be used to read the byte array from file based on offset
    * and length(number of bytes) need to read
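
One possible NIO implementation of readByteBuffer, as a sketch only (java.nio imports
elided; it opens a channel per call for brevity, whereas the real implementations are
expected to reuse cached readers):

    public void readByteBuffer(String filePath, ByteBuffer byteBuffer, long offset,
        int length) throws IOException {
      try (FileChannel channel =
          FileChannel.open(Paths.get(filePath), StandardOpenOption.READ)) {
        byteBuffer.limit(byteBuffer.position() + length);  // cap the read at 'length' bytes
        long position = offset;
        while (byteBuffer.hasRemaining()) {
          int read = channel.read(byteBuffer, position);   // positional read
          if (read < 0) {
            throw new IOException("Unexpected end of file: " + filePath);
          }
          position += read;
        }
        byteBuffer.flip();                                 // ready for the caller to consume
      }
    }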

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
new file mode 100644
index 0000000..d04077c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * It contains a group of uncompressed pages of one column.
+ */
+public abstract class AbstractRawColumnChunk {
+
+  private byte[][] minValues;
+
+  private byte[][] maxValues;
+
+  protected ByteBuffer rawData;
+
+  private int[] lengths;
+
+  private int[] offsets;
+
+  private int[] rowCount;
+
+  protected int pagesCount;
+
+  protected int blockletId;
+
+  protected int offSet;
+
+  protected int length;
+
+  public AbstractRawColumnChunk(int blockletId, ByteBuffer rawData, int offSet, int length) {
+    this.blockletId = blockletId;
+    this.rawData = rawData;
+    this.offSet = offSet;
+    this.length = length;
+  }
+
+  public byte[][] getMinValues() {
+    return minValues;
+  }
+
+  public void setMinValues(byte[][] minValues) {
+    this.minValues = minValues;
+  }
+
+  public byte[][] getMaxValues() {
+    return maxValues;
+  }
+
+  public void setMaxValues(byte[][] maxValues) {
+    this.maxValues = maxValues;
+  }
+
+  public ByteBuffer getRawData() {
+    return rawData;
+  }
+
+  public void setRawData(ByteBuffer rawData) {
+    this.rawData = rawData;
+  }
+
+  public int[] getLengths() {
+    return lengths;
+  }
+
+  public void setLengths(int[] lengths) {
+    this.lengths = lengths;
+  }
+
+  public int[] getOffsets() {
+    return offsets;
+  }
+
+  public void setOffsets(int[] offsets) {
+    this.offsets = offsets;
+  }
+
+  public int getPagesCount() {
+    return pagesCount;
+  }
+
+  public void setPagesCount(int pagesCount) {
+    this.pagesCount = pagesCount;
+  }
+
+  public int[] getRowCount() {
+    return rowCount;
+  }
+
+  public void setRowCount(int[] rowCount) {
+    this.rowCount = rowCount;
+  }
+
+  public abstract void freeMemory();
+
+  public int getBlockletId() {
+    return blockletId;
+  }
+
+  public int getOffSet() {
+    return offSet;
+  }
+
+  public int getLength() {
+    return length;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
new file mode 100644
index 0000000..048a703
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
+
+/**
+ * Contains raw dimension column chunk data:
+ * 1. The raw data of the column chunk, with all its pages, is stored in this instance.
+ * 2. The raw data of a page can be converted to a processed chunk using the
+ *  convertToDimColDataChunk method by specifying the page number.
+ */
+public class DimensionRawColumnChunk extends AbstractRawColumnChunk {
+
+  private DimensionColumnDataChunk[] dataChunks;
+
+  private DimensionColumnChunkReader chunkReader;
+
+  private FileHolder fileHolder;
+
+  public DimensionRawColumnChunk(int blockletId, ByteBuffer rawData, int offSet, int length,
+      DimensionColumnChunkReader columnChunkReader) {
+    super(blockletId, rawData, offSet, length);
+    this.chunkReader = columnChunkReader;
+  }
+
+  /**
+   * Convert the raw data of all pages to processed DimensionColumnDataChunk instances
+   * @return processed chunks for all pages
+   */
+  public DimensionColumnDataChunk[] convertToDimColDataChunks() {
+    if (dataChunks == null) {
+      dataChunks = new DimensionColumnDataChunk[pagesCount];
+    }
+    for (int i = 0; i < pagesCount; i++) {
+      try {
+        if (dataChunks[i] == null) {
+          dataChunks[i] = chunkReader.convertToDimensionChunk(this, i);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return dataChunks;
+  }
+
+  /**
+   * Convert the raw data of the specified page to a processed DimensionColumnDataChunk
+   * @param index page number to convert
+   * @return processed chunk for the given page
+   */
+  public DimensionColumnDataChunk convertToDimColDataChunk(int index) {
+    assert index < pagesCount;
+    if (dataChunks == null) {
+      dataChunks = new DimensionColumnDataChunk[pagesCount];
+    }
+    if (dataChunks[index] == null) {
+      try {
+        dataChunks[index] = chunkReader.convertToDimensionChunk(this, index);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    return dataChunks[index];
+  }
+
+  @Override public void freeMemory() {
+    if (null != dataChunks) {
+      for (int i = 0; i < dataChunks.length; i++) {
+        if (dataChunks[i] != null) {
+          dataChunks[i].freeMemory();
+        }
+      }
+    }
+  }
+
+  public void setFileHolder(FileHolder fileHolder) {
+    this.fileHolder = fileHolder;
+  }
+
+  public FileHolder getFileReader() {
+    return fileHolder;
+  }
+}

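A sketch of the intended call pattern for the class above, assuming a
DimensionColumnChunkReader and FileHolder are already in scope (see the reader
interface changes below):

    // read the raw bytes once, decode pages lazily, then release decoded memory
    DimensionRawColumnChunk raw = chunkReader.readRawDimensionChunk(fileHolder, blockletIndex);
    for (int page = 0; page < raw.getPagesCount(); page++) {
      DimensionColumnDataChunk decoded = raw.convertToDimColDataChunk(page);
      // ... scan or filter the decoded page ...
    }
    raw.freeMemory();
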
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
new file mode 100644
index 0000000..4702abd
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
+
+/**
+ * Contains raw measure column chunk data:
+ * 1. The raw data of the column chunk, with all its pages, is stored in this instance.
+ * 2. The raw data of a page can be converted to a processed chunk using the
+ *  convertToMeasureColDataChunk method by specifying the page number.
+ */
+public class MeasureRawColumnChunk extends AbstractRawColumnChunk {
+
+  private MeasureColumnDataChunk[] dataChunks;
+
+  private MeasureColumnChunkReader chunkReader;
+
+  private FileHolder fileReader;
+
+  public MeasureRawColumnChunk(int blockId, ByteBuffer rawData, int offSet, int length,
+      MeasureColumnChunkReader chunkReader) {
+    super(blockId, rawData, offSet, length);
+    this.chunkReader = chunkReader;
+  }
+
+  /**
+   * Convert the raw data of all pages to processed MeasureColumnDataChunk instances
+   * @return processed chunks for all pages
+   */
+  public MeasureColumnDataChunk[] convertToMeasureColDataChunks() {
+    if (dataChunks == null) {
+      dataChunks = new MeasureColumnDataChunk[pagesCount];
+    }
+    for (int i = 0; i < pagesCount; i++) {
+      try {
+        if (dataChunks[i] == null) {
+          dataChunks[i] = chunkReader.convertToMeasureChunk(this, i);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    return dataChunks;
+  }
+
+  /**
+   * Convert the raw data of the specified page to a processed MeasureColumnDataChunk
+   * @param index page number to convert
+   * @return processed chunk for the given page
+   */
+  public MeasureColumnDataChunk convertToMeasureColDataChunk(int index) {
+    assert index < pagesCount;
+    if (dataChunks == null) {
+      dataChunks = new MeasureColumnDataChunk[pagesCount];
+    }
+
+    try {
+      if (dataChunks[index] == null) {
+        dataChunks[index] = chunkReader.convertToMeasureChunk(this, index);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return dataChunks[index];
+  }
+
+  @Override public void freeMemory() {
+    if (null != dataChunks) {
+      for (int i = 0; i < dataChunks.length; i++) {
+        if (dataChunks[i] != null) {
+          dataChunks[i].freeMemory();
+        }
+      }
+    }
+  }
+
+  public void setFileReader(FileHolder fileReader) {
+    this.fileReader = fileReader;
+  }
+
+  public FileHolder getFileReader() {
+    return fileReader;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
index a542b25..7110bfa 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 
 /**
  * Interface for reading the data chunk
@@ -32,19 +33,30 @@ public interface DimensionColumnChunkReader {
    * Below method will be used to read the chunk based on block indexes
    *
    * @param fileReader   file reader to read the blocks from file
-   * @param blockIndexes blocks to be read
+   * @param blockletIndexes blocklets to be read
    * @return dimension column chunks
    */
-  DimensionColumnDataChunk[] readDimensionChunks(FileHolder fileReader, int[][] blockIndexes)
+  DimensionRawColumnChunk[] readRawDimensionChunks(FileHolder fileReader, int[][] blockletIndexes)
       throws IOException;
 
   /**
    * Below method will be used to read the chunk based on block index
    *
    * @param fileReader file reader to read the blocks from file
-   * @param blockIndex block to be read
+   * @param blockletIndex blocklet to be read
    * @return dimension column chunk
    */
-  DimensionColumnDataChunk readDimensionChunk(FileHolder fileReader, int blockIndex)
+  DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader, int blockletIndex)
       throws IOException;
+
+  /**
+   * Converts the raw data chunk of the given page to a processed chunk
+   *
+   * @param dimensionRawColumnChunk raw data chunk
+   * @param pageNumber page number to be processed
+   * @return processed dimension column chunk
+   * @throws IOException
+   */
+  DimensionColumnDataChunk convertToDimensionChunk(DimensionRawColumnChunk dimensionRawColumnChunk,
+      int pageNumber) throws IOException;
 }

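Splitting the interface into readRaw* and convertTo* lets a scanner defer or skip
decompression per blocklet. A hedged sketch, where mightMatch stands in for a
hypothetical min/max filter check and the min/max values are assumed to have been
populated on the raw chunks:

    DimensionRawColumnChunk[] rawChunks =
        reader.readRawDimensionChunks(fileReader, blockletIndexes);
    for (DimensionRawColumnChunk raw : rawChunks) {
      if (raw == null || !mightMatch(raw.getMinValues(), raw.getMaxValues())) {
        continue; // pruned: raw bytes were read but never decompressed
      }
      DimensionColumnDataChunk decoded = reader.convertToDimensionChunk(raw, 0);
      // ... evaluate the filter on the decoded chunk ...
    }
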
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
index a3dbcc0..ef7875b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 
 /**
  * Reader interface for reading the measure blocks from file
@@ -33,7 +34,7 @@ public interface MeasureColumnChunkReader {
    * @param blockIndexes blocks to be read
    * @return measure data chunks
    */
-  MeasureColumnDataChunk[] readMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
+  MeasureRawColumnChunk[] readRawMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
       throws IOException;
 
   /**
@@ -43,6 +44,18 @@ public interface MeasureColumnChunkReader {
    * @param blockIndex block to be read
    * @return measure data chunk
    */
-  MeasureColumnDataChunk readMeasureChunk(FileHolder fileReader, int blockIndex) throws IOException;
+  MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
+      throws IOException;
+
+  /**
+   * Convert the raw data of the given page to a processed measure chunk
+   *
+   * @param measureRawColumnChunk raw data chunk to convert
+   * @param pageNumber page number to be processed
+   * @return processed measure column chunk
+   * @throws IOException
+   */
+  MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measureRawColumnChunk,
+      int pageNumber) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
index 1130e5c..00e6351 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
@@ -17,11 +17,13 @@
 package org.apache.carbondata.core.datastore.chunk.reader.dimension.v1;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.ColumnGroupDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReader;
@@ -56,70 +58,99 @@ public class CompressedDimensionChunkFileBasedReaderV1 extends AbstractChunkRead
   }
 
   /**
-   * Below method will be used to read the chunk based on block indexes
+   * Below method will be used to read the raw chunk based on blocklet indexes
    *
    * @param fileReader   file reader to read the blocks from file
-   * @param blockIndexes blocks to be read
+   * @param blockletIndexes blocklet index ranges to be read
    * @return dimension column chunks
    */
-  @Override public DimensionColumnDataChunk[] readDimensionChunks(FileHolder fileReader,
-      int[][] blockIndexes) throws IOException {
-    // read the column chunk based on block index and add
-    DimensionColumnDataChunk[] dataChunks =
-        new DimensionColumnDataChunk[dimensionColumnChunk.size()];
-    for (int i = 0; i < blockIndexes.length; i++) {
-      for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
-        dataChunks[j] = readDimensionChunk(fileReader, j);
+  @Override public DimensionRawColumnChunk[] readRawDimensionChunks(FileHolder fileReader,
+      int[][] blockletIndexes) throws IOException {
+    DimensionRawColumnChunk[] dataChunks = new DimensionRawColumnChunk[dimensionColumnChunk.size()];
+    for (int i = 0; i < blockletIndexes.length; i++) {
+      for (int j = blockletIndexes[i][0]; j <= blockletIndexes[i][1]; j++) {
+        dataChunks[j] = readRawDimensionChunk(fileReader, j);
       }
     }
     return dataChunks;
   }
 
   /**
-   * Below method will be used to read the chunk based on block index
+   * Below method will be used to read the raw chunk based on blocklet index
    *
    * @param fileReader file reader to read the blocks from file
-   * @param blockIndex block to be read
+   * @param blockletIndex blocklet to be read
    * @return dimension column chunk
    */
-  @Override public DimensionColumnDataChunk readDimensionChunk(FileHolder fileReader,
-      int blockIndex) throws IOException {
+  @Override public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader,
+      int blockletIndex) throws IOException {
+    DataChunk dataChunk = dimensionColumnChunk.get(blockletIndex);
+    ByteBuffer buffer =
+        ByteBuffer.allocateDirect(dataChunk.getDataPageLength());
+    synchronized (fileReader) {
+      fileReader.readByteBuffer(filePath, buffer,
+          dataChunk.getDataPageOffset(),
+          dataChunk.getDataPageLength());
+    }
+    DimensionRawColumnChunk rawColumnChunk = new DimensionRawColumnChunk(blockletIndex, buffer, 0,
+        dataChunk.getDataPageLength(), this);
+    rawColumnChunk.setFileHolder(fileReader);
+    rawColumnChunk.setPagesCount(1);
+    rawColumnChunk.setRowCount(new int[] { numberOfRows });
+    return rawColumnChunk;
+  }
+
+  @Override public DimensionColumnDataChunk convertToDimensionChunk(
+      DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) throws IOException {
+    int blockIndex = dimensionRawColumnChunk.getBlockletId();
     byte[] dataPage = null;
     int[] invertedIndexes = null;
     int[] invertedIndexesReverse = null;
     int[] rlePage = null;
+    FileHolder fileReader = dimensionRawColumnChunk.getFileReader();
+
+    ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
+    rawData.position(dimensionRawColumnChunk.getOffSet());
+    byte[] data = new byte[dimensionRawColumnChunk.getLength()];
+    rawData.get(data);
+    dataPage = COMPRESSOR.unCompressByte(data);
 
-    // first read the data and uncompressed it
-    dataPage = COMPRESSOR.unCompressByte(fileReader
-        .readByteArray(filePath, dimensionColumnChunk.get(blockIndex).getDataPageOffset(),
-            dimensionColumnChunk.get(blockIndex).getDataPageLength()));
     // if row id block is present then read the row id chunk and uncompress it
-    if (CarbonUtil.hasEncoding(dimensionColumnChunk.get(blockIndex).getEncodingList(),
+    DataChunk dataChunk = dimensionColumnChunk.get(blockIndex);
+    if (CarbonUtil.hasEncoding(dataChunk.getEncodingList(),
         Encoding.INVERTED_INDEX)) {
+      byte[] columnIndexData;
+      synchronized (fileReader) {
+        columnIndexData = fileReader
+            .readByteArray(filePath, dataChunk.getRowIdPageOffset(),
+                dataChunk.getRowIdPageLength());
+      }
       invertedIndexes = CarbonUtil
-          .getUnCompressColumnIndex(dimensionColumnChunk.get(blockIndex).getRowIdPageLength(),
-              fileReader.readByteArray(filePath,
-                  dimensionColumnChunk.get(blockIndex).getRowIdPageOffset(),
-                  dimensionColumnChunk.get(blockIndex).getRowIdPageLength()), numberComressor, 0);
+          .getUnCompressColumnIndex(dataChunk.getRowIdPageLength(),
+              columnIndexData, numberComressor, 0);
       // get the reverse index
       invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
     }
     // if rle is applied then read the rle block chunk and then uncompress
     //then actual data based on rle block
     if (CarbonUtil
-        .hasEncoding(dimensionColumnChunk.get(blockIndex).getEncodingList(), Encoding.RLE)) {
+        .hasEncoding(dataChunk.getEncodingList(), Encoding.RLE)) {
       // read and uncompress the rle block
-      rlePage = numberComressor.unCompress(fileReader
-              .readByteArray(filePath, dimensionColumnChunk.get(blockIndex).getRlePageOffset(),
-                  dimensionColumnChunk.get(blockIndex).getRlePageLength()), 0,
-          dimensionColumnChunk.get(blockIndex).getRlePageLength());
+      byte[] key;
+      synchronized (fileReader) {
+        key = fileReader
+            .readByteArray(filePath, dataChunk.getRlePageOffset(),
+                dataChunk.getRlePageLength());
+      }
+      rlePage = numberComressor
+          .unCompress(key, 0, dataChunk.getRlePageLength());
       // uncompress the data with rle indexes
       dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, eachColumnValueSize[blockIndex]);
       rlePage = null;
     }
     // fill chunk attributes
     DimensionColumnDataChunk columnDataChunk = null;
-    if (dimensionColumnChunk.get(blockIndex).isRowMajor()) {
+    if (dataChunk.isRowMajor()) {
       // to store fixed length column chunk values
       columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage, eachColumnValueSize[blockIndex],
           numberOfRows);
@@ -127,7 +158,7 @@ public class CompressedDimensionChunkFileBasedReaderV1 extends AbstractChunkRead
     // if no dictionary column then first create a no dictionary column chunk
     // and set to data chunk instance
     else if (!CarbonUtil
-        .hasEncoding(dimensionColumnChunk.get(blockIndex).getEncodingList(), Encoding.DICTIONARY)) {
+        .hasEncoding(dataChunk.getEncodingList(), Encoding.DICTIONARY)) {
       columnDataChunk =
           new VariableLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
               numberOfRows);
@@ -139,5 +170,4 @@ public class CompressedDimensionChunkFileBasedReaderV1 extends AbstractChunkRead
     }
     return columnDataChunk;
   }
-
 }

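As the nested loop above shows, each inner array of blockletIndexes is an inclusive
[start, end] range. For example:

    // {{0, 2}, {4, 4}} reads chunks 0, 1, 2 and 4; dataChunks[3] stays null
    int[][] blockletIndexes = new int[][] { { 0, 2 }, { 4, 4 } };
    DimensionRawColumnChunk[] chunks =
        reader.readRawDimensionChunks(fileReader, blockletIndexes);
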
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
index 3257ed4..9d5849f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
@@ -17,11 +17,13 @@
 package org.apache.carbondata.core.datastore.chunk.reader.dimension.v2;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.ColumnGroupDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReader;
@@ -44,7 +46,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
   /**
    * dimension chunks length
    */
-  private List<Short> dimensionChunksLength;
+  private List<Integer> dimensionChunksLength;
 
   /**
    * Constructor to get minimum parameter to create instance of this class
@@ -70,41 +72,43 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
    * For last column read is separately and process
    *
    * @param fileReader   file reader to read the blocks from file
-   * @param blockIndexes blocks range to be read
+   * @param blockletIndexes blocklet index ranges to be read
    * @return dimension column chunks
    */
-  @Override public DimensionColumnDataChunk[] readDimensionChunks(final FileHolder fileReader,
-      final int[][] blockIndexes) throws IOException {
+  @Override public DimensionRawColumnChunk[] readRawDimensionChunks(final FileHolder fileReader,
+      final int[][] blockletIndexes) throws IOException {
     // read the column chunk based on block index and add
-    DimensionColumnDataChunk[] dataChunks =
-        new DimensionColumnDataChunk[dimensionChunksOffset.size()];
+    DimensionRawColumnChunk[] dataChunks =
+        new DimensionRawColumnChunk[dimensionChunksOffset.size()];
     // if blocklet index is empty then return empty data chunk
-    if (blockIndexes.length == 0) {
+    if (blockletIndexes.length == 0) {
       return dataChunks;
     }
-    DimensionColumnDataChunk[] groupChunk = null;
+    DimensionRawColumnChunk[] groupChunk = null;
     int index = 0;
     // iterate till block indexes -1 as block index will be in sorted order, so to avoid
     // the last column reading in group
-    for (int i = 0; i < blockIndexes.length - 1; i++) {
+    for (int i = 0; i < blockletIndexes.length - 1; i++) {
       index = 0;
-      groupChunk = readDimensionChunksInGroup(fileReader, blockIndexes[i][0], blockIndexes[i][1]);
-      for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
+      groupChunk =
+          readRawDimensionChunksInGroup(fileReader, blockletIndexes[i][0], blockletIndexes[i][1]);
+      for (int j = blockletIndexes[i][0]; j <= blockletIndexes[i][1]; j++) {
         dataChunks[j] = groupChunk[index++];
       }
     }
     // check last index is present in block index, if it is present then read separately
-    if (blockIndexes[blockIndexes.length - 1][0] == dimensionChunksOffset.size() - 1) {
-      dataChunks[blockIndexes[blockIndexes.length - 1][0]] =
-          readDimensionChunk(fileReader, blockIndexes[blockIndexes.length - 1][0]);
+    if (blockletIndexes[blockletIndexes.length - 1][0] == dimensionChunksOffset.size() - 1) {
+      dataChunks[blockletIndexes[blockletIndexes.length - 1][0]] =
+          readRawDimensionChunk(fileReader, blockletIndexes[blockletIndexes.length - 1][0]);
     }
     // otherwise read the data in group
     else {
-      groupChunk = readDimensionChunksInGroup(fileReader, blockIndexes[blockIndexes.length - 1][0],
-          blockIndexes[blockIndexes.length - 1][1]);
+      groupChunk =
+          readRawDimensionChunksInGroup(fileReader, blockletIndexes[blockletIndexes.length - 1][0],
+              blockletIndexes[blockletIndexes.length - 1][1]);
       index = 0;
-      for (int j = blockIndexes[blockIndexes.length - 1][0];
-           j <= blockIndexes[blockIndexes.length - 1][1]; j++) {
+      for (int j = blockletIndexes[blockletIndexes.length - 1][0];
+           j <= blockletIndexes[blockletIndexes.length - 1][1]; j++) {
         dataChunks[j] = groupChunk[index++];
       }
     }
@@ -115,48 +119,100 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
    * Below method will be used to read the chunk based on block index
    *
    * @param fileReader file reader to read the blocks from file
-   * @param blockIndex block to be read
+   * @param blockletIndex blocklet to be read
    * @return dimension column chunk
    */
-  @Override public DimensionColumnDataChunk readDimensionChunk(FileHolder fileReader,
-      int blockIndex) throws IOException {
+  public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader,
+      int blockletIndex) throws IOException {
+    int length = 0;
+    if (dimensionChunksOffset.size() - 1 == blockletIndex) {
+      // In case of the last chunk, read only the data chunk header here; the rest is read while converting.
+      length = dimensionChunksLength.get(blockletIndex);
+    } else {
+      long currentDimensionOffset = dimensionChunksOffset.get(blockletIndex);
+      length = (int) (dimensionChunksOffset.get(blockletIndex + 1) - currentDimensionOffset);
+    }
+    ByteBuffer buffer = ByteBuffer.allocateDirect(length);
+    synchronized (fileReader) {
+      fileReader.readByteBuffer(filePath, buffer, dimensionChunksOffset.get(blockletIndex), length);
+    }
+    DimensionRawColumnChunk rawColumnChunk =
+        new DimensionRawColumnChunk(blockletIndex, buffer, 0, length, this);
+    rawColumnChunk.setFileHolder(fileReader);
+    rawColumnChunk.setPagesCount(1);
+    rawColumnChunk.setRowCount(new int[]{numberOfRows});
+    return rawColumnChunk;
+  }
+
+  private DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
+      int startBlockIndex, int endBlockIndex) throws IOException {
+    long currentDimensionOffset = dimensionChunksOffset.get(startBlockIndex);
+    ByteBuffer buffer = ByteBuffer.allocateDirect(
+        (int) (dimensionChunksOffset.get(endBlockIndex + 1) - currentDimensionOffset));
+    synchronized (fileReader) {
+      fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset,
+          (int) (dimensionChunksOffset.get(endBlockIndex + 1) - currentDimensionOffset));
+    }
+    DimensionRawColumnChunk[] dataChunks =
+        new DimensionRawColumnChunk[endBlockIndex - startBlockIndex + 1];
+    int index = 0;
+    int runningLength = 0;
+    for (int i = startBlockIndex; i <= endBlockIndex; i++) {
+      int currentLength = (int) (dimensionChunksOffset.get(i + 1) - dimensionChunksOffset.get(i));
+      dataChunks[index] =
+          new DimensionRawColumnChunk(i, buffer, runningLength, currentLength, this);
+      dataChunks[index].setFileHolder(fileReader);
+      dataChunks[index].setPagesCount(1);
+      dataChunks[index].setRowCount(new int[] { numberOfRows });
+      runningLength += currentLength;
+      index++;
+    }
+    return dataChunks;
+  }
+
+  public DimensionColumnDataChunk convertToDimensionChunk(
+      DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) throws IOException {
     byte[] dataPage = null;
     int[] invertedIndexes = null;
     int[] invertedIndexesReverse = null;
     int[] rlePage = null;
     DataChunk2 dimensionColumnChunk = null;
-    byte[] data = null;
-    int copySourcePoint = 0;
-    byte[] dimensionChunk = null;
+    int copySourcePoint = dimensionRawColumnChunk.getOffSet();
+    int blockIndex = dimensionRawColumnChunk.getBlockletId();
+    ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
     if (dimensionChunksOffset.size() - 1 == blockIndex) {
-      dimensionChunk = fileReader.readByteArray(filePath, dimensionChunksOffset.get(blockIndex),
-          dimensionChunksLength.get(blockIndex));
       dimensionColumnChunk = CarbonUtil
-          .readDataChunk(dimensionChunk, copySourcePoint, dimensionChunksLength.get(blockIndex));
+          .readDataChunk(rawData, copySourcePoint, dimensionRawColumnChunk.getLength());
       int totalDimensionDataLength =
           dimensionColumnChunk.data_page_length + dimensionColumnChunk.rle_page_length
               + dimensionColumnChunk.rowid_page_length;
-      data = fileReader.readByteArray(filePath,
-          dimensionChunksOffset.get(blockIndex) + dimensionChunksLength.get(blockIndex),
-          totalDimensionDataLength);
+      synchronized (dimensionRawColumnChunk.getFileReader()) {
+        rawData = ByteBuffer.allocateDirect(totalDimensionDataLength);
+        dimensionRawColumnChunk.getFileReader().readByteBuffer(filePath, rawData,
+            dimensionChunksOffset.get(blockIndex) + dimensionChunksLength.get(blockIndex),
+            totalDimensionDataLength);
+      }
     } else {
-      long currentDimensionOffset = dimensionChunksOffset.get(blockIndex);
-      data = fileReader.readByteArray(filePath, currentDimensionOffset,
-          (int) (dimensionChunksOffset.get(blockIndex + 1) - currentDimensionOffset));
       dimensionColumnChunk =
-          CarbonUtil.readDataChunk(data, copySourcePoint, dimensionChunksLength.get(blockIndex));
+          CarbonUtil.readDataChunk(rawData, copySourcePoint, dimensionChunksLength.get(blockIndex));
       copySourcePoint += dimensionChunksLength.get(blockIndex);
     }
 
+    byte[] data = new byte[dimensionColumnChunk.data_page_length];
+    rawData.position(copySourcePoint);
+    rawData.get(data);
     // first read the data and uncompress it
     dataPage =
-        COMPRESSOR.unCompressByte(data, copySourcePoint, dimensionColumnChunk.data_page_length);
+        COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
     copySourcePoint += dimensionColumnChunk.data_page_length;
     // if row id block is present then read the row id chunk and uncompress it
     if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
+      byte[] dataInv = new byte[dimensionColumnChunk.rowid_page_length];
+      rawData.position(copySourcePoint);
+      rawData.get(dataInv);
       invertedIndexes = CarbonUtil
-          .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, data, numberComressor,
-              copySourcePoint);
+          .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, dataInv,
+              numberComressor, 0);
       copySourcePoint += dimensionColumnChunk.rowid_page_length;
       // get the reverse index
       invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
@@ -164,11 +220,13 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
     // if rle is applied then read the rle block chunk and then uncompress
     //then actual data based on rle block
     if (hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
+      byte[] dataRle = new byte[dimensionColumnChunk.rle_page_length];
+      rawData.position(copySourcePoint);
+      rawData.get(dataRle);
       rlePage =
-          numberComressor.unCompress(data, copySourcePoint, dimensionColumnChunk.rle_page_length);
+          numberComressor.unCompress(dataRle, 0, dimensionColumnChunk.rle_page_length);
       // uncompress the data with rle indexes
       dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, eachColumnValueSize[blockIndex]);
-      rlePage = null;
     }
     // fill chunk attributes
     DimensionColumnDataChunk columnDataChunk = null;
@@ -194,85 +252,6 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
   }
 
   /**
-   * Below method will be used to read the dimension chunks in group.
-   * This is to enhance the IO performance. Will read the data from start index
-   * to end index(including)
-   *
-   * @param fileReader      stream used for reading
-   * @param startBlockIndex start block index
-   * @param endBlockIndex   end block index
-   * @return dimension column chunk array
-   */
-  private DimensionColumnDataChunk[] readDimensionChunksInGroup(FileHolder fileReader,
-      int startBlockIndex, int endBlockIndex) throws IOException {
-    long currentDimensionOffset = dimensionChunksOffset.get(startBlockIndex);
-    byte[] data = fileReader.readByteArray(filePath, currentDimensionOffset,
-        (int) (dimensionChunksOffset.get(endBlockIndex + 1) - currentDimensionOffset));
-    int copySourcePoint = 0;
-    // read the column chunk based on block index and add
-    DimensionColumnDataChunk[] dataChunks =
-        new DimensionColumnDataChunk[endBlockIndex - startBlockIndex + 1];
-    byte[] dataPage = null;
-    int[] invertedIndexes = null;
-    int[] invertedIndexesReverse = null;
-    int[] rlePage = null;
-    DataChunk2 dimensionColumnChunk = null;
-    int index = 0;
-    for (int i = startBlockIndex; i <= endBlockIndex; i++) {
-      invertedIndexes = null;
-      invertedIndexesReverse = null;
-      dimensionColumnChunk =
-          CarbonUtil.readDataChunk(data, copySourcePoint, dimensionChunksLength.get(i));
-      copySourcePoint += dimensionChunksLength.get(i);
-      // first read the data and uncompressed it
-      dataPage =
-          COMPRESSOR.unCompressByte(data, copySourcePoint, dimensionColumnChunk.data_page_length);
-      copySourcePoint += dimensionColumnChunk.data_page_length;
-      // if row id block is present then read the row id chunk and uncompress it
-      if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
-        invertedIndexes = CarbonUtil
-            .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, data, numberComressor,
-                copySourcePoint);
-        copySourcePoint += dimensionColumnChunk.rowid_page_length;
-        // get the reverse index
-        invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
-      }
-      // if rle is applied then read the rle block chunk and then uncompress
-      //then actual data based on rle block
-      if (hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
-        // read and uncompress the rle block
-        rlePage =
-            numberComressor.unCompress(data, copySourcePoint, dimensionColumnChunk.rle_page_length);
-        copySourcePoint += dimensionColumnChunk.rle_page_length;
-        // uncompress the data with rle indexes
-        dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, eachColumnValueSize[i]);
-        rlePage = null;
-      }
-      // fill chunk attributes
-      DimensionColumnDataChunk columnDataChunk = null;
-      if (dimensionColumnChunk.isRowMajor()) {
-        // to store fixed length column chunk values
-        columnDataChunk =
-            new ColumnGroupDimensionDataChunk(dataPage, eachColumnValueSize[i], numberOfRows);
-      }
-      // if no dictionary column then first create a no dictionary column chunk
-      // and set to data chunk instance
-      else if (!hasEncoding(dimensionColumnChunk.encoders, Encoding.DICTIONARY)) {
-        columnDataChunk =
-            new VariableLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
-                numberOfRows);
-      } else {
-        // to store fixed length column chunk values
-        columnDataChunk =
-            new FixedLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
-                numberOfRows, eachColumnValueSize[i]);
-      }
-      dataChunks[index++] = columnDataChunk;
-    }
-    return dataChunks;
-  }
-
-  /**
    * Below method will be used to check whether particular encoding is present
    * in the dimension or not
    *

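The grouped read in readRawDimensionChunksInGroup trades many small IOs for one
large one: a single contiguous readByteBuffer backs every chunk in the range, and
each DimensionRawColumnChunk is just a (buffer, offset, length) view into it. The
pattern in condensed form (names shortened from the method above):

    ByteBuffer shared = ByteBuffer.allocateDirect(totalLength);
    fileReader.readByteBuffer(filePath, shared, startOffset, totalLength);
    int running = 0;
    for (int i = start; i <= end; i++) {
      int len = (int) (offsets.get(i + 1) - offsets.get(i));
      // no copy: each chunk only records where its bytes live in the shared buffer
      chunks[i - start] = new DimensionRawColumnChunk(i, shared, running, len, this);
      running += len;
    }
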
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
index 750da37..107c430 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -17,10 +17,12 @@
 package org.apache.carbondata.core.datastore.chunk.reader.measure.v1;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
 import org.apache.carbondata.core.datastore.compression.ReaderCompressModel;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
@@ -59,12 +61,12 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
    * @param blockIndexes blocks to be read
    * @return measure data chunks
    */
-  @Override public MeasureColumnDataChunk[] readMeasureChunks(final FileHolder fileReader,
-      final int[][] blockIndexes) throws IOException {
-    MeasureColumnDataChunk[] datChunk = new MeasureColumnDataChunk[measureColumnChunks.size()];
+  @Override public MeasureRawColumnChunk[] readRawMeasureChunks(FileHolder fileReader,
+      int[][] blockIndexes) throws IOException {
+    MeasureRawColumnChunk[] datChunk = new MeasureRawColumnChunk[measureColumnChunks.size()];
     for (int i = 0; i < blockIndexes.length; i++) {
       for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
-        datChunk[j] = readMeasureChunk(fileReader, j);
+        datChunk[j] = readRawMeasureChunk(fileReader, j);
       }
     }
     return datChunk;
@@ -77,20 +79,40 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
    * @param blockIndex block to be read
    * @return measure data chunk
    */
-  @Override public MeasureColumnDataChunk readMeasureChunk(final FileHolder fileReader,
-      final int blockIndex) throws IOException {
-    ValueEncoderMeta meta = measureColumnChunks.get(blockIndex).getValueEncoderMeta().get(0);
+  @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
+      throws IOException {
+    DataChunk dataChunk = measureColumnChunks.get(blockIndex);
+    ByteBuffer buffer =
+        ByteBuffer.allocateDirect(dataChunk.getDataPageLength());
+    fileReader
+        .readByteBuffer(filePath, buffer, dataChunk.getDataPageOffset(),
+            dataChunk.getDataPageLength());
+    MeasureRawColumnChunk rawColumnChunk = new MeasureRawColumnChunk(blockIndex, buffer, 0,
+        dataChunk.getDataPageLength(), this);
+    rawColumnChunk.setFileReader(fileReader);
+    rawColumnChunk.setPagesCount(1);
+    rawColumnChunk.setRowCount(new int[] { numberOfRows });
+    return rawColumnChunk;
+  }
+
+  @Override
+  public MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measureRawColumnChunk,
+      int pageNumber) throws IOException {
+    int blockIndex = measureRawColumnChunk.getBlockletId();
+    DataChunk dataChunk = measureColumnChunks.get(blockIndex);
+    ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0);
     ReaderCompressModel compressModel = ValueCompressionUtil.getReaderCompressModel(meta);
 
     ValueCompressionHolder values = compressModel.getValueCompressionHolder();
-    byte[] dataPage = fileReader
-            .readByteArray(filePath, measureColumnChunks.get(blockIndex).getDataPageOffset(),
-                    measureColumnChunks.get(blockIndex).getDataPageLength());
+    byte[] dataPage = new byte[measureRawColumnChunk.getLength()];
+    ByteBuffer rawData = measureRawColumnChunk.getRawData();
+    rawData.position(measureRawColumnChunk.getOffSet());
+    rawData.get(dataPage);
 
     // unCompress data
     values.uncompress(compressModel.getConvertedDataType(), dataPage, 0,
-            measureColumnChunks.get(blockIndex).getDataPageLength(), compressModel.getMantissa(),
-            compressModel.getMaxValue(), numberOfRows);
+        dataChunk.getDataPageLength(), compressModel.getMantissa(),
+        compressModel.getMaxValue(), numberOfRows);
 
     CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
 
@@ -99,8 +121,7 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
     datChunk.setMeasureDataHolder(measureDataHolder);
     // set the enun value indexes
     datChunk
-        .setNullValueIndexHolder(measureColumnChunks.get(blockIndex).getNullValueIndexForColumn());
+        .setNullValueIndexHolder(dataChunk.getNullValueIndexForColumn());
     return datChunk;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index d92da61..7ac1578 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -17,12 +17,14 @@
 package org.apache.carbondata.core.datastore.chunk.reader.measure.v2;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
@@ -47,7 +49,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
   /**
    * measure column chunks length
    */
-  private List<Short> measureColumnChunkLength;
+  private List<Integer> measureColumnChunkLength;
 
   /**
    * Constructor to get minimum parameter to create instance of this class
@@ -90,28 +92,28 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
    * @return measure column chunks
    * @throws IOException
    */
-  public MeasureColumnDataChunk[] readMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
+  public MeasureRawColumnChunk[] readRawMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
       throws IOException {
     // read the column chunk based on block index and add
-    MeasureColumnDataChunk[] dataChunks =
-        new MeasureColumnDataChunk[measureColumnChunkOffsets.size()];
+    MeasureRawColumnChunk[] dataChunks =
+        new MeasureRawColumnChunk[measureColumnChunkOffsets.size()];
     if (blockIndexes.length == 0) {
       return dataChunks;
     }
-    MeasureColumnDataChunk[] groupChunk = null;
+    MeasureRawColumnChunk[] groupChunk = null;
     int index = 0;
     for (int i = 0; i < blockIndexes.length - 1; i++) {
       index = 0;
-      groupChunk = readMeasureChunksInGroup(fileReader, blockIndexes[i][0], blockIndexes[i][1]);
+      groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[i][0], blockIndexes[i][1]);
       for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
         dataChunks[j] = groupChunk[index++];
       }
     }
     if (blockIndexes[blockIndexes.length - 1][0] == measureColumnChunkOffsets.size() - 1) {
       dataChunks[blockIndexes[blockIndexes.length - 1][0]] =
-          readMeasureChunk(fileReader, blockIndexes[blockIndexes.length - 1][0]);
+          readRawMeasureChunk(fileReader, blockIndexes[blockIndexes.length - 1][0]);
     } else {
-      groupChunk = readMeasureChunksInGroup(fileReader, blockIndexes[blockIndexes.length - 1][0],
+      groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[blockIndexes.length - 1][0],
           blockIndexes[blockIndexes.length - 1][1]);
       index = 0;
       for (int j = blockIndexes[blockIndexes.length - 1][0];
@@ -122,36 +124,75 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     return dataChunks;
   }
 
-  /**
-   * Method to read the blocks data based on block index
-   *
-   * @param fileReader file reader to read the blocks
-   * @param blockIndex block to be read
-   * @return measure data chunk
-   * @throws IOException
-   */
-  @Override public MeasureColumnDataChunk readMeasureChunk(FileHolder fileReader, int blockIndex)
+  @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
       throws IOException {
+    int dataLength = 0;
+    if (measureColumnChunkOffsets.size() - 1 == blockIndex) {
+      dataLength = measureColumnChunkLength.get(blockIndex);
+    } else {
+      long currentMeasureOffset = measureColumnChunkOffsets.get(blockIndex);
+      dataLength = (int) (measureColumnChunkOffsets.get(blockIndex + 1) - currentMeasureOffset);
+    }
+    ByteBuffer buffer = ByteBuffer.allocateDirect(dataLength);
+    synchronized (fileReader) {
+      fileReader
+          .readByteBuffer(filePath, buffer, measureColumnChunkOffsets.get(blockIndex), dataLength);
+    }
+    MeasureRawColumnChunk rawColumnChunk =
+        new MeasureRawColumnChunk(blockIndex, buffer, 0, dataLength, this);
+    rawColumnChunk.setFileReader(fileReader);
+    rawColumnChunk.setPagesCount(1);
+    rawColumnChunk.setRowCount(new int[] { numberOfRows });
+    return rawColumnChunk;
+  }
+
+  private MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
+      int startBlockIndex, int endBlockIndex) throws IOException {
+    long currentMeasureOffset = measureColumnChunkOffsets.get(startBlockIndex);
+    ByteBuffer buffer = ByteBuffer.allocateDirect(
+        (int) (measureColumnChunkOffsets.get(endBlockIndex + 1) - currentMeasureOffset));
+    synchronized (fileReader) {
+      fileReader.readByteBuffer(filePath, buffer, currentMeasureOffset,
+          (int) (measureColumnChunkOffsets.get(endBlockIndex + 1) - currentMeasureOffset));
+    }
+    MeasureRawColumnChunk[] dataChunks =
+        new MeasureRawColumnChunk[endBlockIndex - startBlockIndex + 1];
+    int runningLength = 0;
+    int index = 0;
+    for (int i = startBlockIndex; i <= endBlockIndex; i++) {
+      int currentLength =
+          (int) (measureColumnChunkOffsets.get(i + 1) - measureColumnChunkOffsets.get(i));
+      MeasureRawColumnChunk measureRawColumnChunk =
+          new MeasureRawColumnChunk(i, buffer, runningLength, currentLength, this);
+      measureRawColumnChunk.setFileReader(fileReader);
+      measureRawColumnChunk.setRowCount(new int[] { numberOfRows });
+      measureRawColumnChunk.setPagesCount(1);
+      dataChunks[index] = measureRawColumnChunk;
+      runningLength += currentLength;
+      index++;
+    }
+    return dataChunks;
+  }
+
+  public MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measureRawColumnChunk,
+      int pageNumber) throws IOException {
     MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
     DataChunk2 measureColumnChunk = null;
-    byte[] measureDataChunk = null;
-    byte[] data = null;
-    int copyPoint = 0;
+    int copyPoint = measureRawColumnChunk.getOffSet();
+    int blockIndex = measureRawColumnChunk.getBlockletId();
+    ByteBuffer rawData = measureRawColumnChunk.getRawData();
     if (measureColumnChunkOffsets.size() - 1 == blockIndex) {
-      measureDataChunk = fileReader
-          .readByteArray(filePath, measureColumnChunkOffsets.get(blockIndex),
-              measureColumnChunkLength.get(blockIndex));
-      measureColumnChunk = CarbonUtil
-          .readDataChunk(measureDataChunk, copyPoint, measureColumnChunkLength.get(blockIndex));
-      data = fileReader.readByteArray(filePath,
-          measureColumnChunkOffsets.get(blockIndex) + measureColumnChunkLength.get(blockIndex),
-          measureColumnChunk.data_page_length);
+      measureColumnChunk =
+          CarbonUtil.readDataChunk(rawData, copyPoint, measureColumnChunkLength.get(blockIndex));
+      synchronized (measureRawColumnChunk.getFileReader()) {
+        rawData = ByteBuffer.allocateDirect(measureColumnChunk.data_page_length);
+        measureRawColumnChunk.getFileReader().readByteBuffer(filePath, rawData,
+            measureColumnChunkOffsets.get(blockIndex) + measureColumnChunkLength.get(blockIndex),
+            measureColumnChunk.data_page_length);
+      }
     } else {
-      long currentMeasureOffset = measureColumnChunkOffsets.get(blockIndex);
-      data = fileReader.readByteArray(filePath, currentMeasureOffset,
-          (int) (measureColumnChunkOffsets.get(blockIndex + 1) - currentMeasureOffset));
       measureColumnChunk =
-          CarbonUtil.readDataChunk(data, copyPoint, measureColumnChunkLength.get(blockIndex));
+          CarbonUtil.readDataChunk(rawData, copyPoint, measureColumnChunkLength.get(blockIndex));
       copyPoint += measureColumnChunkLength.get(blockIndex);
     }
     List<ValueEncoderMeta> valueEncodeMeta = new ArrayList<>();
@@ -162,11 +203,13 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     WriterCompressModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);
 
     ValueCompressionHolder values = compressionModel.getValueCompressionHolder()[0];
-
+    byte[] data = new byte[measureColumnChunk.data_page_length];
+    rawData.position(copyPoint);
+    rawData.get(data);
     // uncompress
-    values.uncompress(compressionModel.getConvertedDataType()[0], data,
-        copyPoint, measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
-            compressionModel.getMaxValue()[0], numberOfRows);
+    values.uncompress(compressionModel.getConvertedDataType()[0], data, 0,
+        measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
+        compressionModel.getMaxValue()[0], numberOfRows);
 
     CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
 
@@ -177,58 +220,4 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     datChunk.setNullValueIndexHolder(getPresenceMeta(measureColumnChunk.presence));
     return datChunk;
   }
-
-  /**
-   * Below method will be used to read the dimension chunks in group. This is
-   * to enhance the IO performance. Will read the data from start index to end
-   * index(including)
-   *
-   * @param fileReader      stream used for reading
-   * @param startBlockIndex start block index
-   * @param endBlockIndex   end block index
-   * @return measure column chunk array
-   * @throws IOException
-   */
-  private MeasureColumnDataChunk[] readMeasureChunksInGroup(FileHolder fileReader,
-      int startBlockIndex, int endBlockIndex) throws IOException {
-    long currentMeasureOffset = measureColumnChunkOffsets.get(startBlockIndex);
-    byte[] data = fileReader.readByteArray(filePath, currentMeasureOffset,
-        (int) (measureColumnChunkOffsets.get(endBlockIndex + 1) - currentMeasureOffset));
-    MeasureColumnDataChunk[] dataChunks =
-        new MeasureColumnDataChunk[endBlockIndex - startBlockIndex + 1];
-    MeasureColumnDataChunk dataChunk = null;
-    int index = 0;
-    int copyPoint = 0;
-    DataChunk2 measureColumnChunk = null;
-    for (int i = startBlockIndex; i <= endBlockIndex; i++) {
-      dataChunk = new MeasureColumnDataChunk();
-      measureColumnChunk =
-          CarbonUtil.readDataChunk(data, copyPoint, measureColumnChunkLength.get(i));
-      copyPoint += measureColumnChunkLength.get(i);
-      List<ValueEncoderMeta> valueEncodeMeta = new ArrayList<>();
-      for (int j = 0; j < measureColumnChunk.getEncoder_meta().size(); j++) {
-        valueEncodeMeta.add(
-            CarbonUtil.deserializeEncoderMeta(measureColumnChunk.getEncoder_meta().get(j).array()));
-      }
-      WriterCompressModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);
-
-      ValueCompressionHolder values = compressionModel.getValueCompressionHolder()[0];
-
-      // uncompress
-      values.uncompress(compressionModel.getConvertedDataType()[0], data, copyPoint,
-              measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
-              compressionModel.getMaxValue()[0], numberOfRows);
-
-      CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
-
-      copyPoint += measureColumnChunk.data_page_length;
-      // set the data chunk
-      dataChunk.setMeasureDataHolder(measureDataHolder);
-
-      // set the enun value indexes
-      dataChunk.setNullValueIndexHolder(getPresenceMeta(measureColumnChunk.presence));
-      dataChunks[index++] = dataChunk;
-    }
-    return dataChunks;
-  }
 }

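One detail shared by both V2 readers: for the last column chunk there is no
offsets[i + 1] to subtract, so the raw read covers only the chunk header (whose
size comes from the length list) and convertTo* issues a second read for the data
pages. In outline (names shortened):

    boolean isLast = blockIndex == chunkOffsets.size() - 1;
    int rawLength = isLast
        ? chunkLengths.get(blockIndex)  // header only; data pages read during convert
        : (int) (chunkOffsets.get(blockIndex + 1) - chunkOffsets.get(blockIndex));
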
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
index ef8fff7..a7f38cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
@@ -27,14 +27,17 @@ public final class UnBlockIndexer {
 
   public static int[] uncompressIndex(int[] indexData, int[] indexMap) {
     int actualSize = indexData.length;
-    for (int i = 0; i < indexMap.length; i++) {
+    int mapLength = indexMap.length;
+    for (int i = 0; i < mapLength; i++) {
       actualSize += indexData[indexMap[i] + 1] - indexData[indexMap[i]] - 1;
     }
     int[] indexes = new int[actualSize];
     int k = 0;
+    int oldIndex = 0;
     for (int i = 0; i < indexData.length; i++) {
-      int index = Arrays.binarySearch(indexMap, i);
+      int index = Arrays.binarySearch(indexMap, oldIndex, mapLength, i);
       if (index > -1) {
+        oldIndex = index;
         for (int j = indexData[indexMap[index]]; j <= indexData[indexMap[index] + 1]; j++) {
           indexes[k] = j;
           k++;

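The UnBlockIndexer change narrows each binary search to the portion of indexMap not
yet consumed: both the loop variable and the values in the sorted indexMap only
increase, so the lower bound can start at the previous hit. A small self-contained
illustration of that idea:

    // moving-lower-bound search: each lookup starts where the last hit was found
    int[] sorted = { 2, 5, 9, 14 };
    int from = 0;
    for (int key = 0; key <= 14; key++) {
      int pos = java.util.Arrays.binarySearch(sorted, from, sorted.length, key);
      if (pos > -1) {
        from = pos;  // later keys can never match an earlier slot
      }
    }
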
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java
index 8a97cbf..dcd74c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java
@@ -17,6 +17,7 @@
 package org.apache.carbondata.core.datastore.impl;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -129,4 +130,12 @@ public class DFSFileHolderImpl implements FileHolder {
     FSDataInputStream fileChannel = updateCache(filePath);
     return fileChannel.readInt();
   }
+
+  @Override
+  public void readByteBuffer(String filePath, ByteBuffer byteBuffer,
+      long offset, int length) throws IOException {
+    byte[] readByteArray = readByteArray(filePath, offset, length);
+    byteBuffer.put(readByteArray);
+    byteBuffer.rewind();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java
index 1f073a8..d78c28e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java
@@ -192,5 +192,13 @@ public class FileHolderImpl implements FileHolder {
     ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset);
     return byteBffer.getLong();
   }
+  @Override
+  public void readByteBuffer(String filePath, ByteBuffer byteBuffer,
+      long offset, int length) throws IOException {
+    FileChannel fileChannel = updateCache(filePath);
+    fileChannel.position(offset);
+    fileChannel.read(byteBuffer);
+    byteBuffer.rewind();
+  }
 
 }
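
Both FileHolder implementations gain readByteBuffer so the V3 reader can fill a
caller-supplied (typically reused or direct) ByteBuffer instead of allocating a
byte[] per chunk. Note that the NIO variant relies on the buffer's remaining
capacity rather than the length argument, and a single FileChannel.read call may
return fewer bytes than requested; a defensive caller-side sketch, with a
hypothetical file name and offset:

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    public class ReadByteBufferExample {
      public static void main(String[] args) throws IOException {
        // reused across chunk reads instead of allocating a byte[] per read
        ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
        try (RandomAccessFile raf = new RandomAccessFile("part-0.carbondata", "r")) {
          FileChannel channel = raf.getChannel();
          channel.position(128L); // hypothetical chunk offset from blocklet metadata
          // FileChannel.read may fill only part of the buffer, so loop until full
          while (buffer.hasRemaining() && channel.read(buffer) >= 0) {
            // keep reading
          }
          buffer.rewind(); // like the patch: position back to 0 for consumption
          System.out.println("first int of the chunk: " + buffer.getInt());
        }
      }
    }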

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
index 2bb4d83..dfd35bc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java
@@ -22,8 +22,8 @@ import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.IndexKey;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 
 /**
 * Abstract class for a B+tree leaf node
@@ -177,7 +177,7 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode {
    * @param blockIndexes indexes of the blocks need to be read
    * @return dimension data chunks
    */
-  @Override public DimensionColumnDataChunk[] getDimensionChunks(FileHolder fileReader,
+  @Override public DimensionRawColumnChunk[] getDimensionChunks(FileHolder fileReader,
       int[][] blockIndexes) throws IOException {
    // Not required here as the leaf that will use this class will implement its own get
     // dimension chunks
@@ -191,7 +191,7 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode {
    * @param blockIndex block index to be read
    * @return dimension data chunk
    */
-  @Override public DimensionColumnDataChunk getDimensionChunk(FileHolder fileReader,
+  @Override public DimensionRawColumnChunk getDimensionChunk(FileHolder fileReader,
       int blockIndex) throws IOException {
    // Not required here as the leaf that will use this class will implement
     // its own get dimension chunks
@@ -205,7 +205,7 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode {
    * @param blockIndexes block indexes to be read from file
    * @return measure column data chunk
    */
-  @Override public MeasureColumnDataChunk[] getMeasureChunks(FileHolder fileReader,
+  @Override public MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader,
       int[][] blockIndexes) throws IOException {
    // Not required here as the leaf that will use this class will implement its own get
     // measure chunks
@@ -219,7 +219,7 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode {
    * @param blockIndex block index to be read from file
    * @return measure data chunk
    */
-  @Override public MeasureColumnDataChunk getMeasureChunk(FileHolder fileReader, int blockIndex)
+  @Override public MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, int blockIndex)
       throws IOException {
    // Not required here as the leaf that will use this class will implement its own get
     // measure chunks

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
index 404aad7..8e5976d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java
@@ -24,8 +24,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.IndexKey;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 
 /**
 * Non-leaf node of a b+tree class which will keep the metadata (start key) of the
@@ -170,7 +170,7 @@ public class BTreeNonLeafNode implements BTreeNode {
    * @param blockIndexes indexes of the blocks need to be read
    * @return dimension data chunks
    */
-  @Override public DimensionColumnDataChunk[] getDimensionChunks(FileHolder fileReader,
+  @Override public DimensionRawColumnChunk[] getDimensionChunks(FileHolder fileReader,
       int[][] blockIndexes) {
 
    // operation of getting the dimension chunks is not supported as it's a
@@ -187,7 +187,7 @@ public class BTreeNonLeafNode implements BTreeNode {
    * @param fileReader file reader to read the chunk from file
    * @return dimension data chunk
    */
-  @Override public DimensionColumnDataChunk getDimensionChunk(FileHolder fileReader,
+  @Override public DimensionRawColumnChunk getDimensionChunk(FileHolder fileReader,
       int blockIndexes) {
    // operation of getting the dimension chunk is not supported as it's a
     // non leaf node
@@ -204,7 +204,7 @@ public class BTreeNonLeafNode implements BTreeNode {
    * @param blockIndexes block indexes to be read from file
    * @return measure column data chunk
    */
-  @Override public MeasureColumnDataChunk[] getMeasureChunks(FileHolder fileReader,
+  @Override public MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader,
       int[][] blockIndexes) {
    // operation of getting the measure chunk is not supported as it's a non
     // leaf node
@@ -222,7 +222,7 @@ public class BTreeNonLeafNode implements BTreeNode {
    * @return measure data chunk
    */
 
-  @Override public MeasureColumnDataChunk getMeasureChunk(FileHolder fileReader, int blockIndex) {
+  @Override public MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, int blockIndex) {
    // operation of getting the measure chunk is not supported as it's a non
     // leaf node
     // and in case of B+Tree data will be stored only in leaf node and


[5/5] incubator-carbondata git commit: [CARBONDATA-726] Handled query and scan for V3 format. This closes #584

Posted by ja...@apache.org.
[CARBONDATA-726] Handled query and scan for V3 format. This closes #584


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/3e36cdf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/3e36cdf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/3e36cdf5

Branch: refs/heads/master
Commit: 3e36cdf54fe7158cda2596203c22184459097c8d
Parents: 766671c 72cb415
Author: jackylk <ja...@huawei.com>
Authored: Fri Feb 24 13:38:21 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Fri Feb 24 13:38:21 2017 +0800

----------------------------------------------------------------------
 .../AbstractColumnDictionaryInfo.java           |  18 +-
 .../core/cache/dictionary/Dictionary.java       |  13 ++
 .../cache/dictionary/ForwardDictionary.java     |  13 ++
 .../cache/dictionary/ReverseDictionary.java     |  13 ++
 .../core/constants/CarbonCommonConstants.java   |  12 +-
 .../carbondata/core/datastore/DataRefNode.java  |  12 +-
 .../carbondata/core/datastore/FileHolder.java   |  14 ++
 .../datastore/chunk/AbstractRawColumnChunk.java | 124 +++++++++++
 .../chunk/impl/DimensionRawColumnChunk.java     | 105 +++++++++
 .../chunk/impl/MeasureRawColumnChunk.java       | 107 +++++++++
 .../reader/DimensionColumnChunkReader.java      |  20 +-
 .../chunk/reader/MeasureColumnChunkReader.java  |  20 +-
 ...mpressedDimensionChunkFileBasedReaderV1.java |  92 +++++---
 ...mpressedDimensionChunkFileBasedReaderV2.java | 215 +++++++++----------
 ...CompressedMeasureChunkFileBasedReaderV1.java |  49 +++--
 ...CompressedMeasureChunkFileBasedReaderV2.java | 169 +++++++--------
 .../core/datastore/columnar/UnBlockIndexer.java |   7 +-
 .../core/datastore/impl/DFSFileHolderImpl.java  |   9 +
 .../core/datastore/impl/FileHolderImpl.java     |   8 +
 .../impl/btree/AbstractBTreeLeafNode.java       |  12 +-
 .../datastore/impl/btree/BTreeNonLeafNode.java  |  12 +-
 .../impl/btree/BlockletBTreeLeafNode.java       |  20 +-
 .../core/metadata/blocklet/BlockletInfo.java    |  32 ++-
 .../DictionaryBasedVectorResultCollector.java   |  54 +++--
 .../core/scan/complextypes/ArrayQueryType.java  |  16 +-
 .../scan/complextypes/ComplexQueryType.java     |  13 +-
 .../scan/complextypes/PrimitiveQueryType.java   |  12 +-
 .../core/scan/complextypes/StructQueryType.java |  10 +-
 .../executor/impl/AbstractQueryExecutor.java    |  19 +-
 .../scan/executor/infos/BlockExecutionInfo.java |  27 +++
 .../core/scan/executor/util/QueryUtil.java      |  13 +-
 .../carbondata/core/scan/filter/FilterUtil.java |   5 -
 .../core/scan/filter/GenericQueryType.java      |   6 +-
 .../filter/executer/AndFilterExecuterImpl.java  |  12 +-
 .../executer/ExcludeFilterExecuterImpl.java     |  32 ++-
 .../scan/filter/executer/FilterExecuter.java    |   9 +-
 .../executer/IncludeFilterExecuterImpl.java     |  54 ++++-
 .../filter/executer/OrFilterExecuterImpl.java   |  11 +-
 .../executer/RestructureFilterExecuterImpl.java |  52 -----
 .../executer/RowLevelFilterExecuterImpl.java    | 192 +++++++++--------
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |  69 ++++--
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |  65 ++++--
 ...velRangeLessThanEqualFilterExecuterImpl.java |  67 ++++--
 .../RowLevelRangeLessThanFiterExecuterImpl.java |  66 ++++--
 .../RowLevelRangeFilterResolverImpl.java        |  34 ++-
 .../processor/AbstractDataBlockIterator.java    | 117 ++++++++--
 .../core/scan/processor/BlocksChunkHolder.java  |  51 +++--
 .../processor/impl/DataBlockIteratorImpl.java   |   8 +-
 .../core/scan/result/AbstractScannedResult.java | 113 +++++++---
 .../result/impl/FilterQueryScannedResult.java   |  25 ++-
 .../AbstractDetailQueryResultIterator.java      |  21 +-
 .../iterator/DetailQueryResultIterator.java     |  48 +----
 .../scan/scanner/AbstractBlockletScanner.java   | 106 ++++++---
 .../core/scan/scanner/BlockletScanner.java      |  20 ++
 .../core/scan/scanner/impl/FilterScanner.java   | 172 ++++++++++-----
 .../scan/scanner/impl/NonFilterScanner.java     |   3 -
 .../carbondata/core/util/BitSetGroup.java       |  82 +++++++
 .../apache/carbondata/core/util/CarbonUtil.java |  36 ++--
 .../core/util/DataFileFooterConverter2.java     |  13 +-
 .../carbondata/core/util/DataTypeUtil.java      |  82 +++++++
 .../carbondata/core/util/DataTypeUtilTest.java  |   2 +-
 .../scanner/impl/FilterScannerTest.java         |   2 +-
 .../readsupport/SparkRowReadSupportImpl.java    |   2 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  17 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   9 +-
 .../sql/optimizer/CarbonLateDecodeRule.scala    |   1 +
 66 files changed, 2025 insertions(+), 839 deletions(-)
----------------------------------------------------------------------



[3/5] incubator-carbondata git commit: WIP Added code for new V3 format to optimize scan

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java
index 66633f3..00a0e01 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java
@@ -20,8 +20,8 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.BTreeBuilderInfo;
 import org.apache.carbondata.core.datastore.FileHolder;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory;
 import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
@@ -84,9 +84,9 @@ public class BlockletBTreeLeafNode extends AbstractBTreeLeafNode {
    * @param blockIndexes indexes of the blocks need to be read
    * @return dimension data chunks
    */
-  @Override public DimensionColumnDataChunk[] getDimensionChunks(FileHolder fileReader,
+  @Override public DimensionRawColumnChunk[] getDimensionChunks(FileHolder fileReader,
       int[][] blockIndexes) throws IOException {
-    return dimensionChunksReader.readDimensionChunks(fileReader, blockIndexes);
+    return dimensionChunksReader.readRawDimensionChunks(fileReader, blockIndexes);
   }
 
   /**
@@ -96,9 +96,9 @@ public class BlockletBTreeLeafNode extends AbstractBTreeLeafNode {
    * @param blockIndex block index to be read
    * @return dimension data chunk
    */
-  @Override public DimensionColumnDataChunk getDimensionChunk(FileHolder fileReader,
+  @Override public DimensionRawColumnChunk getDimensionChunk(FileHolder fileReader,
       int blockIndex) throws IOException {
-    return dimensionChunksReader.readDimensionChunk(fileReader, blockIndex);
+    return dimensionChunksReader.readRawDimensionChunk(fileReader, blockIndex);
   }
 
   /**
@@ -108,9 +108,9 @@ public class BlockletBTreeLeafNode extends AbstractBTreeLeafNode {
    * @param blockIndexes block indexes to be read from file
    * @return measure column data chunk
    */
-  @Override public MeasureColumnDataChunk[] getMeasureChunks(FileHolder fileReader,
+  @Override public MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader,
       int[][] blockIndexes) throws IOException {
-    return measureColumnChunkReader.readMeasureChunks(fileReader, blockIndexes);
+    return measureColumnChunkReader.readRawMeasureChunks(fileReader, blockIndexes);
   }
 
   /**
@@ -120,8 +120,8 @@ public class BlockletBTreeLeafNode extends AbstractBTreeLeafNode {
    * @param blockIndex block index to be read from file
    * @return measure data chunk
    */
-  @Override public MeasureColumnDataChunk getMeasureChunk(FileHolder fileReader, int blockIndex)
+  @Override public MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, int blockIndex)
       throws IOException {
-    return measureColumnChunkReader.readMeasureChunk(fileReader, blockIndex);
+    return measureColumnChunkReader.readRawMeasureChunk(fileReader, blockIndex);
   }
 }
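
With V3, the leaf node returns raw column chunks: the bytes as read from disk plus
per-page metadata, with decompression deferred until convertToDimColDataChunk(page)
is called for a page that actually survives pruning. A simplified sketch of that
lazy pattern, not the committed DimensionRawColumnChunk:

    import java.nio.ByteBuffer;

    // Holds compressed bytes for all pages of a chunk; decodes a page at most
    // once, and only when it is first requested.
    final class RawChunkSketch<T> {
      interface PageDecoder<T> { T decode(ByteBuffer raw, int page); }

      private final ByteBuffer rawData;   // compressed bytes as read from disk
      private final T[] decodedPages;     // lazily filled, one slot per page
      private final PageDecoder<T> decoder;

      @SuppressWarnings("unchecked")
      RawChunkSketch(ByteBuffer rawData, int pagesCount, PageDecoder<T> decoder) {
        this.rawData = rawData;
        this.decodedPages = (T[]) new Object[pagesCount];
        this.decoder = decoder;
      }

      T convertToDataChunk(int page) {
        if (decodedPages[page] == null) { // decode on first access only
          decodedPages[page] = decoder.decode(rawData.duplicate(), page);
        }
        return decodedPages[page];
      }
    }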

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
index fe64e88..97f3822 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
@@ -50,17 +50,21 @@ public class BlockletInfo implements Serializable {
 
   private List<Long> dimensionChunkOffsets;
 
-  private List<Short> dimensionChunksLength;
+  private List<Integer> dimensionChunksLength;
 
   private List<Long> measureChunkOffsets;
 
-  private List<Short> measureChunksLength;
+  private List<Integer> measureChunksLength;
 
   /**
    * to store the index like min max and start and end key of each column of the blocklet
    */
   private BlockletIndex blockletIndex;
 
+  private long dimensionOffset;
+
+  private long measureOffsets;
+
   /**
    * @return the numberOfRows
    */
@@ -125,11 +129,11 @@ public class BlockletInfo implements Serializable {
     this.dimensionChunkOffsets = dimensionChunkOffsets;
   }
 
-  public List<Short> getDimensionChunksLength() {
+  public List<Integer> getDimensionChunksLength() {
     return dimensionChunksLength;
   }
 
-  public void setDimensionChunksLength(List<Short> dimensionChunksLength) {
+  public void setDimensionChunksLength(List<Integer> dimensionChunksLength) {
     this.dimensionChunksLength = dimensionChunksLength;
   }
 
@@ -141,12 +145,28 @@ public class BlockletInfo implements Serializable {
     this.measureChunkOffsets = measureChunkOffsets;
   }
 
-  public List<Short> getMeasureChunksLength() {
+  public List<Integer> getMeasureChunksLength() {
     return measureChunksLength;
   }
 
-  public void setMeasureChunksLength(List<Short> measureChunksLength) {
+  public void setMeasureChunksLength(List<Integer> measureChunksLength) {
     this.measureChunksLength = measureChunksLength;
   }
 
+  public long getDimensionOffset() {
+    return dimensionOffset;
+  }
+
+  public void setDimensionOffset(long dimensionOffset) {
+    this.dimensionOffset = dimensionOffset;
+  }
+
+  public long getMeasureOffsets() {
+    return measureOffsets;
+  }
+
+  public void setMeasureOffsets(long measureOffsets) {
+    this.measureOffsets = measureOffsets;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index f622ddc..9bc40c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -108,27 +108,41 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
 
   @Override public void collectVectorBatch(AbstractScannedResult scannedResult,
       CarbonColumnarBatch columnarBatch) {
-    int rowCounter = scannedResult.getRowCounter();
-    int availableRows = scannedResult.numberOfOutputRows() - rowCounter;
-    int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getActualSize();
-    requiredRows = Math.min(requiredRows, availableRows);
-    if (requiredRows < 1) {
-      return;
-    }
-    for (int i = 0; i < allColumnInfo.length; i++) {
-      allColumnInfo[i].size = requiredRows;
-      allColumnInfo[i].offset = rowCounter;
-      allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
-      allColumnInfo[i].vector = columnarBatch.columnVectors[i];
-    }
+    int numberOfPages = scannedResult.numberOfpages();
+    while (scannedResult.getCurrentPageCounter() < numberOfPages) {
+      int currentPageRowCount = scannedResult.getCurrentPageRowCount();
+      if (currentPageRowCount == 0) {
+        scannedResult.incrementPageCounter();
+        continue;
+      }
+      int rowCounter = scannedResult.getRowCounter();
+      int availableRows = currentPageRowCount - rowCounter;
+      int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getActualSize();
+      requiredRows = Math.min(requiredRows, availableRows);
+      if (requiredRows < 1) {
+        return;
+      }
+      for (int i = 0; i < allColumnInfo.length; i++) {
+        allColumnInfo[i].size = requiredRows;
+        allColumnInfo[i].offset = rowCounter;
+        allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
+        allColumnInfo[i].vector = columnarBatch.columnVectors[i];
+      }
 
-    scannedResult.fillColumnarDictionaryBatch(dictionaryInfo);
-    scannedResult.fillColumnarNoDictionaryBatch(noDictionaryInfo);
-    scannedResult.fillColumnarMeasureBatch(measureInfo, measuresOrdinal);
-    scannedResult.fillColumnarComplexBatch(complexInfo);
-    scannedResult.setRowCounter(rowCounter + requiredRows);
-    columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows);
-    columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows);
+      scannedResult.fillColumnarDictionaryBatch(dictionaryInfo);
+      scannedResult.fillColumnarNoDictionaryBatch(noDictionaryInfo);
+      scannedResult.fillColumnarMeasureBatch(measureInfo, measuresOrdinal);
+      scannedResult.fillColumnarComplexBatch(complexInfo);
+      // all data of this page has been fetched, so increment the page counter
+      if (availableRows == requiredRows) {
+        scannedResult.incrementPageCounter();
+      } else {
+        // Or set the row counter.
+        scannedResult.setRowCounter(rowCounter + requiredRows);
+      }
+      columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows);
+      columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows);
+    }
   }
 
 }
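
collectVectorBatch now walks the blocklet page by page: a page is drained either
completely (incrementPageCounter) or partially (setRowCounter) when the vector
batch fills up first. The counter arithmetic, as a toy example with made-up page
sizes and batch capacity rather than Carbon APIs:

    public class PageBatchFillExample {
      public static void main(String[] args) {
        int[] pageRows = {32000, 32000, 7500}; // rows per page in one blocklet
        int batchSize = 4096;                  // vector batch capacity
        int page = 0, rowCounter = 0;
        while (page < pageRows.length) {
          int batchUsed = 0;
          while (page < pageRows.length && batchUsed < batchSize) {
            int available = pageRows[page] - rowCounter;
            int required = Math.min(batchSize - batchUsed, available);
            // fill `required` rows from (page, rowCounter) into the batch here
            batchUsed += required;
            if (required == available) { page++; rowCounter = 0; } // page drained
            else { rowCounter += required; }                       // batch full
          }
          System.out.println("emitted batch of " + batchUsed + " rows");
        }
      }
    }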

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
index 4588074..46c6714 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
@@ -21,12 +21,13 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 
-import org.apache.spark.sql.catalyst.util.*;
-import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataType;
 
 public class ArrayQueryType extends ComplexQueryType implements GenericQueryType {
 
@@ -61,11 +62,10 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType
 
   }
 
-  public void parseBlocksAndReturnComplexColumnByteArray(
-      DimensionColumnDataChunk[] dimensionColumnDataChunks, int rowNumber,
-      DataOutputStream dataOutputStream) throws IOException {
+  public void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks,
+      int rowNumber, int pageNumber, DataOutputStream dataOutputStream) throws IOException {
     byte[] input = new byte[8];
-    copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, input);
+    copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber, input);
     ByteBuffer byteArray = ByteBuffer.wrap(input);
     int dataLength = byteArray.getInt();
     dataOutputStream.writeInt(dataLength);
@@ -73,7 +73,7 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType
       int columnIndex = byteArray.getInt();
       for (int i = 0; i < dataLength; i++) {
         children
-            .parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks, columnIndex++,
+            .parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks, columnIndex++, pageNumber,
                 dataOutputStream);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
index 0966508..080d577 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.core.scan.complextypes;
 
 import java.io.IOException;
 
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 
@@ -45,9 +45,10 @@ public class ComplexQueryType {
    * @param rowNumber
    * @param input
    */
-  protected void copyBlockDataChunk(DimensionColumnDataChunk[] dimensionColumnDataChunks,
-      int rowNumber, byte[] input) {
-    byte[] data = dimensionColumnDataChunks[blockIndex].getChunkData(rowNumber);
+  protected void copyBlockDataChunk(DimensionRawColumnChunk[] rawColumnChunks,
+      int rowNumber, int pageNumber, byte[] input) {
+    byte[] data =
+        rawColumnChunks[blockIndex].convertToDimColDataChunk(pageNumber).getChunkData(rowNumber);
     System.arraycopy(data, 0, input, 0, data.length);
   }
 
@@ -55,8 +56,8 @@ public class ComplexQueryType {
    * This method will read the block data chunk from the respective block
    */
   protected void readBlockDataChunk(BlocksChunkHolder blockChunkHolder) throws IOException {
-    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
           .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index 2f693d3..9c9be86 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.keygenerator.mdkey.Bits;
@@ -92,11 +93,12 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
   }
 
   @Override public void parseBlocksAndReturnComplexColumnByteArray(
-      DimensionColumnDataChunk[] dimensionDataChunks, int rowNumber,
-      DataOutputStream dataOutputStream) throws IOException {
-    byte[] currentVal =
-        new byte[dimensionDataChunks[blockIndex].getColumnValueSize()];
-    copyBlockDataChunk(dimensionDataChunks, rowNumber, currentVal);
+      DimensionRawColumnChunk[] rawColumnChunks, int rowNumber,
+      int pageNumber, DataOutputStream dataOutputStream) throws IOException {
+    DimensionColumnDataChunk dataChunk =
+        rawColumnChunks[blockIndex].convertToDimColDataChunk(pageNumber);
+    byte[] currentVal = new byte[dataChunk.getColumnValueSize()];
+    copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber, currentVal);
     dataOutputStream.write(currentVal);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
index 75b97c7..bb64e92 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 
@@ -82,10 +82,10 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
   }
 
   @Override public void parseBlocksAndReturnComplexColumnByteArray(
-      DimensionColumnDataChunk[] dimensionColumnDataChunks, int rowNumber,
-      DataOutputStream dataOutputStream) throws IOException {
+      DimensionRawColumnChunk[] dimensionColumnDataChunks, int rowNumber,
+      int pageNumber, DataOutputStream dataOutputStream) throws IOException {
     byte[] input = new byte[8];
-    copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, input);
+    copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, pageNumber, input);
     ByteBuffer byteArray = ByteBuffer.wrap(input);
     int childElement = byteArray.getInt();
     dataOutputStream.writeInt(childElement);
@@ -93,7 +93,7 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
       for (int i = 0; i < childElement; i++) {
         children.get(i)
             .parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks, rowNumber,
-                dataOutputStream);
+                pageNumber, dataOutputStream);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 82c193f..0c2e8ab 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -103,7 +104,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     LOGGER.info("Query will be executed on table: " + queryModel.getAbsoluteTableIdentifier()
         .getCarbonTableIdentifier().getTableName());
     // add executor service for query execution
-    queryProperties.executorService = Executors.newFixedThreadPool(1);
+    queryProperties.executorService = Executors.newCachedThreadPool();
     // Initializing statistics list to record the query statistics
     // creating copy on write to handle concurrent scenario
     queryProperties.queryStatisticsRecorder =
@@ -331,7 +332,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     // setting all the dimension chunk indexes to be read from file
     int numberOfElementToConsider = 0;
     // list of dimensions to be projected
-    List<Integer> allProjectionListDimensionIdexes = new ArrayList<>();
+    Set<Integer> allProjectionListDimensionIdexes = new LinkedHashSet<>();
     int[] dimensionsBlockIndexes = QueryUtil.getDimensionsBlockIndexes(updatedQueryDimension,
         segmentProperties.getDimensionOrdinalToBlockMapping(), expressionDimensions,
         queryProperties.complexFilterDimension, allProjectionListDimensionIdexes);
@@ -346,10 +347,12 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     } else {
       blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(new int[0][0]);
     }
-
+    // list of measures to be projected
+    List<Integer> allProjectionListMeasureIdexes = new ArrayList<>();
     int[] measureBlockIndexes = QueryUtil
         .getMeasureBlockIndexes(queryModel.getQueryMeasures(), expressionMeasures,
-            segmentProperties.getMeasuresOrdinalToBlockMapping(), queryProperties.filterMeasures);
+            segmentProperties.getMeasuresOrdinalToBlockMapping(), queryProperties.filterMeasures,
+            allProjectionListMeasureIdexes);
     if (measureBlockIndexes.length > 0) {
 
       numberOfElementToConsider = measureBlockIndexes[measureBlockIndexes.length - 1]
@@ -363,6 +366,14 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     } else {
       blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(new int[0][0]);
     }
+    // setting the indexes of the dimensions present in the projection list
+    blockExecutionInfo.setProjectionListDimensionIndexes(ArrayUtils.toPrimitive(
+        allProjectionListDimensionIdexes
+            .toArray(new Integer[allProjectionListDimensionIdexes.size()])));
+    // setting the indexes of the measures present in the projection list
+    blockExecutionInfo.setProjectionListMeasureIndexes(ArrayUtils.toPrimitive(
+        allProjectionListMeasureIdexes
+            .toArray(new Integer[allProjectionListMeasureIdexes.size()])));
     // setting the key structure info which will be required
     // to update the older block key with new key generator
     blockExecutionInfo.setKeyStructureInfo(queryProperties.keyStructureInfo);
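
The projection dimension indexes are collected in a LinkedHashSet so duplicate
block indexes collapse (row-group columns map to one block) while projection order
is preserved, then flattened with ArrayUtils.toPrimitive. A small illustration,
assuming commons-lang on the classpath as the patch itself does; the numbers are
made up:

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.Set;
    import org.apache.commons.lang.ArrayUtils;

    public class ProjectionIndexExample {
      public static void main(String[] args) {
        Set<Integer> projection = new LinkedHashSet<>();
        projection.add(3); projection.add(1); projection.add(3); // duplicate collapses
        int[] indexes =
            ArrayUtils.toPrimitive(projection.toArray(new Integer[projection.size()]));
        System.out.println(Arrays.toString(indexes)); // [3, 1]
      }
    }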

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
index d797126..2dd6721 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
@@ -98,6 +98,16 @@ public class BlockExecutionInfo {
   private int[][] allSelectedMeasureBlocksIndexes;
 
   /**
+   * list of dimensions present in the projection
+   */
+  private int[] projectionListDimensionIndexes;
+
+  /**
+   * list of measures present in the projection
+   */
+  private int[] projectionListMeasureIndexes;
+
+  /**
    * this will be used to update the older block fixed length keys with the
    * new block fixed length key
    */
@@ -599,4 +609,21 @@ public class BlockExecutionInfo {
   public void setBlockId(String blockId) {
     this.blockId = blockId;
   }
+
+  public int[] getProjectionListDimensionIndexes() {
+    return projectionListDimensionIndexes;
+  }
+
+  public void setProjectionListDimensionIndexes(int[] projectionListDimensionIndexes) {
+    this.projectionListDimensionIndexes = projectionListDimensionIndexes;
+  }
+
+  public int[] getProjectionListMeasureIndexes() {
+    return projectionListMeasureIndexes;
+  }
+
+  public void setProjectionListMeasureIndexes(int[] projectionListMeasureIndexes) {
+    this.projectionListMeasureIndexes = projectionListMeasureIndexes;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index a8338e7..ef6fb8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -209,7 +209,7 @@ public class QueryUtil {
   public static int[] getDimensionsBlockIndexes(List<QueryDimension> queryDimensions,
       Map<Integer, Integer> dimensionOrdinalToBlockMapping,
       List<CarbonDimension> customAggregationDimension, Set<CarbonDimension> filterDimensions,
-      List<Integer> allProjectionListDimensionIndexes) {
+      Set<Integer> allProjectionListDimensionIndexes) {
    // using a set, since columns in the same row group point to the same block
     Set<Integer> dimensionBlockIndex = new HashSet<Integer>();
     Set<Integer> filterDimensionOrdinal = getFilterDimensionOrdinal(filterDimensions);
@@ -218,7 +218,13 @@ public class QueryUtil {
       if (queryDimensions.get(i).getDimension().hasEncoding(Encoding.IMPLICIT)) {
         continue;
       }
-      allProjectionListDimensionIndexes.add(queryDimensions.get(i).getDimension().getOrdinal());
+
+      allProjectionListDimensionIndexes.add(
+          dimensionOrdinalToBlockMapping.get(queryDimensions.get(i).getDimension().getOrdinal()));
+      if (queryDimensions.get(i).getDimension().numberOfChild() > 0) {
+        addChildrenBlockIndex(allProjectionListDimensionIndexes,
+            queryDimensions.get(i).getDimension());
+      }
 
       if (!filterDimensionOrdinal.contains(queryDimensions.get(i).getDimension().getOrdinal())) {
         blockIndex =
@@ -394,10 +400,11 @@ public class QueryUtil {
    */
   public static int[] getMeasureBlockIndexes(List<QueryMeasure> queryMeasures,
       List<CarbonMeasure> expressionMeasure, Map<Integer, Integer> ordinalToBlockIndexMapping,
-      Set<CarbonMeasure> filterMeasures) {
+      Set<CarbonMeasure> filterMeasures, List<Integer> allProjectionListMeasureIdexes) {
     Set<Integer> measureBlockIndex = new HashSet<Integer>();
     Set<Integer> filterMeasureOrdinal = getFilterMeasureOrdinal(filterMeasures);
     for (int i = 0; i < queryMeasures.size(); i++) {
+      allProjectionListMeasureIdexes.add(queryMeasures.get(i).getMeasure().getOrdinal());
       if (!filterMeasureOrdinal.contains(queryMeasures.get(i).getMeasure().getOrdinal())) {
         measureBlockIndex
             .add(ordinalToBlockIndexMapping.get(queryMeasures.get(i).getMeasure().getOrdinal()));

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index e933f98..520b460 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -68,7 +68,6 @@ import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.executer.IncludeColGroupFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.executer.IncludeFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.executer.OrFilterExecuterImpl;
-import org.apache.carbondata.core.scan.filter.executer.RestructureFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.executer.RowLevelFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.executer.RowLevelRangeTypeExecuterFacory;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
@@ -125,10 +124,6 @@ public final class FilterUtil {
                   complexDimensionInfoMap),
               createFilterExecuterTree(filterExpressionResolverTree.getRight(), segmentProperties,
                   complexDimensionInfoMap));
-        case RESTRUCTURE:
-          return new RestructureFilterExecuterImpl(
-              filterExpressionResolverTree.getDimColResolvedFilterInfo(),
-              segmentProperties);
         case ROWLEVEL_LESSTHAN:
         case ROWLEVEL_LESSTHAN_EQUALTO:
         case ROWLEVEL_GREATERTHAN_EQUALTO:

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
index 4518149..3742e7e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
@@ -21,7 +21,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 
 import org.apache.spark.sql.types.DataType;
@@ -40,8 +40,8 @@ public interface GenericQueryType {
 
   int getColsCount();
 
-  void parseBlocksAndReturnComplexColumnByteArray(DimensionColumnDataChunk[] dimensionDataChunks,
-      int rowNumber, DataOutputStream dataOutputStream) throws IOException;
+  void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks,
+      int rowNumber, int pageNumber, DataOutputStream dataOutputStream) throws IOException;
 
   DataType getSchemaType();
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/AndFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/AndFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/AndFilterExecuterImpl.java
index 4356511..971f9b4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/AndFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/AndFilterExecuterImpl.java
@@ -21,6 +21,7 @@ import java.util.BitSet;
 
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 public class AndFilterExecuterImpl implements FilterExecuter {
 
@@ -32,13 +33,13 @@ public class AndFilterExecuterImpl implements FilterExecuter {
     this.rightExecuter = rightExecuter;
   }
 
-  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+  @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder)
       throws FilterUnsupportedException, IOException {
-    BitSet leftFilters = leftExecuter.applyFilter(blockChunkHolder);
+    BitSetGroup leftFilters = leftExecuter.applyFilter(blockChunkHolder);
     if (leftFilters.isEmpty()) {
       return leftFilters;
     }
-    BitSet rightFilter = rightExecuter.applyFilter(blockChunkHolder);
+    BitSetGroup rightFilter = rightExecuter.applyFilter(blockChunkHolder);
     if (rightFilter.isEmpty()) {
       return rightFilter;
     }
@@ -58,4 +59,9 @@ public class AndFilterExecuterImpl implements FilterExecuter {
     leftFilters.and(rightFilter);
     return leftFilters;
   }
+
+  @Override public void readBlocks(BlocksChunkHolder blocksChunkHolder) throws IOException {
+    leftExecuter.readBlocks(blocksChunkHolder);
+    rightExecuter.readBlocks(blocksChunkHolder);
+  }
 }
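
Filter executers now return a BitSetGroup, one BitSet per page, so a blocklet can
be pruned page by page (the new core/util/BitSetGroup.java appears in the file list
of this commit). A minimal sketch of such a page-wise holder, assuming null marks a
fully pruned page; names and null handling are illustrative, not the committed
class:

    import java.util.BitSet;

    final class BitSetGroupSketch {
      private final BitSet[] bitSets; // one BitSet of matching rows per page

      BitSetGroupSketch(int pagesCount) { this.bitSets = new BitSet[pagesCount]; }

      void setBitSet(BitSet bitSet, int page) { bitSets[page] = bitSet; }

      void and(BitSetGroupSketch other) {
        for (int i = 0; i < bitSets.length; i++) {
          if (bitSets[i] != null && other.bitSets[i] != null) {
            bitSets[i].and(other.bitSets[i]); // rows must pass both filters
          } else {
            bitSets[i] = null;                // one side pruned the whole page
          }
        }
      }

      void or(BitSetGroupSketch other) {
        for (int i = 0; i < bitSets.length; i++) {
          if (bitSets[i] == null) bitSets[i] = other.bitSets[i];
          else if (other.bitSets[i] != null) bitSets[i].or(other.bitSets[i]);
        }
      }

      boolean isEmpty() {
        for (BitSet b : bitSets) {
          if (b != null && !b.isEmpty()) return false;
        }
        return true;
      }
    }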

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index 667474d..8e7a3c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -21,11 +21,13 @@ import java.util.BitSet;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 public class ExcludeFilterExecuterImpl implements FilterExecuter {
@@ -43,15 +45,26 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
         dimColEvaluatorInfo.getDimension(), dimColumnExecuterInfo);
   }
 
-  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {
+  @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {
     int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
         .get(dimColEvaluatorInfo.getColumnIndex());
-    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
           .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
     }
-    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
-        blockChunkHolder.getDataBlock().nodeSize());
+    DimensionRawColumnChunk dimensionRawColumnChunk =
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+    DimensionColumnDataChunk[] dimensionColumnDataChunks =
+        dimensionRawColumnChunk.convertToDimColDataChunks();
+    BitSetGroup bitSetGroup =
+        new BitSetGroup(dimensionRawColumnChunk.getPagesCount());
+    for (int i = 0; i < dimensionColumnDataChunks.length; i++) {
+      BitSet bitSet = getFilteredIndexes(dimensionColumnDataChunks[i],
+          dimensionRawColumnChunk.getRowCount()[i]);
+      bitSetGroup.setBitSet(bitSet, i);
+    }
+
+    return bitSetGroup;
   }
 
   protected BitSet getFilteredIndexes(DimensionColumnDataChunk dimColumnDataChunk,
@@ -148,4 +161,13 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
     bitSet.flip(0, 1);
     return bitSet;
   }
+
+  @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfo.getColumnIndex());
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
index c476c0a..7182dd5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
@@ -21,6 +21,7 @@ import java.util.BitSet;
 
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 public interface FilterExecuter {
 
@@ -30,7 +31,7 @@ public interface FilterExecuter {
    * @return
    * @throws FilterUnsupportedException
    */
-  BitSet applyFilter(BlocksChunkHolder blocksChunkHolder)
+  BitSetGroup applyFilter(BlocksChunkHolder blocksChunkHolder)
       throws FilterUnsupportedException, IOException;
 
   /**
@@ -42,4 +43,10 @@ public interface FilterExecuter {
    * @return BitSet
    */
   BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue);
+
+  /**
+   * It only reads the blocks required by the filter executor; it does not uncompress the data.
+   * @param blockChunkHolder
+   */
+  void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index c2a717f..e640d71 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -21,11 +21,13 @@ import java.util.BitSet;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -45,15 +47,31 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
 
   }
 
-  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {
+  @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {
     int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
         .get(dimColumnEvaluatorInfo.getColumnIndex());
-    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
           .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
     }
-    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
-        blockChunkHolder.getDataBlock().nodeSize());
+    DimensionRawColumnChunk dimensionRawColumnChunk =
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+    BitSetGroup bitSetGroup = new BitSetGroup(dimensionRawColumnChunk.getPagesCount());
+    for (int i = 0; i < dimensionRawColumnChunk.getPagesCount(); i++) {
+      if (dimensionRawColumnChunk.getMaxValues() != null) {
+        if (isScanRequired(dimensionRawColumnChunk.getMaxValues()[i],
+            dimensionRawColumnChunk.getMinValues()[i], dimColumnExecuterInfo.getFilterKeys())) {
+          BitSet bitSet = getFilteredIndexes(dimensionRawColumnChunk.convertToDimColDataChunk(i),
+              dimensionRawColumnChunk.getRowCount()[i]);
+          bitSetGroup.setBitSet(bitSet, i);
+        }
+      } else {
+        BitSet bitSet = getFilteredIndexes(dimensionRawColumnChunk.convertToDimColDataChunk(i),
+            dimensionRawColumnChunk.getRowCount()[i]);
+        bitSetGroup.setBitSet(bitSet, i);
+      }
+    }
+    return bitSetGroup;
   }
 
   protected BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
@@ -149,16 +167,25 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     int columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
     int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
 
+    boolean isScanRequired =
+        isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues);
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+  }
+
+  private boolean isScanRequired(byte[] blkMaxVal, byte[] blkMinVal, byte[][] filterValues) {
     boolean isScanRequired = false;
     for (int k = 0; k < filterValues.length; k++) {
       // filter value should be in range of max and min value i.e
       // max>filtervalue>min
       // so filter-max should be negative
       int maxCompare =
-          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blkMaxVal[blockIndex]);
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blkMaxVal);
       // and filter-min should be positive
       int minCompare =
-          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blkMinVal[blockIndex]);
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blkMinVal);
 
       // if any filter value is in range than this block needs to be
       // scanned
@@ -167,10 +194,15 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
         break;
       }
     }
-    if (isScanRequired) {
-      bitSet.set(0);
-    }
-    return bitSet;
+    return isScanRequired;
   }
 
+  @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColumnEvaluatorInfo.getColumnIndex());
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+  }
 }
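
The pattern above is the core of the V3 change for include filters: instead of one BitSet for the whole blocklet, a BitSetGroup holds one BitSet per page, and a page is only decoded when its min/max metadata can contain a filter key. A minimal sketch of that pruning loop, using only names visible in this diff (rawChunk and filterKeys stand in for the local variables used above):

    // One BitSet per page; skip decoding pages whose min/max excludes all keys.
    BitSetGroup result = new BitSetGroup(rawChunk.getPagesCount());
    for (int page = 0; page < rawChunk.getPagesCount(); page++) {
      byte[][] maxValues = rawChunk.getMaxValues();  // may be null for older files
      if (maxValues == null
          || isScanRequired(maxValues[page], rawChunk.getMinValues()[page], filterKeys)) {
        BitSet hits = getFilteredIndexes(rawChunk.convertToDimColDataChunk(page),
            rawChunk.getRowCount()[page]);  // decode and scan this page only
        result.setBitSet(hits, page);
      }
    }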

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/OrFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/OrFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/OrFilterExecuterImpl.java
index 248bb22..119bda7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/OrFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/OrFilterExecuterImpl.java
@@ -21,6 +21,7 @@ import java.util.BitSet;
 
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
 
 public class OrFilterExecuterImpl implements FilterExecuter {
 
@@ -32,10 +33,10 @@ public class OrFilterExecuterImpl implements FilterExecuter {
     this.rightExecuter = rightExecuter;
   }
 
-  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+  @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder)
       throws FilterUnsupportedException, IOException {
-    BitSet leftFilters = leftExecuter.applyFilter(blockChunkHolder);
-    BitSet rightFilters = rightExecuter.applyFilter(blockChunkHolder);
+    BitSetGroup leftFilters = leftExecuter.applyFilter(blockChunkHolder);
+    BitSetGroup rightFilters = rightExecuter.applyFilter(blockChunkHolder);
     leftFilters.or(rightFilters);
 
     return leftFilters;
@@ -48,4 +49,8 @@ public class OrFilterExecuterImpl implements FilterExecuter {
     return leftFilters;
   }
 
+  @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
+    leftExecuter.readBlocks(blockChunkHolder);
+    rightExecuter.readBlocks(blockChunkHolder);
+  }
 }
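
OrFilterExecuterImpl now delegates the page-wise union to BitSetGroup.or, whose body is not part of this patch. A plausible sketch of what that call implies; getPagesCount() and getBitSet() on BitSetGroup are assumed accessors (only setBitSet appears elsewhere in this diff):

    // Hypothetical page-wise union inside BitSetGroup; accessor names assumed.
    public void or(BitSetGroup other) {
      for (int page = 0; page < getPagesCount(); page++) {
        BitSet left = getBitSet(page);         // assumed accessor
        BitSet right = other.getBitSet(page);  // assumed accessor
        if (left == null) {
          setBitSet(right, page);              // only the right child matched this page
        } else if (right != null) {
          left.or(right);                      // in-place union of row hits
        }
      }
    }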

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureFilterExecuterImpl.java
deleted file mode 100644
index 715c98d..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureFilterExecuterImpl.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.scan.filter.executer;
-
-import java.util.BitSet;
-
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
-import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
-
-public class RestructureFilterExecuterImpl implements FilterExecuter {
-
-  DimColumnExecuterFilterInfo dimColumnExecuterInfo;
-
-  public RestructureFilterExecuterImpl(DimColumnResolvedFilterInfo dimColumnResolvedFilterInfo,
-      SegmentProperties segmentProperties) {
-    dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
-    FilterUtil
-        .prepareKeysFromSurrogates(dimColumnResolvedFilterInfo.getFilterValues(), segmentProperties,
-            dimColumnResolvedFilterInfo.getDimension(), dimColumnExecuterInfo);
-  }
-
-  @Override public BitSet applyFilter(BlocksChunkHolder blocksChunkHolder) {
-    BitSet bitSet = new BitSet(blocksChunkHolder.getDataBlock().nodeSize());
-    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
-    if (null != filterValues && filterValues.length > 0) {
-      bitSet.set(0, blocksChunkHolder.getDataBlock().nodeSize());
-    }
-    return bitSet;
-  }
-
-  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
-    BitSet bitSet = new BitSet(1);
-    bitSet.set(0);
-    return bitSet;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index 8538209..7595ab6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -20,8 +20,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +31,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
@@ -50,6 +52,7 @@ import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -90,56 +93,48 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
     this.complexDimensionInfoMap = complexDimensionInfoMap;
   }
 
-  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+  @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder)
       throws FilterUnsupportedException, IOException {
-    for (int i = 0; i < dimColEvaluatorInfoList.size(); i++) {
-      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = dimColEvaluatorInfoList.get(i);
-      if (dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.ARRAY
-          && dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.STRUCT) {
-        if (null == blockChunkHolder.getDimensionDataChunk()[blocksIndex[i]]) {
-          blockChunkHolder.getDimensionDataChunk()[blocksIndex[i]] = blockChunkHolder.getDataBlock()
-              .getDimensionChunk(blockChunkHolder.getFileReader(), blocksIndex[i]);
-        }
-      } else {
-        GenericQueryType complexType = complexDimensionInfoMap.get(blocksIndex[i]);
-        complexType.fillRequiredBlockData(blockChunkHolder);
-      }
-    }
+    readBlocks(blockChunkHolder);
 
-    // CHECKSTYLE:OFF Approval No:Approval-V1R2C10_001
-    if (null != msrColEvalutorInfoList) {
-      for (MeasureColumnResolvedFilterInfo msrColumnEvalutorInfo : msrColEvalutorInfoList) {
-        if (null == blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo
-            .getColumnIndex()]) {
-          blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()] =
-              blockChunkHolder.getDataBlock().getMeasureChunk(blockChunkHolder.getFileReader(),
-                  msrColumnEvalutorInfo.getColumnIndex());
+    int[] numberOfRows = null;
+    int pageNumbers = 0;
+
+    if (dimColEvaluatorInfoList.size() > 0) {
+      pageNumbers = blockChunkHolder.getDimensionRawDataChunk()[blocksIndex[0]].getPagesCount();
+      numberOfRows = blockChunkHolder.getDimensionRawDataChunk()[blocksIndex[0]].getRowCount();
+    }
+    if (msrColEvalutorInfoList.size() > 0) {
+      int columnIndex = msrColEvalutorInfoList.get(0).getColumnIndex();
+      pageNumbers = blockChunkHolder.getMeasureRawDataChunk()[columnIndex].getPagesCount();
+      numberOfRows = blockChunkHolder.getMeasureRawDataChunk()[columnIndex].getRowCount();
+    }
+    BitSetGroup bitSetGroup = new BitSetGroup(pageNumbers);
+    for (int i = 0; i < pageNumbers; i++) {
+      BitSet set = new BitSet(numberOfRows[i]);
+      RowIntf row = new RowImpl();
+      boolean invalidRowsPresent = false;
+      for (int index = 0; index < numberOfRows[i]; index++) {
+        createRow(blockChunkHolder, row, i, index);
+        Boolean rslt = false;
+        try {
+          rslt = exp.evaluate(row).getBoolean();
+        }
+        // Any invalid member encountered during evaluation shall be ignored;
+        // since evaluation runs for every row, the error is logged only once
+        // to avoid flooding the log with repeated information.
+        catch (FilterIllegalMemberException e) {
+          FilterUtil.logError(e, invalidRowsPresent);
+        }
+        if (null != rslt && rslt) {
+          set.set(index);
         }
       }
+      bitSetGroup.setBitSet(set, i);
     }
-    // CHECKSTYLE:ON
 
-    int numberOfRows = blockChunkHolder.getDataBlock().nodeSize();
-    BitSet set = new BitSet(numberOfRows);
-    RowIntf row = new RowImpl();
-    boolean invalidRowsPresent = false;
-    for (int index = 0; index < numberOfRows; index++) {
-      createRow(blockChunkHolder, row, index);
-      Boolean rslt = false;
-      try {
-        rslt = exp.evaluate(row).getBoolean();
-      }
-      // Any invalid member while evaluation shall be ignored, system will log the
-      // error only once since all rows the evaluation happens so inorder to avoid
-      // too much log inforation only once the log will be printed.
-      catch (FilterIllegalMemberException e) {
-        FilterUtil.logError(e, invalidRowsPresent);
-      }
-      if (null != rslt && rslt) {
-        set.set(index);
-      }
-    }
-    return set;
+    return bitSetGroup;
   }
 
   /**
@@ -151,7 +146,7 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
    * @param index
    * @throws IOException
    */
-  private void createRow(BlocksChunkHolder blockChunkHolder, RowIntf row, int index)
+  private void createRow(BlocksChunkHolder blockChunkHolder, RowIntf row, int pageIndex, int index)
       throws IOException {
     Object[] record = new Object[dimColEvaluatorInfoList.size() + msrColEvalutorInfoList.size()];
     String memberString;
@@ -162,28 +157,28 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
         if (!dimColumnEvaluatorInfo.isDimensionExistsInCurrentSilce()) {
           record[dimColumnEvaluatorInfo.getRowIndex()] = dimColumnEvaluatorInfo.getDefaultValue();
         }
+        DimensionColumnDataChunk columnDataChunk =
+            blockChunkHolder.getDimensionRawDataChunk()[blocksIndex[i]]
+                .convertToDimColDataChunk(pageIndex);
         if (!dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY)
-            && blockChunkHolder
-            .getDimensionDataChunk()[blocksIndex[i]] instanceof VariableLengthDimensionDataChunk) {
+            && columnDataChunk instanceof VariableLengthDimensionDataChunk) {
 
           VariableLengthDimensionDataChunk dimensionColumnDataChunk =
-              (VariableLengthDimensionDataChunk) blockChunkHolder
-                  .getDimensionDataChunk()[blocksIndex[i]];
-          memberString = readMemberBasedOnNoDictionaryVal(dimensionColumnDataChunk, index);
-          if (null != memberString) {
-            if (memberString.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
-              memberString = null;
+              (VariableLengthDimensionDataChunk) columnDataChunk;
+          byte[] memberBytes = dimensionColumnDataChunk.getChunkData(index);
+          if (null != memberBytes) {
+            if (Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, memberBytes)) {
+              memberBytes = null;
             }
             record[dimColumnEvaluatorInfo.getRowIndex()] = DataTypeUtil
-                .getDataBasedOnDataType(memberString,
+                .getDataBasedOnDataType(memberBytes,
                     dimColumnEvaluatorInfo.getDimension().getDataType());
           } else {
             continue;
           }
         } else {
-          int dictionaryValue =
-              readSurrogatesFromColumnBlock(blockChunkHolder, index, dimColumnEvaluatorInfo,
-                  blocksIndex[i]);
+          int dictionaryValue = readSurrogatesFromColumnBlock(blockChunkHolder, index, pageIndex,
+              dimColumnEvaluatorInfo, blocksIndex[i]);
           if (dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY)
               && !dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
             memberString =
@@ -204,9 +199,8 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
           GenericQueryType complexType = complexDimensionInfoMap.get(blocksIndex[i]);
           ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
           DataOutputStream dataOutputStream = new DataOutputStream(byteStream);
-          complexType
-              .parseBlocksAndReturnComplexColumnByteArray(blockChunkHolder.getDimensionDataChunk(),
-                  index, dataOutputStream);
+          complexType.parseBlocksAndReturnComplexColumnByteArray(
+              blockChunkHolder.getDimensionRawDataChunk(), index, pageIndex, dataOutputStream);
           record[dimColumnEvaluatorInfo.getRowIndex()] = complexType
               .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
           byteStream.close();
@@ -232,23 +226,25 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
       }
       // if measure doesnt exist then set the default value.
       Object msrValue;
+      MeasureColumnDataChunk measureColumnDataChunk =
+          blockChunkHolder.getMeasureRawDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+              .convertToMeasureColDataChunk(pageIndex);
       switch (msrType) {
         case INT:
         case LONG:
-          msrValue = blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
-              .getMeasureDataHolder().getReadableLongValueByIndex(index);
+          msrValue =
+              measureColumnDataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
           break;
         case DECIMAL:
-          msrValue = blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
-              .getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+          msrValue = measureColumnDataChunk.getMeasureDataHolder()
+              .getReadableBigDecimalValueByIndex(index);
           break;
         default:
-          msrValue = blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
-              .getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+          msrValue =
+              measureColumnDataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
       }
       record[msrColumnEvalutorInfo.getRowIndex()] =
-          blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
-              .getNullValueIndexHolder().getBitSet().get(index) ? null : msrValue;
+          measureColumnDataChunk.getNullValueIndexHolder().getBitSet().get(index) ? null : msrValue;
     }
     row.setValues(record);
   }
@@ -305,32 +301,32 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
    * @param dimColumnEvaluatorInfo
    * @return
    */
-  private int readSurrogatesFromColumnBlock(BlocksChunkHolder blockChunkHolder, int index,
+  private int readSurrogatesFromColumnBlock(BlocksChunkHolder blockChunkHolder, int index, int page,
       DimColumnResolvedFilterInfo dimColumnEvaluatorInfo, int blockIndex) {
+    DimensionColumnDataChunk dataChunk =
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex].convertToDimColDataChunk(page);
     if (dimColumnEvaluatorInfo.getDimension().isColumnar()) {
-      byte[] rawData = blockChunkHolder.getDimensionDataChunk()[blockIndex].getChunkData(index);
+      byte[] rawData = dataChunk.getChunkData(index);
       ByteBuffer byteBuffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE);
       int dictionaryValue = CarbonUtil.getSurrogateKey(rawData, byteBuffer);
       return dictionaryValue;
     } else {
-      return readSurrogatesFromColumnGroupBlock(blockChunkHolder, index, dimColumnEvaluatorInfo,
-          blockIndex);
+      return readSurrogatesFromColumnGroupBlock(dataChunk, index, dimColumnEvaluatorInfo);
     }
 
   }
 
   /**
-   * @param blockChunkHolder
    * @param index
    * @param dimColumnEvaluatorInfo
    * @return read surrogate of given row of given column group dimension
    */
-  private int readSurrogatesFromColumnGroupBlock(BlocksChunkHolder blockChunkHolder, int index,
-      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo, int blockIndex) {
+  private int readSurrogatesFromColumnGroupBlock(DimensionColumnDataChunk chunk, int index,
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo) {
     try {
       KeyStructureInfo keyStructureInfo =
           QueryUtil.getKeyStructureInfo(segmentProperties, dimColumnEvaluatorInfo);
-      byte[] colData = blockChunkHolder.getDimensionDataChunk()[blockIndex].getChunkData(index);
+      byte[] colData = chunk.getChunkData(index);
       long[] result = keyStructureInfo.getKeyGenerator().getKeyArray(colData);
       int colGroupId =
           QueryUtil.getColumnGroupId(segmentProperties, dimColumnEvaluatorInfo.getColumnIndex());
@@ -343,24 +339,38 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
     return 0;
   }
 
-  /**
-   * Reading the blocks for no dictionary data, in no dictionary case
-   * directly the filter data will read, no need to scan the dictionary
-   * or read the dictionary value.
-   *
-   * @param dimensionColumnDataChunk
-   * @param index
-   * @return
-   */
-  private String readMemberBasedOnNoDictionaryVal(
-      VariableLengthDimensionDataChunk dimensionColumnDataChunk, int index) {
-    return new String(dimensionColumnDataChunk.getChunkData(index),
-        Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
-  }
 
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
     BitSet bitSet = new BitSet(1);
     bitSet.set(0);
     return bitSet;
   }
+
+  @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
+    for (int i = 0; i < dimColEvaluatorInfoList.size(); i++) {
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = dimColEvaluatorInfoList.get(i);
+      if (dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.ARRAY
+          && dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.STRUCT) {
+        if (null == blockChunkHolder.getDimensionRawDataChunk()[blocksIndex[i]]) {
+          blockChunkHolder.getDimensionRawDataChunk()[blocksIndex[i]] =
+              blockChunkHolder.getDataBlock()
+                  .getDimensionChunk(blockChunkHolder.getFileReader(), blocksIndex[i]);
+        }
+      } else {
+        GenericQueryType complexType = complexDimensionInfoMap.get(blocksIndex[i]);
+        complexType.fillRequiredBlockData(blockChunkHolder);
+      }
+    }
+
+    if (null != msrColEvalutorInfoList) {
+      for (MeasureColumnResolvedFilterInfo msrColumnEvalutorInfo : msrColEvalutorInfoList) {
+        if (null == blockChunkHolder.getMeasureRawDataChunk()[msrColumnEvalutorInfo
+            .getColumnIndex()]) {
+          blockChunkHolder.getMeasureRawDataChunk()[msrColumnEvalutorInfo.getColumnIndex()] =
+              blockChunkHolder.getDataBlock().getMeasureChunk(blockChunkHolder.getFileReader(),
+                  msrColumnEvalutorInfo.getColumnIndex());
+        }
+      }
+    }
+  }
 }
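
With the I/O hoisted into readBlocks, a caller can fault in every raw chunk the filter tree needs before applyFilter evaluates the expression row by row within each page. An illustrative driver only, with the BitSetGroup accessors assumed since they are not shown in this patch:

    // Illustrative; getPagesCount()/getBitSet() on BitSetGroup are assumed.
    filterExecuter.readBlocks(blocksChunkHolder);         // 1. I/O: load raw column chunks
    BitSetGroup matches =
        filterExecuter.applyFilter(blocksChunkHolder);    // 2. per-page evaluation
    for (int page = 0; page < matches.getPagesCount(); page++) {
      BitSet pageHits = matches.getBitSet(page);
      if (pageHits != null && !pageHits.isEmpty()) {
        // only pages with hits need their projection columns decoded
      }
    }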

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index cd60190..9f28d7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -30,6 +31,7 @@ import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedExc
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -50,13 +52,21 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
     BitSet bitSet = new BitSet(1);
     byte[][] filterValues = this.filterRangeValues;
     int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
+    boolean isScanRequired = isScanRequired(blockMaxValue[columnIndex], filterValues);
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+
+  }
+
+  private boolean isScanRequired(byte[] blockMaxValue, byte[][] filterValues) {
     boolean isScanRequired = false;
     for (int k = 0; k < filterValues.length; k++) {
       // filter value should be in range of max and min value i.e
       // max>filtervalue>min
       // so filter-max should be negative
-      int maxCompare =
-          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMaxValue[columnIndex]);
+      int maxCompare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMaxValue);
       // if any filter value is in range than this block needs to be
       // scanned means always less than block max range.
       if (maxCompare < 0) {
@@ -64,26 +74,45 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
         break;
       }
     }
-    if (isScanRequired) {
-      bitSet.set(0);
-    }
-    return bitSet;
-
+    return isScanRequired;
   }
 
-  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+  @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder)
       throws FilterUnsupportedException, IOException {
     if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
       return super.applyFilter(blockChunkHolder);
     }
     int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
         .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
-    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
           .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
     }
-    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
-        blockChunkHolder.getDataBlock().nodeSize());
+    DimensionRawColumnChunk rawColumnChunk =
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+    BitSetGroup bitSetGroup = new BitSetGroup(rawColumnChunk.getPagesCount());
+    for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
+      if (rawColumnChunk.getMaxValues() != null) {
+        if (isScanRequired(rawColumnChunk.getMaxValues()[i], this.filterRangeValues)) {
+          int compare = ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(filterRangeValues[0], rawColumnChunk.getMinValues()[i]);
+          if (compare < 0) {
+            BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
+            bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
+            bitSetGroup.setBitSet(bitSet, i);
+          } else {
+            BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+                rawColumnChunk.getRowCount()[i]);
+            bitSetGroup.setBitSet(bitSet, i);
+          }
+        }
+      } else {
+        BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+            rawColumnChunk.getRowCount()[i]);
+        bitSetGroup.setBitSet(bitSet, i);
+      }
+    }
+    return bitSetGroup;
   }
 
   private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
@@ -118,8 +147,9 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
           .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
               filterValues[i], true);
       if (start >= 0) {
-        start = CarbonUtil.nextGreaterValueToTarget(start, dimensionColumnDataChunk,
-            filterValues[i], numerOfRows);
+        start = CarbonUtil
+            .nextGreaterValueToTarget(start, dimensionColumnDataChunk, filterValues[i],
+                numerOfRows);
       }
       // Logic will handle the case where the range filter member is not present in block
       // in this case the binary search will return the index from where the bit sets will be
@@ -207,4 +237,15 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
     return bitSet;
   }
 
+  @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      super.readBlocks(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+  }
 }
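
Beyond pruning pages whose max value rules the filter out, the greater-than executer also short-circuits in the other direction: if the filter value is below the page minimum, every row qualifies and the page never needs decoding. The annotated core of the loop above:

    int compare = ByteUtil.UnsafeComparer.INSTANCE
        .compareTo(filterRangeValues[0], rawColumnChunk.getMinValues()[i]);
    if (compare < 0) {
      // filter < page minimum: all rows satisfy 'greater than',
      // so mark the whole page without decoding it
      BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
      bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
      bitSetGroup.setBitSet(bitSet, i);
    } else {
      // otherwise decode the page and binary-search the sorted column data
      bitSetGroup.setBitSet(getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
          rawColumnChunk.getRowCount()[i]), i);
    }

The greater-than-or-equal executer that follows uses the same structure with compare <= 0, since equality with the page minimum also accepts every row in the page.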

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index 02666c4..27377c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -30,6 +31,7 @@ import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedExc
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -51,13 +53,21 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
     BitSet bitSet = new BitSet(1);
     byte[][] filterValues = this.filterRangeValues;
     int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
+    boolean isScanRequired = isScanRequired(blockMaxValue[columnIndex], filterValues);
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+
+  }
+
+  private boolean isScanRequired(byte[] blockMaxValue, byte[][] filterValues) {
     boolean isScanRequired = false;
     for (int k = 0; k < filterValues.length; k++) {
       // filter value should be in range of max and min value i.e
       // max>filtervalue>min
       // so filter-max should be negative
-      int maxCompare =
-          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMaxValue[columnIndex]);
+      int maxCompare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMaxValue);
       // if any filter value is in range than this block needs to be
       // scanned less than equal to max range.
       if (maxCompare <= 0) {
@@ -65,26 +75,45 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
         break;
       }
     }
-    if (isScanRequired) {
-      bitSet.set(0);
-    }
-    return bitSet;
-
+    return isScanRequired;
   }
 
-  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+  @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder)
       throws FilterUnsupportedException, IOException {
     if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
       return super.applyFilter(blockChunkHolder);
     }
     int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
         .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
-    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
           .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
     }
-    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
-        blockChunkHolder.getDataBlock().nodeSize());
+    DimensionRawColumnChunk rawColumnChunk =
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+    BitSetGroup bitSetGroup = new BitSetGroup(rawColumnChunk.getPagesCount());
+    for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
+      if (rawColumnChunk.getMaxValues() != null) {
+        if (isScanRequired(rawColumnChunk.getMaxValues()[i], this.filterRangeValues)) {
+          int compare = ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(filterRangeValues[0], rawColumnChunk.getMinValues()[i]);
+          if (compare <= 0) {
+            BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
+            bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
+            bitSetGroup.setBitSet(bitSet, i);
+          } else {
+            BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+                rawColumnChunk.getRowCount()[i]);
+            bitSetGroup.setBitSet(bitSet, i);
+          }
+        }
+      } else {
+        BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+            rawColumnChunk.getRowCount()[i]);
+        bitSetGroup.setBitSet(bitSet, i);
+      }
+    }
+    return bitSetGroup;
   }
 
   private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
@@ -194,4 +223,16 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
     }
     return bitSet;
   }
+
+  @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      super.readBlocks(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 82ca4d9..2bdce8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -33,6 +34,7 @@ import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -53,11 +55,18 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
     BitSet bitSet = new BitSet(1);
     byte[][] filterValues = this.filterRangeValues;
     int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
+    boolean isScanRequired = isScanRequired(blockMinValue[columnIndex], filterValues);
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+  }
+
+  private boolean isScanRequired(byte[] blockMinValue, byte[][] filterValues) {
     boolean isScanRequired = false;
     for (int k = 0; k < filterValues.length; k++) {
       // and filter-min should be positive
-      int minCompare =
-          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue[columnIndex]);
+      int minCompare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue);
 
       // if any filter applied is not in range of min and max of block
       // then since its a less than equal to fiter validate whether the block
@@ -67,26 +76,45 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
         break;
       }
     }
-    if (isScanRequired) {
-      bitSet.set(0);
-    }
-    return bitSet;
-
+    return isScanRequired;
   }
 
-  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+  @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder)
       throws FilterUnsupportedException, IOException {
     if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
       return super.applyFilter(blockChunkHolder);
     }
     int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
         .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
-    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
           .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
     }
-    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
-        blockChunkHolder.getDataBlock().nodeSize());
+    DimensionRawColumnChunk rawColumnChunk =
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+    BitSetGroup bitSetGroup = new BitSetGroup(rawColumnChunk.getPagesCount());
+    for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
+      if (rawColumnChunk.getMinValues() != null) {
+        if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) {
+          int compare = ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(filterRangeValues[0], rawColumnChunk.getMaxValues()[i]);
+          if (compare >= 0) {
+            BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
+            bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
+            bitSetGroup.setBitSet(bitSet, i);
+          } else {
+            BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+                rawColumnChunk.getRowCount()[i]);
+            bitSetGroup.setBitSet(bitSet, i);
+          }
+        }
+      } else {
+        BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+            rawColumnChunk.getRowCount()[i]);
+        bitSetGroup.setBitSet(bitSet, i);
+      }
+    }
+    return bitSetGroup;
   }
 
   private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
@@ -151,7 +179,7 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
               filterValues[i], true);
       if (start < 0) {
         start = -(start + 1);
-        if (start == numerOfRows) {
+        if (start >= numerOfRows) {
           start = start - 1;
         }
         // Method will compare the tentative index value after binary search, this tentative
@@ -218,7 +246,7 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
             filterValues[k], true);
         if (start < 0) {
           start = -(start + 1);
-          if (start == numerOfRows) {
+          if (start >= numerOfRows) {
             start = start - 1;
           }
           // Method will compare the tentative index value after binary search, this tentative
@@ -243,4 +271,15 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
     return bitSet;
   }
 
+  @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      super.readBlocks(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+  }
 }
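
The two '==' to '>=' changes in this file harden the binary-search fallback: when the filter key is absent, getFirstIndexUsingBinarySearch returns -(insertionPoint + 1), and the insertion point can land one past the last row. A sketch with the hunk's own names (keeping its numerOfRows spelling):

    int start = CarbonUtil.getFirstIndexUsingBinarySearch(
        dimensionColumnDataChunk, startIndex, numerOfRows - 1, filterValues[k], true);
    if (start < 0) {
      start = -(start + 1);        // key absent: recover the insertion point
      if (start >= numerOfRows) {  // was '==': '>=' also catches any overshoot
        start = start - 1;         // clamp back to the last row before comparing
      }
    }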