Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/09 07:58:19 UTC

[17/35] carbondata git commit: [CARBONDATA-2099] Refactor query scan process to improve readability

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
index 553f85e..773fbd7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
@@ -17,20 +17,15 @@
 package org.apache.carbondata.core.scan.result.iterator;
 
 import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
 
 public class PartitionSpliterRawResultIterator extends CarbonIterator<Object[]> {
 
-  private CarbonIterator<BatchResult> iterator;
-  private BatchResult batch;
+  private CarbonIterator<RowBatch> iterator;
+  private RowBatch batch;
   private int counter;
 
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(PartitionSpliterRawResultIterator.class.getName());
-
-  public PartitionSpliterRawResultIterator(CarbonIterator<BatchResult> iterator) {
+  public PartitionSpliterRawResultIterator(CarbonIterator<RowBatch> iterator) {
     this.iterator = iterator;
   }
 
@@ -65,7 +60,7 @@ public class PartitionSpliterRawResultIterator extends CarbonIterator<Object[]>
    * @param batch
    * @return
    */
-  private boolean checkBatchEnd(BatchResult batch) {
+  private boolean checkBatchEnd(RowBatch batch) {
     return !(counter < batch.getSize());
   }
 

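As context for the rename above: BatchResult became RowBatch, the unused logger was dropped, and the iterator keeps flattening batches into single rows with a per-batch counter. Below is a minimal, self-contained sketch of that batch-flattening pattern; SimpleRowBatch and BatchFlatteningIterator are illustrative stand-ins, not CarbonData classes, and the real RowBatch API may differ.

    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    // Stand-in for RowBatch: a fixed-size batch of rows.
    class SimpleRowBatch {
      private final List<Object[]> rows;
      SimpleRowBatch(List<Object[]> rows) { this.rows = rows; }
      int getSize() { return rows.size(); }
      Object[] getRow(int index) { return rows.get(index); }
    }

    // Flattens an iterator of batches into an iterator of rows, mirroring
    // the counter/checkBatchEnd logic in the iterator above.
    class BatchFlatteningIterator implements Iterator<Object[]> {
      private final Iterator<SimpleRowBatch> batches;
      private SimpleRowBatch batch;
      private int counter;

      BatchFlatteningIterator(Iterator<SimpleRowBatch> batches) {
        this.batches = batches;
      }

      @Override public boolean hasNext() {
        // advance to the next non-empty batch once the current one is consumed
        while ((batch == null || counter >= batch.getSize()) && batches.hasNext()) {
          batch = batches.next();
          counter = 0;
        }
        return batch != null && counter < batch.getSize();
      }

      @Override public Object[] next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return batch.getRow(counter++);
      }
    }
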
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
index 70d0958..1dd1595 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
@@ -21,7 +21,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 
 /**
@@ -37,7 +37,7 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
   /**
    * Iterator of the Batch raw result.
    */
-  private CarbonIterator<BatchResult> detailRawQueryResultIterator;
+  private CarbonIterator<RowBatch> detailRawQueryResultIterator;
 
   /**
    * Counter to maintain the row counter.
@@ -55,9 +55,9 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
   /**
    * batch of the result.
    */
-  private BatchResult batch;
+  private RowBatch batch;
 
-  public RawResultIterator(CarbonIterator<BatchResult> detailRawQueryResultIterator,
+  public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
       SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
     this.detailRawQueryResultIterator = detailRawQueryResultIterator;
     this.sourceSegProperties = sourceSegProperties;
@@ -155,7 +155,7 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
    * @param batch
    * @return
    */
-  private boolean checkIfBatchIsProcessedCompletely(BatchResult batch) {
+  private boolean checkIfBatchIsProcessedCompletely(RowBatch batch) {
     if (counter < batch.getSize()) {
       return false;
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
index cc9710e..c7cb00d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
@@ -35,10 +35,12 @@ public class VectorDetailQueryResultIterator extends AbstractDetailQueryResultIt
     super(infos, queryModel, execService);
   }
 
-  @Override public Object next() {
+  @Override
+  public Object next() {
     throw new UnsupportedOperationException("call processNextBatch instead");
   }
 
+  @Override
   public void processNextBatch(CarbonColumnarBatch columnarBatch) {
     synchronized (lock) {
       updateDataBlockIterator();

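The change above also makes the contract explicit: a vectorized iterator refuses per-row next() and instead fills a caller-provided columnar batch. The following is a hedged sketch of how a caller typically drives such an iterator; ColumnarBatchStub, VectorIterator and countRows are hypothetical names for illustration, not the CarbonData API.

    // Hypothetical consumption loop for a vectorized iterator that fills a
    // reusable caller-provided batch instead of returning per-row objects.
    class ColumnarBatchStub {
      int rowCount;                  // rows filled by the last call
      void reset() { rowCount = 0; }
    }

    abstract class VectorIterator {
      // per-row access is unsupported by design
      public final Object next() {
        throw new UnsupportedOperationException("call processNextBatch instead");
      }
      public abstract boolean hasNext();
      public abstract void processNextBatch(ColumnarBatchStub batch);
    }

    class VectorConsumer {
      static long countRows(VectorIterator it, ColumnarBatchStub batch) {
        long total = 0;
        while (it.hasNext()) {
          batch.reset();               // reuse the same batch buffer
          it.processNextBatch(batch);  // the iterator fills columns in place
          total += batch.rowCount;
        }
        return total;
      }
    }
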
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
index cfc2f16..973ce0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
@@ -87,7 +87,4 @@ public class CarbonColumnarBatch {
     }
   }
 
-  public int getRowsFilteredCount() {
-    return rowsFiltered;
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
index a5f81b9..59117dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
@@ -18,16 +18,16 @@ package org.apache.carbondata.core.scan.result.vector;
 
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 
 public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> {
   public int offset;
   public int size;
   public CarbonColumnVector vector;
   public int vectorOffset;
-  public QueryDimension dimension;
-  public QueryMeasure measure;
+  public ProjectionDimension dimension;
+  public ProjectionMeasure measure;
   public int ordinal;
   public DirectDictionaryGenerator directDictionaryGenerator;
   public MeasureDataVectorProcessor.MeasureVectorFiller measureVectorFiller;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
index db4c982..8902dfb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
@@ -29,7 +29,7 @@ public class MeasureDataVectorProcessor {
 
     void fillMeasureVector(ColumnPage dataChunk, ColumnVectorInfo info);
 
-    void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk,
+    void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
         ColumnVectorInfo info);
   }
 
@@ -60,7 +60,7 @@ public class MeasureDataVectorProcessor {
     }
 
     @Override
-    public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk,
+    public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
         ColumnVectorInfo info) {
       int offset = info.offset;
       int len = offset + info.size;
@@ -69,13 +69,13 @@ public class MeasureDataVectorProcessor {
       BitSet nullBitSet = dataChunk.getNullBits();
       if (nullBitSet.isEmpty()) {
         for (int i = offset; i < len; i++) {
-          int currentRow = rowMapping[i];
+          int currentRow = filteredRowId[i];
           vector.putInt(vectorOffset, (int)dataChunk.getLong(currentRow));
           vectorOffset++;
         }
       } else {
         for (int i = offset; i < len; i++) {
-          int currentRow = rowMapping[i];
+          int currentRow = filteredRowId[i];
           if (nullBitSet.get(currentRow)) {
             vector.putNull(vectorOffset);
           } else {
@@ -117,7 +117,7 @@ public class MeasureDataVectorProcessor {
     }
 
     @Override
-    public void fillMeasureVectorForFilter(int[] rowMapping,
+    public void fillMeasureVector(int[] filteredRowId,
         ColumnPage dataChunk, ColumnVectorInfo info) {
       int offset = info.offset;
       int len = offset + info.size;
@@ -126,13 +126,13 @@ public class MeasureDataVectorProcessor {
       BitSet nullBitSet = dataChunk.getNullBits();
       if (nullBitSet.isEmpty()) {
         for (int i = offset; i < len; i++) {
-          int currentRow = rowMapping[i];
+          int currentRow = filteredRowId[i];
           vector.putBoolean(vectorOffset, dataChunk.getBoolean(currentRow));
           vectorOffset++;
         }
       } else {
         for (int i = offset; i < len; i++) {
-          int currentRow = rowMapping[i];
+          int currentRow = filteredRowId[i];
           if (nullBitSet.get(currentRow)) {
             vector.putNull(vectorOffset);
           } else {
@@ -171,7 +171,7 @@ public class MeasureDataVectorProcessor {
     }
 
     @Override
-    public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk,
+    public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
         ColumnVectorInfo info) {
       int offset = info.offset;
       int len = offset + info.size;
@@ -180,13 +180,13 @@ public class MeasureDataVectorProcessor {
       BitSet nullBitSet = dataChunk.getNullBits();
       if (nullBitSet.isEmpty()) {
         for (int i = offset; i < len; i++) {
-          int currentRow = rowMapping[i];
+          int currentRow = filteredRowId[i];
           vector.putShort(vectorOffset, (short) dataChunk.getLong(currentRow));
           vectorOffset++;
         }
       } else {
         for (int i = offset; i < len; i++) {
-          int currentRow = rowMapping[i];
+          int currentRow = filteredRowId[i];
           if (nullBitSet.get(currentRow)) {
             vector.putNull(vectorOffset);
           } else {
@@ -225,7 +225,7 @@ public class MeasureDataVectorProcessor {
     }
 
     @Override
-    public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk,
+    public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
         ColumnVectorInfo info) {
       int offset = info.offset;
       int len = offset + info.size;
@@ -234,13 +234,13 @@ public class MeasureDataVectorProcessor {
       BitSet nullBitSet = dataChunk.getNullBits();
       if (nullBitSet.isEmpty()) {
         for (int i = offset; i < len; i++) {
-          int currentRow = rowMapping[i];
+          int currentRow = filteredRowId[i];
           vector.putLong(vectorOffset, dataChunk.getLong(currentRow));
           vectorOffset++;
         }
       } else {
         for (int i = offset; i < len; i++) {
-          int currentRow = rowMapping[i];
+          int currentRow = filteredRowId[i];
           if (nullBitSet.get(currentRow)) {
             vector.putNull(vectorOffset);
           } else {
@@ -279,7 +279,7 @@ public class MeasureDataVectorProcessor {
     }
 
     @Override
-    public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk,
+    public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
         ColumnVectorInfo info) {
       int offset = info.offset;
       int len = offset + info.size;
@@ -288,7 +288,7 @@ public class MeasureDataVectorProcessor {
       int precision = info.measure.getMeasure().getPrecision();
       BitSet nullBitSet = dataChunk.getNullBits();
       for (int i = offset; i < len; i++) {
-        int currentRow = rowMapping[i];
+        int currentRow = filteredRowId[i];
         if (nullBitSet.get(currentRow)) {
           vector.putNull(vectorOffset);
         } else {
@@ -330,7 +330,7 @@ public class MeasureDataVectorProcessor {
     }
 
     @Override
-    public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk,
+    public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
         ColumnVectorInfo info) {
       int offset = info.offset;
       int len = offset + info.size;
@@ -339,13 +339,13 @@ public class MeasureDataVectorProcessor {
       BitSet nullBitSet = dataChunk.getNullBits();
       if (nullBitSet.isEmpty()) {
         for (int i = offset; i < len; i++) {
-          int currentRow = rowMapping[i];
+          int currentRow = filteredRowId[i];
           vector.putDouble(vectorOffset, dataChunk.getDouble(currentRow));
           vectorOffset++;
         }
       } else {
         for (int i = offset; i < len; i++) {
-          int currentRow = rowMapping[i];
+          int currentRow = filteredRowId[i];
           if (nullBitSet.get(currentRow)) {
             vector.putNull(vectorOffset);
           } else {

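All of the fillers above share one shape: walk filteredRowId over [offset, offset + size), map each slot to the surviving source row, and either put a null (per the page's null bitset) or the decoded value, with a fast path when the page has no nulls. Below is a standalone sketch of that shape for the int case; IntPage and IntVector are simplified stand-ins for ColumnPage and CarbonColumnVector.

    import java.util.BitSet;

    // Simplified stand-ins for ColumnPage and CarbonColumnVector, keeping
    // only what the fill loop needs.
    class IntPage {
      final long[] data;       // measure values stored as longs, as in ColumnPage
      final BitSet nullBits;   // a set bit marks a null at that row
      IntPage(long[] data, BitSet nullBits) { this.data = data; this.nullBits = nullBits; }
    }

    class IntVector {
      final Integer[] out;
      IntVector(int capacity) { out = new Integer[capacity]; }
      void putInt(int at, int v) { out[at] = v; }
      void putNull(int at) { out[at] = null; }
    }

    class MeasureFill {
      // Mirrors fillMeasureVector(int[] filteredRowId, ...) above: each slot
      // in [offset, offset + size) names a source row that survived the filter.
      static void fill(int[] filteredRowId, IntPage page, IntVector vector,
          int offset, int size, int vectorOffset) {
        int len = offset + size;
        if (page.nullBits.isEmpty()) {
          // fast path: the page has no nulls, skip per-row null checks
          for (int i = offset; i < len; i++) {
            vector.putInt(vectorOffset++, (int) page.data[filteredRowId[i]]);
          }
        } else {
          for (int i = offset; i < len; i++) {
            int currentRow = filteredRowId[i];
            if (page.nullBits.get(currentRow)) {
              vector.putNull(vectorOffset++);
            } else {
              vector.putInt(vectorOffset++, (int) page.data[currentRow]);
            }
          }
        }
      }
    }
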
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
deleted file mode 100644
index bf26ca3..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.scan.scanner;
-
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
-import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
-import org.apache.carbondata.core.scan.result.AbstractScannedResult;
-import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult;
-import org.apache.carbondata.core.stats.QueryStatistic;
-import org.apache.carbondata.core.stats.QueryStatisticsConstants;
-import org.apache.carbondata.core.stats.QueryStatisticsModel;
-
-/**
- * Blocklet scanner class to process the block
- */
-public abstract class AbstractBlockletScanner implements BlockletScanner {
-
-  /**
-   * block execution info
-   */
-  protected BlockExecutionInfo blockExecutionInfo;
-
-  public QueryStatisticsModel queryStatisticsModel;
-
-  private AbstractScannedResult emptyResult;
-
-  public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) {
-    this.blockExecutionInfo = tableBlockExecutionInfos;
-  }
-
-  @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
-      throws IOException, FilterUnsupportedException {
-    long startTime = System.currentTimeMillis();
-    AbstractScannedResult scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
-    QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
-    totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
-        totalBlockletStatistic.getCount() + 1);
-    QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
-    validScannedBlockletStatistic
-        .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
-            validScannedBlockletStatistic.getCount() + 1);
-    // adding statistics for valid number of pages
-    QueryStatistic validPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.VALID_PAGE_SCANNED);
-    validPages.addCountStatistic(QueryStatisticsConstants.VALID_PAGE_SCANNED,
-        validPages.getCount() + blocksChunkHolder.getDataBlock().numberOfPages());
-    // adding statistics for number of pages
-    QueryStatistic totalPagesScanned = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED);
-    totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED,
-        totalPagesScanned.getCount() + blocksChunkHolder.getDataBlock().numberOfPages());
-    scannedResult.setBlockletId(
-        blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
-            .getDataBlock().blockletId());
-    if (!blockExecutionInfo.isPrefetchBlocklet()) {
-      readBlocklet(blocksChunkHolder);
-    }
-    DimensionRawColumnChunk[] dimensionRawColumnChunks =
-        blocksChunkHolder.getDimensionRawDataChunk();
-    DimensionColumnDataChunk[][] dimensionColumnDataChunks =
-        new DimensionColumnDataChunk[dimensionRawColumnChunks.length][blocksChunkHolder
-            .getDataBlock().numberOfPages()];
-    MeasureRawColumnChunk[] measureRawColumnChunks = blocksChunkHolder.getMeasureRawDataChunk();
-    ColumnPage[][] columnPages =
-        new ColumnPage[measureRawColumnChunks.length][blocksChunkHolder.getDataBlock()
-                       .numberOfPages()];
-    scannedResult.setDimensionChunks(dimensionColumnDataChunks);
-    scannedResult.setMeasureChunks(columnPages);
-    scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks);
-    scannedResult.setMsrRawColumnChunks(measureRawColumnChunks);
-    if (blockExecutionInfo.isPrefetchBlocklet()) {
-      for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
-        if (dimensionRawColumnChunks[i] != null) {
-          dimensionColumnDataChunks[i] = dimensionRawColumnChunks[i].convertToDimColDataChunks();
-        }
-      }
-      for (int i = 0; i < measureRawColumnChunks.length; i++) {
-        if (measureRawColumnChunks[i] != null) {
-          columnPages[i] = measureRawColumnChunks[i].convertToColumnPage();
-        }
-      }
-    }
-    int[] numberOfRows = null;
-    if (blockExecutionInfo.getAllSelectedDimensionBlocksIndexes().length > 0) {
-      for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
-        if (dimensionRawColumnChunks[i] != null) {
-          numberOfRows = dimensionRawColumnChunks[i].getRowCount();
-          break;
-        }
-      }
-    } else if (blockExecutionInfo.getAllSelectedMeasureBlocksIndexes().length > 0) {
-      for (int i = 0; i < measureRawColumnChunks.length; i++) {
-        if (measureRawColumnChunks[i] != null) {
-          numberOfRows = measureRawColumnChunks[i].getRowCount();
-          break;
-        }
-      }
-    }
-
-    // count(*)  case there would not be any dimensions are measures selected.
-    if (numberOfRows == null) {
-      numberOfRows = new int[blocksChunkHolder.getDataBlock().numberOfPages()];
-      for (int i = 0; i < numberOfRows.length; i++) {
-        numberOfRows[i] =
-            CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
-      }
-      int lastPageSize = blocksChunkHolder.getDataBlock().nodeSize()
-          % CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
-      ;
-      if (lastPageSize > 0) {
-        numberOfRows[numberOfRows.length - 1] = lastPageSize;
-      }
-    }
-    scannedResult.setNumberOfRows(numberOfRows);
-    if (!blockExecutionInfo.isPrefetchBlocklet()) {
-      scannedResult.fillDataChunks();
-    }
-    // adding statistics for carbon scan time
-    QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
-    scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
-        scanTime.getCount() + (System.currentTimeMillis() - startTime));
-    return scannedResult;
-  }
-
-  @Override public void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException {
-    long startTime = System.currentTimeMillis();
-    DimensionRawColumnChunk[] dimensionRawColumnChunks = blocksChunkHolder.getDataBlock()
-        .getDimensionChunks(blocksChunkHolder.getFileReader(),
-            blockExecutionInfo.getAllSelectedDimensionBlocksIndexes());
-    blocksChunkHolder.setDimensionRawDataChunk(dimensionRawColumnChunks);
-    MeasureRawColumnChunk[] measureRawColumnChunks = blocksChunkHolder.getDataBlock()
-        .getMeasureChunks(blocksChunkHolder.getFileReader(),
-            blockExecutionInfo.getAllSelectedMeasureBlocksIndexes());
-    blocksChunkHolder.setMeasureRawDataChunk(measureRawColumnChunks);
-    // adding statistics for carbon read time
-    QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.READ_BLOCKlET_TIME);
-    readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME,
-        readTime.getCount() + (System.currentTimeMillis() - startTime));
-  }
-
-  @Override public AbstractScannedResult createEmptyResult() {
-    if (emptyResult == null) {
-      emptyResult = new NonFilterQueryScannedResult(blockExecutionInfo);
-      emptyResult.setNumberOfRows(new int[0]);
-      emptyResult.setIndexes(new int[0][]);
-    }
-    return emptyResult;
-  }
-
-  @Override public boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException {
-    // For non filter it is always true
-    return true;
-  }
-}

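The deleted scanner above (and its replacements later in this commit) repeat one statistics idiom: look up a named QueryStatistic in the model's map and write back the old count plus a delta. A reduced sketch of that read-modify-write pattern follows; Stat, StatsModel and the constant value are stand-ins for QueryStatistic, QueryStatisticsModel and QueryStatisticsConstants.

    import java.util.HashMap;
    import java.util.Map;

    // Minimal stand-ins for QueryStatistic and QueryStatisticsModel.
    class Stat {
      private long count;
      long getCount() { return count; }
      void addCountStatistic(String name, long newCount) { count = newCount; }
    }

    class StatsModel {
      private final Map<String, Stat> byName = new HashMap<>();
      Stat get(String name) { return byName.computeIfAbsent(name, k -> new Stat()); }
    }

    class ScannerStats {
      static final String TOTAL_BLOCKLET_NUM = "total_blocklet_count";

      // The accumulate-by-name shape used throughout the scanners:
      // fetch the named statistic, then store old count + delta.
      static void countBlocklet(StatsModel model) {
        Stat total = model.get(TOTAL_BLOCKLET_NUM);
        total.addCountStatistic(TOTAL_BLOCKLET_NUM, total.getCount() + 1);
      }
    }
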
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
index 0ed0d43..0a41032 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
@@ -18,9 +18,10 @@ package org.apache.carbondata.core.scan.scanner;
 
 import java.io.IOException;
 
+import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
-import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
-import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
 
 /**
  * Interface for processing the block
@@ -30,31 +31,26 @@ public interface BlockletScanner {
 
   /**
    * Checks whether this blocklet required to scan or not based on min max of each blocklet.
-   * @param blocksChunkHolder
+   * @param dataBlock
    * @return
    * @throws IOException
    */
-  boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException;
+  boolean isScanRequired(DataRefNode dataBlock);
 
   /**
    * Below method will used to process the block data and get the scanned result
    *
-   * @param blocksChunkHolder block chunk which holds the block data
+   * @param rawBlockletColumnChunks block chunk which holds the block data
    * @return scannerResult
    * result after processing
    */
-  AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
+  BlockletScannedResult scanBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks)
       throws IOException, FilterUnsupportedException;
 
   /**
    * Just reads the blocklet from file, does not uncompress it.
-   * @param blocksChunkHolder
+   * @param rawBlockletColumnChunks
    */
-  void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException;
+  void readBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks) throws IOException;
 
-  /**
-   * In case if there is no filter satisfies.
-   * @return AbstractScannedResult
-   */
-  AbstractScannedResult createEmptyResult();
 }

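The reshaped interface separates three concerns: a cheap min/max prune (isScanRequired, now taking a DataRefNode and no longer throwing IOException), raw chunk I/O (readBlocklet), and decode-plus-filter (scanBlocklet). Below is a hedged sketch of the call sequence a driver might follow under that contract; DataBlock, RawChunks, ScannedResult and ScanDriver are illustrative stand-ins, and in the real code readBlocklet may instead be invoked from inside scanBlocklet when prefetch is disabled.

    import java.io.IOException;
    import java.util.Iterator;

    // Illustrative stand-ins; only the call sequence matters here.
    interface DataBlock { int numberOfPages(); }
    class RawChunks {
      final DataBlock block;
      RawChunks(DataBlock block) { this.block = block; }
    }
    class ScannedResult { }

    interface Scanner {
      boolean isScanRequired(DataBlock dataBlock);            // cheap min/max prune
      void readBlocklet(RawChunks chunks) throws IOException; // raw I/O, no decoding
      ScannedResult scanBlocklet(RawChunks chunks) throws IOException; // decode + filter
    }

    class ScanDriver {
      // The sequence implied by the interface: prune, read, then scan.
      static void scanAll(Iterator<DataBlock> blocklets, Scanner scanner)
          throws IOException {
        while (blocklets.hasNext()) {
          DataBlock block = blocklets.next();
          if (!scanner.isScanRequired(block)) {
            continue; // pruned by min/max, no I/O spent on this blocklet
          }
          RawChunks chunks = new RawChunks(block);
          scanner.readBlocklet(chunks);                  // fetch raw column chunks
          ScannedResult result = scanner.scanBlocklet(chunks);
          // hand `result` to the result collector here
        }
      }
    }
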
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
new file mode 100644
index 0000000..1c73d63
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.scanner.impl;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
+import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.result.impl.FilterQueryScannedResult;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.BitSetGroup;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Below class will be used for filter query processing;
+ * it will first apply the filter, then read the column pages if
+ * required, and return the scanned result
+ */
+public class BlockletFilterScanner extends BlockletFullScanner {
+
+  /**
+   * filter executer to evaluate filter condition
+   */
+  private FilterExecuter filterExecuter;
+  /**
+   * this flag controls whether min/max pruning is applied.
+   * it is useful for a dimension column on the right side of the sort order,
+   * since the node finder always gives tentative blocks: if column data is stored
+   * individually and in sorted order, we can check whether the filter falls within
+   * the min/max range, and apply the filter on the complete data only when it does.
+   * this is especially useful for sparse data where rows are
+   * repeating.
+   */
+  private boolean isMinMaxEnabled;
+
+  private QueryStatisticsModel queryStatisticsModel;
+
+  private boolean useBitSetPipeLine;
+
+  public BlockletFilterScanner(BlockExecutionInfo blockExecutionInfo,
+      QueryStatisticsModel queryStatisticsModel) {
+    super(blockExecutionInfo, queryStatisticsModel);
+    // to check whether min max is enabled or not
+    String minMaxEnableValue = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED,
+            CarbonCommonConstants.MIN_MAX_DEFAULT_VALUE);
+    if (null != minMaxEnableValue) {
+      isMinMaxEnabled = Boolean.parseBoolean(minMaxEnableValue);
+    }
+    // get the filter tree
+    this.filterExecuter = blockExecutionInfo.getFilterExecuterTree();
+    this.queryStatisticsModel = queryStatisticsModel;
+
+    String useBitSetPipeLine = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.BITSET_PIPE_LINE,
+            CarbonCommonConstants.BITSET_PIPE_LINE_DEFAULT);
+    if (null != useBitSetPipeLine) {
+      this.useBitSetPipeLine = Boolean.parseBoolean(useBitSetPipeLine);
+    }
+  }
+
+  /**
+   * Below method will be used to process the blocklet
+   *
+   * @param rawBlockletColumnChunks block chunk holder which holds the data
+   * @throws FilterUnsupportedException
+   */
+  @Override
+  public BlockletScannedResult scanBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks)
+      throws IOException, FilterUnsupportedException {
+    return executeFilter(rawBlockletColumnChunks);
+  }
+
+  @Override
+  public boolean isScanRequired(DataRefNode dataBlock) {
+    // adding statistics for number of pages
+    QueryStatistic totalPagesScanned = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED);
+    totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED,
+        totalPagesScanned.getCount() + dataBlock.numberOfPages());
+    // apply min max
+    if (isMinMaxEnabled) {
+      BitSet bitSet = null;
+      // check for implicit include filter instance
+      if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
+        String blockletId = blockExecutionInfo.getBlockIdString() +
+            CarbonCommonConstants.FILE_SEPARATOR + dataBlock.blockletIndex();
+        bitSet = ((ImplicitColumnFilterExecutor) filterExecuter)
+            .isFilterValuesPresentInBlockOrBlocklet(
+                dataBlock.getColumnsMaxValue(),
+                dataBlock.getColumnsMinValue(), blockletId);
+      } else {
+        bitSet = this.filterExecuter
+            .isScanRequired(dataBlock.getColumnsMaxValue(),
+                dataBlock.getColumnsMinValue());
+      }
+      return !bitSet.isEmpty();
+    }
+    return true;
+  }
+
+  @Override
+  public void readBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks) throws IOException {
+    long startTime = System.currentTimeMillis();
+    this.filterExecuter.readColumnChunks(rawBlockletColumnChunks);
+    // adding statistics for carbon read time
+    QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.READ_BLOCKlET_TIME);
+    readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME,
+        readTime.getCount() + (System.currentTimeMillis() - startTime));
+  }
+
+  /**
+   * This method will process the data in the below order:
+   * 1. first apply min/max on the filter tree and check whether any filter value
+   * falls within the min/max range; if not, return an empty result
+   * 2. if the filter falls within the min/max range, apply the filter on the actual
+   * data and get the filtered row indexes
+   * 3. if the row indexes are empty, return the empty result
+   * 4. if the row indexes are not empty, read only those blocks (measure or dimension)
+   * which are present in the query but were not read while applying the filter;
+   * blocks already read during filtering are kept in the chunk holder, so they
+   * are not read again
+   * 5. set the blocks and filtered row indexes on the result
+   *
+   * @param rawBlockletColumnChunks
+   * @throws FilterUnsupportedException
+   */
+  private BlockletScannedResult executeFilter(RawBlockletColumnChunks rawBlockletColumnChunks)
+      throws FilterUnsupportedException, IOException {
+    long startTime = System.currentTimeMillis();
+    QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
+    totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
+        totalBlockletStatistic.getCount() + 1);
+    // apply filter on actual data, for each page
+    BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(rawBlockletColumnChunks,
+        useBitSetPipeLine);
+    // if filter result is empty then return with empty result
+    if (bitSetGroup.isEmpty()) {
+      CarbonUtil.freeMemory(rawBlockletColumnChunks.getDimensionRawColumnChunks(),
+          rawBlockletColumnChunks.getMeasureRawColumnChunks());
+
+      QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+          .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
+      scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
+          scanTime.getCount() + (System.currentTimeMillis() - startTime));
+
+      QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
+          .get(QueryStatisticsConstants.PAGE_SCANNED);
+      scannedPages.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED,
+          scannedPages.getCount() + bitSetGroup.getScannedPages());
+      return createEmptyResult();
+    }
+
+    BlockletScannedResult scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
+    scannedResult.setBlockletId(
+        blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR +
+            rawBlockletColumnChunks.getDataBlock().blockletIndex());
+    // valid scanned blocklet
+    QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
+    validScannedBlockletStatistic
+        .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
+            validScannedBlockletStatistic.getCount() + 1);
+    // adding statistics for valid number of pages
+    QueryStatistic validPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.VALID_PAGE_SCANNED);
+    validPages.addCountStatistic(QueryStatisticsConstants.VALID_PAGE_SCANNED,
+        validPages.getCount() + bitSetGroup.getValidPages());
+    QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.PAGE_SCANNED);
+    scannedPages.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED,
+        scannedPages.getCount() + bitSetGroup.getScannedPages());
+    int[] pageFilteredRowCount = new int[bitSetGroup.getNumberOfPages()];
+    // get the row indexes from bit set for each page
+    int[][] pageFilteredRowId = new int[bitSetGroup.getNumberOfPages()][];
+    int numPages = pageFilteredRowId.length;
+    for (int pageId = 0; pageId < numPages; pageId++) {
+      BitSet bitSet = bitSetGroup.getBitSet(pageId);
+      if (bitSet != null && !bitSet.isEmpty()) {
+        int[] matchedRowId = new int[bitSet.cardinality()];
+        int index = 0;
+        for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
+          matchedRowId[index++] = i;
+        }
+        pageFilteredRowCount[pageId] = matchedRowId.length;
+        pageFilteredRowId[pageId] = matchedRowId;
+      }
+    }
+
+    long dimensionReadTime = System.currentTimeMillis();
+    dimensionReadTime = System.currentTimeMillis() - dimensionReadTime;
+
+    FileReader fileReader = rawBlockletColumnChunks.getFileReader();
+
+
+    DimensionRawColumnChunk[] dimensionRawColumnChunks =
+        new DimensionRawColumnChunk[blockExecutionInfo.getTotalNumberDimensionToRead()];
+    int numDimensionChunks = dimensionRawColumnChunks.length;
+    // copy dimension chunks already read during filtering; read the rest from file below
+    for (int chunkIndex = 0; chunkIndex < numDimensionChunks; chunkIndex++) {
+      dimensionRawColumnChunks[chunkIndex] =
+          rawBlockletColumnChunks.getDimensionRawColumnChunks()[chunkIndex];
+    }
+    int[][] allSelectedDimensionColumnIndexRange =
+        blockExecutionInfo.getAllSelectedDimensionColumnIndexRange();
+    DimensionRawColumnChunk[] projectionListDimensionChunk = rawBlockletColumnChunks.getDataBlock()
+        .readDimensionChunks(fileReader, allSelectedDimensionColumnIndexRange);
+    for (int[] columnIndexRange : allSelectedDimensionColumnIndexRange) {
+      System.arraycopy(projectionListDimensionChunk, columnIndexRange[0],
+          dimensionRawColumnChunks, columnIndexRange[0],
+          columnIndexRange[1] + 1 - columnIndexRange[0]);
+    }
+
+    /*
+     * if any projected dimensions were not loaded while applying the filter,
+     * read them from file now
+     */
+    int[] projectionListDimensionIndexes = blockExecutionInfo.getProjectionListDimensionIndexes();
+    for (int projectionListDimensionIndex : projectionListDimensionIndexes) {
+      if (null == dimensionRawColumnChunks[projectionListDimensionIndex]) {
+        dimensionRawColumnChunks[projectionListDimensionIndex] =
+            rawBlockletColumnChunks.getDataBlock().readDimensionChunk(
+                fileReader, projectionListDimensionIndex);
+      }
+    }
+
+    DimensionColumnPage[][] dimensionColumnPages =
+        new DimensionColumnPage[numDimensionChunks][numPages];
+    for (int chunkIndex = 0; chunkIndex < numDimensionChunks; chunkIndex++) {
+      if (dimensionRawColumnChunks[chunkIndex] != null) {
+        for (int pageId = 0; pageId < numPages; pageId++) {
+          dimensionColumnPages[chunkIndex][pageId] =
+              dimensionRawColumnChunks[chunkIndex].decodeColumnPage(pageId);
+        }
+      }
+    }
+
+
+    MeasureRawColumnChunk[] measureRawColumnChunks =
+        new MeasureRawColumnChunk[blockExecutionInfo.getTotalNumberOfMeasureToRead()];
+    int numMeasureChunks = measureRawColumnChunks.length;
+
+    // copy measure chunks already read during filtering; read the rest from file below
+    for (int chunkIndex = 0; chunkIndex < numMeasureChunks; chunkIndex++) {
+      if (null != rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex]) {
+        measureRawColumnChunks[chunkIndex] =
+            rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex];
+      }
+    }
+
+    int[][] allSelectedMeasureColumnIndexRange =
+        blockExecutionInfo.getAllSelectedMeasureIndexRange();
+    MeasureRawColumnChunk[] projectionListMeasureChunk = rawBlockletColumnChunks.getDataBlock()
+        .readMeasureChunks(fileReader, allSelectedMeasureColumnIndexRange);
+    for (int[] columnIndexRange : allSelectedMeasureColumnIndexRange) {
+      System.arraycopy(projectionListMeasureChunk, columnIndexRange[0], measureRawColumnChunks,
+          columnIndexRange[0], columnIndexRange[1] + 1 - columnIndexRange[0]);
+    }
+    /*
+     * if any projected measures were not loaded while applying the filter,
+     * read them from file now
+     */
+    int[] projectionListMeasureIndexes = blockExecutionInfo.getProjectionListMeasureIndexes();
+    for (int projectionListMeasureIndex : projectionListMeasureIndexes) {
+      if (null == measureRawColumnChunks[projectionListMeasureIndex]) {
+        measureRawColumnChunks[projectionListMeasureIndex] = rawBlockletColumnChunks.getDataBlock()
+            .readMeasureChunk(fileReader, projectionListMeasureIndex);
+      }
+    }
+    ColumnPage[][] measureColumnPages = new ColumnPage[numMeasureChunks][numPages];
+    for (int chunkIndex = 0; chunkIndex < numMeasureChunks; chunkIndex++) {
+      if (measureRawColumnChunks[chunkIndex] != null) {
+        for (int pageId = 0; pageId < numPages; pageId++) {
+          measureColumnPages[chunkIndex][pageId] =
+              measureRawColumnChunks[chunkIndex].decodeColumnPage(pageId);
+        }
+      }
+    }
+
+    scannedResult.setDimensionColumnPages(dimensionColumnPages);
+    scannedResult.setPageFilteredRowId(pageFilteredRowId);
+    scannedResult.setMeasureColumnPages(measureColumnPages);
+    scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks);
+    scannedResult.setMsrRawColumnChunks(measureRawColumnChunks);
+    scannedResult.setPageFilteredRowCount(pageFilteredRowCount);
+    // adding statistics for carbon scan time
+    QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
+    scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
+        scanTime.getCount() + (System.currentTimeMillis() - startTime - dimensionReadTime));
+    QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.READ_BLOCKlET_TIME);
+    readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME,
+        readTime.getCount() + dimensionReadTime);
+    return scannedResult;
+  }
+}

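One technique in executeFilter above is worth isolating: per page, the filter's BitSet is converted into a dense row-id array sized by its cardinality and filled via nextSetBit; this array is what later feeds the fillMeasureVector(int[] filteredRowId, ...) fillers. A self-contained sketch of just that conversion:

    import java.util.Arrays;
    import java.util.BitSet;

    class BitSetToRowIds {
      // Same conversion as in executeFilter above: one dense row-id array
      // per page, sized by the BitSet's cardinality.
      static int[] toRowIds(BitSet bitSet) {
        int[] matchedRowId = new int[bitSet.cardinality()];
        int index = 0;
        for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
          matchedRowId[index++] = i;
        }
        return matchedRowId;
      }

      public static void main(String[] args) {
        BitSet page = new BitSet();
        page.set(2);
        page.set(5);
        page.set(6);
        System.out.println(Arrays.toString(toRowIds(page))); // [2, 5, 6]
      }
    }
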
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
new file mode 100644
index 0000000..f0211dc
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.scanner.impl;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult;
+import org.apache.carbondata.core.scan.scanner.BlockletScanner;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+
+/**
+ * Blocklet scanner to do full scan of a blocklet,
+ * returning all projection and filter column chunks
+ */
+public class BlockletFullScanner implements BlockletScanner {
+
+  /**
+   * block execution info
+   */
+  protected BlockExecutionInfo blockExecutionInfo;
+
+  private QueryStatisticsModel queryStatisticsModel;
+
+  private BlockletScannedResult emptyResult;
+
+  public BlockletFullScanner(BlockExecutionInfo tableBlockExecutionInfos,
+      QueryStatisticsModel queryStatisticsModel) {
+    this.blockExecutionInfo = tableBlockExecutionInfos;
+    this.queryStatisticsModel = queryStatisticsModel;
+  }
+
+  @Override
+  public BlockletScannedResult scanBlocklet(
+      RawBlockletColumnChunks rawBlockletColumnChunks)
+      throws IOException, FilterUnsupportedException {
+    long startTime = System.currentTimeMillis();
+    BlockletScannedResult scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
+    QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
+    totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
+        totalBlockletStatistic.getCount() + 1);
+    QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
+    validScannedBlockletStatistic
+        .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
+            validScannedBlockletStatistic.getCount() + 1);
+    // adding statistics for valid number of pages
+    QueryStatistic validPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.VALID_PAGE_SCANNED);
+    validPages.addCountStatistic(QueryStatisticsConstants.VALID_PAGE_SCANNED,
+        validPages.getCount() + rawBlockletColumnChunks.getDataBlock().numberOfPages());
+    // adding statistics for number of pages
+    QueryStatistic totalPagesScanned = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED);
+    totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED,
+        totalPagesScanned.getCount() + rawBlockletColumnChunks.getDataBlock().numberOfPages());
+    scannedResult.setBlockletId(
+        blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR +
+            rawBlockletColumnChunks.getDataBlock().blockletIndex());
+    if (!blockExecutionInfo.isPrefetchBlocklet()) {
+      readBlocklet(rawBlockletColumnChunks);
+    }
+    DimensionRawColumnChunk[] dimensionRawColumnChunks =
+        rawBlockletColumnChunks.getDimensionRawColumnChunks();
+    DimensionColumnPage[][] dimensionColumnDataChunks =
+        new DimensionColumnPage[dimensionRawColumnChunks.length][rawBlockletColumnChunks
+            .getDataBlock().numberOfPages()];
+    MeasureRawColumnChunk[] measureRawColumnChunks =
+        rawBlockletColumnChunks.getMeasureRawColumnChunks();
+    ColumnPage[][] measureColumnPages =
+        new ColumnPage[measureRawColumnChunks.length][rawBlockletColumnChunks.getDataBlock()
+                       .numberOfPages()];
+    scannedResult.setDimensionColumnPages(dimensionColumnDataChunks);
+    scannedResult.setMeasureColumnPages(measureColumnPages);
+    scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks);
+    scannedResult.setMsrRawColumnChunks(measureRawColumnChunks);
+    if (blockExecutionInfo.isPrefetchBlocklet()) {
+      for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+        if (dimensionRawColumnChunks[i] != null) {
+          dimensionColumnDataChunks[i] = dimensionRawColumnChunks[i].decodeAllColumnPages();
+        }
+      }
+      for (int i = 0; i < measureRawColumnChunks.length; i++) {
+        if (measureRawColumnChunks[i] != null) {
+          measureColumnPages[i] = measureRawColumnChunks[i].decodeAllColumnPages();
+        }
+      }
+    }
+    int[] numberOfRows = null;
+    if (blockExecutionInfo.getAllSelectedDimensionColumnIndexRange().length > 0) {
+      for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+        if (dimensionRawColumnChunks[i] != null) {
+          numberOfRows = dimensionRawColumnChunks[i].getRowCount();
+          break;
+        }
+      }
+    } else if (blockExecutionInfo.getAllSelectedMeasureIndexRange().length > 0) {
+      for (int i = 0; i < measureRawColumnChunks.length; i++) {
+        if (measureRawColumnChunks[i] != null) {
+          numberOfRows = measureRawColumnChunks[i].getRowCount();
+          break;
+        }
+      }
+    }
+
+    // in the count(*) case there would not be any dimensions or measures selected.
+    if (numberOfRows == null) {
+      numberOfRows = new int[rawBlockletColumnChunks.getDataBlock().numberOfPages()];
+      for (int i = 0; i < numberOfRows.length; i++) {
+        numberOfRows[i] =
+            CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
+      }
+      int lastPageSize = rawBlockletColumnChunks.getDataBlock().numRows()
+          % CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
+
+      if (lastPageSize > 0) {
+        numberOfRows[numberOfRows.length - 1] = lastPageSize;
+      }
+    }
+    scannedResult.setPageFilteredRowCount(numberOfRows);
+    if (!blockExecutionInfo.isPrefetchBlocklet()) {
+      scannedResult.fillDataChunks();
+    }
+    // adding statistics for carbon scan time
+    QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
+    scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
+        scanTime.getCount() + (System.currentTimeMillis() - startTime));
+    return scannedResult;
+  }
+
+  @Override
+  public void readBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks)
+      throws IOException {
+    long startTime = System.currentTimeMillis();
+    DimensionRawColumnChunk[] dimensionRawColumnChunks = rawBlockletColumnChunks.getDataBlock()
+        .readDimensionChunks(rawBlockletColumnChunks.getFileReader(),
+            blockExecutionInfo.getAllSelectedDimensionColumnIndexRange());
+    rawBlockletColumnChunks.setDimensionRawColumnChunks(dimensionRawColumnChunks);
+    MeasureRawColumnChunk[] measureRawColumnChunks = rawBlockletColumnChunks.getDataBlock()
+        .readMeasureChunks(rawBlockletColumnChunks.getFileReader(),
+            blockExecutionInfo.getAllSelectedMeasureIndexRange());
+    rawBlockletColumnChunks.setMeasureRawColumnChunks(measureRawColumnChunks);
+    // adding statistics for carbon read time
+    QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.READ_BLOCKlET_TIME);
+    readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME,
+        readTime.getCount() + (System.currentTimeMillis() - startTime));
+  }
+
+  BlockletScannedResult createEmptyResult() {
+    if (emptyResult == null) {
+      emptyResult = new NonFilterQueryScannedResult(blockExecutionInfo);
+      emptyResult.setPageFilteredRowCount(new int[0]);
+      emptyResult.setPageFilteredRowId(new int[0][]);
+    }
+    return emptyResult;
+  }
+
+  @Override
+  public boolean isScanRequired(DataRefNode dataBlock) {
+    return true; // for a non-filter scan, every blocklet is scanned
+  }
+}

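For count(*)-style scans with no selected columns, BlockletFullScanner above synthesizes per-page row counts: every page gets the format default and the last page gets the remainder of numRows. A small sketch of that computation, where DEFAULT_PAGE_SIZE stands in for CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT (the value 32000 here is illustrative):

    import java.util.Arrays;

    class PageRowCounts {
      // Stand-in for NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT.
      static final int DEFAULT_PAGE_SIZE = 32000;

      static int[] forBlocklet(int numPages, int numRows) {
        int[] numberOfRows = new int[numPages];
        Arrays.fill(numberOfRows, DEFAULT_PAGE_SIZE);
        int lastPageSize = numRows % DEFAULT_PAGE_SIZE;
        if (lastPageSize > 0) {
          numberOfRows[numPages - 1] = lastPageSize; // partially filled last page
        }
        return numberOfRows;
      }

      public static void main(String[] args) {
        // e.g. 3 pages holding 70,000 rows -> [32000, 32000, 6000]
        System.out.println(Arrays.toString(forBlocklet(3, 70_000)));
      }
    }
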
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
deleted file mode 100644
index e77093b..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.scan.scanner.impl;
-
-import java.io.IOException;
-import java.util.BitSet;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.FileHolder;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
-import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
-import org.apache.carbondata.core.scan.result.AbstractScannedResult;
-import org.apache.carbondata.core.scan.result.impl.FilterQueryScannedResult;
-import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner;
-import org.apache.carbondata.core.stats.QueryStatistic;
-import org.apache.carbondata.core.stats.QueryStatisticsConstants;
-import org.apache.carbondata.core.stats.QueryStatisticsModel;
-import org.apache.carbondata.core.util.BitSetGroup;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-/**
- * Processor for filter queries: it first applies the filter, then reads the
- * blocks only if required, and returns the scanned result.
- */
-public class FilterScanner extends AbstractBlockletScanner {
-
-  /**
-   * filter tree
-   */
-  private FilterExecuter filterExecuter;
-  /**
-   * whether to apply min/max pruning. The node finder always returns
-   * tentative blocks; when a column's data is stored individually in sorted
-   * order, we can first check whether the filter value lies within the
-   * block's min/max range, and apply the filter on the complete data only
-   * when it does. This is especially useful for sparse data with many
-   * repeating rows.
-   */
-  private boolean isMinMaxEnabled;
-
-  private QueryStatisticsModel queryStatisticsModel;
-
-  private boolean useBitSetPipeLine;
-
-  public FilterScanner(BlockExecutionInfo blockExecutionInfo,
-      QueryStatisticsModel queryStatisticsModel) {
-    super(blockExecutionInfo);
-    // to check whether min max is enabled or not
-    String minMaxEnableValue = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED,
-            CarbonCommonConstants.MIN_MAX_DEFAULT_VALUE);
-    if (null != minMaxEnableValue) {
-      isMinMaxEnabled = Boolean.parseBoolean(minMaxEnableValue);
-    }
-    // get the filter tree
-    this.filterExecuter = blockExecutionInfo.getFilterExecuterTree();
-    this.queryStatisticsModel = queryStatisticsModel;
-
-    String useBitSetPipeLine = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.BITSET_PIPE_LINE,
-            CarbonCommonConstants.BITSET_PIPE_LINE_DEFAULT);
-    if (null != useBitSetPipeLine) {
-      this.useBitSetPipeLine = Boolean.parseBoolean(useBitSetPipeLine);
-    }
-  }
-
-  /**
-   * Below method will be used to process the block
-   *
-   * @param blocksChunkHolder block chunk holder which holds the data
-   * @throws FilterUnsupportedException
-   */
-  @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
-      throws IOException, FilterUnsupportedException {
-    return fillScannedResult(blocksChunkHolder);
-  }
-
-  @Override public boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException {
-    // adding statistics for number of pages
-    QueryStatistic totalPagesScanned = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED);
-    totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED,
-        totalPagesScanned.getCount() + blocksChunkHolder.getDataBlock().numberOfPages());
-    // apply min max
-    if (isMinMaxEnabled) {
-      BitSet bitSet = null;
-      // check for implicit include filter instance
-      if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
-        String blockletId = blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR
-            + blocksChunkHolder.getDataBlock().blockletId();
-        bitSet = ((ImplicitColumnFilterExecutor) filterExecuter)
-            .isFilterValuesPresentInBlockOrBlocklet(
-                blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
-                blocksChunkHolder.getDataBlock().getColumnsMinValue(), blockletId);
-      } else {
-        bitSet = this.filterExecuter
-            .isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
-                blocksChunkHolder.getDataBlock().getColumnsMinValue());
-      }
-      if (bitSet.isEmpty()) {
-        CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
-            blocksChunkHolder.getMeasureRawDataChunk());
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @Override public void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException {
-    long startTime = System.currentTimeMillis();
-    this.filterExecuter.readBlocks(blocksChunkHolder);
-    // adding statistics for carbon read time
-    QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.READ_BLOCKlET_TIME);
-    readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME,
-        readTime.getCount() + (System.currentTimeMillis() - startTime));
-  }
-
-  /**
-   * This method processes the data in the following order:
-   * 1. Apply min/max pruning on the filter tree; if no filter value falls
-   * within the min/max range, return an empty result.
-   * 2. Otherwise apply the filter on the actual data and collect the
-   * filtered row indexes.
-   * 3. If the row indexes are empty, return an empty result.
-   * 4. Otherwise read only those blocks (measure or dimension) that are in the
-   * query projection but not in the filter; blocks already read while applying
-   * the filter are present in the chunk holder and need not be read again.
-   * 5. Set the blocks and filter indexes on the result.
-   *
-   * @param blocksChunkHolder
-   * @throws FilterUnsupportedException
-   */
-  private AbstractScannedResult fillScannedResult(BlocksChunkHolder blocksChunkHolder)
-      throws FilterUnsupportedException, IOException {
-    long startTime = System.currentTimeMillis();
-    QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
-    totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
-        totalBlockletStatistic.getCount() + 1);
-    // apply filter on actual data
-    BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(blocksChunkHolder, useBitSetPipeLine);
-    // if the indexes are empty, return an empty result
-    if (bitSetGroup.isEmpty()) {
-      CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
-          blocksChunkHolder.getMeasureRawDataChunk());
-
-      QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
-          .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
-      scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
-          scanTime.getCount() + (System.currentTimeMillis() - startTime));
-
-      QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
-          .get(QueryStatisticsConstants.PAGE_SCANNED);
-      scannedPages.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED,
-          scannedPages.getCount() + bitSetGroup.getScannedPages());
-      return createEmptyResult();
-    }
-
-    AbstractScannedResult scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
-    scannedResult.setBlockletId(
-        blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
-            .getDataBlock().blockletId());
-    // valid scanned blocklet
-    QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
-    validScannedBlockletStatistic
-        .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
-            validScannedBlockletStatistic.getCount() + 1);
-    // adding statistics for valid number of pages
-    QueryStatistic validPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.VALID_PAGE_SCANNED);
-    validPages.addCountStatistic(QueryStatisticsConstants.VALID_PAGE_SCANNED,
-        validPages.getCount() + bitSetGroup.getValidPages());
-    QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.PAGE_SCANNED);
-    scannedPages.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED,
-        scannedPages.getCount() + bitSetGroup.getScannedPages());
-    int[] rowCount = new int[bitSetGroup.getNumberOfPages()];
-    // get the row indexes from the bit set
-    int[][] indexesGroup = new int[bitSetGroup.getNumberOfPages()][];
-    for (int k = 0; k < indexesGroup.length; k++) {
-      BitSet bitSet = bitSetGroup.getBitSet(k);
-      if (bitSet != null && !bitSet.isEmpty()) {
-        int[] indexes = new int[bitSet.cardinality()];
-        int index = 0;
-        for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
-          indexes[index++] = i;
-        }
-        rowCount[k] = indexes.length;
-        indexesGroup[k] = indexes;
-      }
-    }
-    FileHolder fileReader = blocksChunkHolder.getFileReader();
-    int[][] allSelectedDimensionBlocksIndexes =
-        blockExecutionInfo.getAllSelectedDimensionBlocksIndexes();
-    long dimensionReadTime = System.currentTimeMillis();
-    DimensionRawColumnChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock()
-        .getDimensionChunks(fileReader, allSelectedDimensionBlocksIndexes);
-    dimensionReadTime = System.currentTimeMillis() - dimensionReadTime;
-
-    DimensionRawColumnChunk[] dimensionRawColumnChunks =
-        new DimensionRawColumnChunk[blockExecutionInfo.getTotalNumberDimensionBlock()];
-    // reuse the dimension chunks that were already read while applying the filter
-    for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
-      if (null != blocksChunkHolder.getDimensionRawDataChunk()[i]) {
-        dimensionRawColumnChunks[i] = blocksChunkHolder.getDimensionRawDataChunk()[i];
-      }
-    }
-    for (int i = 0; i < allSelectedDimensionBlocksIndexes.length; i++) {
-      for (int j = allSelectedDimensionBlocksIndexes[i][0];
-           j <= allSelectedDimensionBlocksIndexes[i][1]; j++) {
-        dimensionRawColumnChunks[j] = projectionListDimensionChunk[j];
-      }
-    }
-    long dimensionReadTime1 = System.currentTimeMillis();
-    /**
-     * if any projected dimensions were not loaded into the dimensionColumnDataChunk
-     * while applying the filter, load them now
-     */
-    int[] projectionListDimensionIndexes = blockExecutionInfo.getProjectionListDimensionIndexes();
-    int projectionListDimensionIndexesLength = projectionListDimensionIndexes.length;
-    for (int i = 0; i < projectionListDimensionIndexesLength; i++) {
-      if (null == dimensionRawColumnChunks[projectionListDimensionIndexes[i]]) {
-        dimensionRawColumnChunks[projectionListDimensionIndexes[i]] =
-            blocksChunkHolder.getDataBlock()
-                .getDimensionChunk(fileReader, projectionListDimensionIndexes[i]);
-      }
-    }
-    dimensionReadTime += (System.currentTimeMillis() - dimensionReadTime1);
-    dimensionReadTime1 = System.currentTimeMillis();
-    MeasureRawColumnChunk[] measureRawColumnChunks =
-        new MeasureRawColumnChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()];
-    int[][] allSelectedMeasureBlocksIndexes =
-        blockExecutionInfo.getAllSelectedMeasureBlocksIndexes();
-    MeasureRawColumnChunk[] projectionListMeasureChunk = blocksChunkHolder.getDataBlock()
-        .getMeasureChunks(fileReader, allSelectedMeasureBlocksIndexes);
-    dimensionReadTime += System.currentTimeMillis() - dimensionReadTime1;
-    // reuse the measure chunks that were already read while applying the filter
-    for (int i = 0; i < measureRawColumnChunks.length; i++) {
-      if (null != blocksChunkHolder.getMeasureRawDataChunk()[i]) {
-        measureRawColumnChunks[i] = blocksChunkHolder.getMeasureRawDataChunk()[i];
-      }
-    }
-    for (int i = 0; i < allSelectedMeasureBlocksIndexes.length; i++) {
-      for (int j = allSelectedMeasureBlocksIndexes[i][0];
-           j <= allSelectedMeasureBlocksIndexes[i][1]; j++) {
-        measureRawColumnChunks[j] = projectionListMeasureChunk[j];
-      }
-    }
-    dimensionReadTime1 = System.currentTimeMillis();
-    /**
-     * if any projected measures were not loaded into the ColumnPage
-     * while applying the filter, load them now
-     */
-    int[] projectionListMeasureIndexes = blockExecutionInfo.getProjectionListMeasureIndexes();
-    int projectionListMeasureIndexesLength = projectionListMeasureIndexes.length;
-    for (int i = 0; i < projectionListMeasureIndexesLength; i++) {
-      if (null == measureRawColumnChunks[projectionListMeasureIndexes[i]]) {
-        measureRawColumnChunks[projectionListMeasureIndexes[i]] = blocksChunkHolder.getDataBlock()
-            .getMeasureChunk(fileReader, projectionListMeasureIndexes[i]);
-      }
-    }
-    dimensionReadTime += System.currentTimeMillis() - dimensionReadTime1;
-    DimensionColumnDataChunk[][] dimensionColumnDataChunks =
-        new DimensionColumnDataChunk[dimensionRawColumnChunks.length][indexesGroup.length];
-    ColumnPage[][] columnPages =
-        new ColumnPage[measureRawColumnChunks.length][indexesGroup.length];
-    for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
-      if (dimensionRawColumnChunks[i] != null) {
-        for (int j = 0; j < indexesGroup.length; j++) {
-          dimensionColumnDataChunks[i][j] = dimensionRawColumnChunks[i].convertToDimColDataChunk(j);
-        }
-      }
-    }
-    for (int i = 0; i < measureRawColumnChunks.length; i++) {
-      if (measureRawColumnChunks[i] != null) {
-        for (int j = 0; j < indexesGroup.length; j++) {
-          columnPages[i][j] = measureRawColumnChunks[i].convertToColumnPage(j);
-        }
-      }
-    }
-    scannedResult.setDimensionChunks(dimensionColumnDataChunks);
-    scannedResult.setIndexes(indexesGroup);
-    scannedResult.setMeasureChunks(columnPages);
-    scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks);
-    scannedResult.setMsrRawColumnChunks(measureRawColumnChunks);
-    scannedResult.setNumberOfRows(rowCount);
-    // adding statistics for carbon scan time
-    QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
-    scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
-        scanTime.getCount() + (System.currentTimeMillis() - startTime - dimensionReadTime));
-    QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.READ_BLOCKlET_TIME);
-    readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME,
-        readTime.getCount() + dimensionReadTime);
-    return scannedResult;
-  }
-}

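Steps 2 and 3 of the deleted fillScannedResult turn the filter's BitSetGroup
into per-page row-index arrays. The core conversion, expanding the set bits of
one page's BitSet into a sorted int[], is small enough to sketch on its own;
this is a direct restatement of the deleted per-page loop, not new behavior.

import java.util.Arrays;
import java.util.BitSet;

final class BitSetToIndexes {

  // expand the set bits of one page's BitSet into a sorted row-index array
  static int[] toRowIndexes(BitSet bitSet) {
    int[] indexes = new int[bitSet.cardinality()];
    int index = 0;
    for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
      indexes[index++] = i;
    }
    return indexes;
  }

  public static void main(String[] args) {
    BitSet bitSet = new BitSet();
    bitSet.set(2);
    bitSet.set(5);
    bitSet.set(9);
    System.out.println(Arrays.toString(toRowIndexes(bitSet))); // [2, 5, 9]
  }
}
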
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
deleted file mode 100644
index 1373ed5..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.scan.scanner.impl;
-
-import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner;
-import org.apache.carbondata.core.stats.QueryStatisticsModel;
-
-/**
- * Processor for non-filter queries: every block requested in the query is
- * simply read and passed on to the scanned result.
- */
-public class NonFilterScanner extends AbstractBlockletScanner {
-
-  public NonFilterScanner(BlockExecutionInfo blockExecutionInfo,
-                          QueryStatisticsModel queryStatisticsModel) {
-    super(blockExecutionInfo);
-    super.queryStatisticsModel = queryStatisticsModel;
-  }
-}

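The filter/non-filter scanner split has a template-method shape: the abstract
scanner owns the read-and-scan skeleton, and each subclass decides only
whether a scan is required at all (always true in the non-filter case, as the
new scanner at the top of this section also shows). A hypothetical sketch of
that shape, with illustrative class names rather than the CarbonData hierarchy:

abstract class BlockletScannerSketch {

  // the shared skeleton: skip all work when no scan is required
  final String scan(String blocklet) {
    if (!isScanRequired(blocklet)) {
      return "<empty result>";
    }
    return "scanned: " + blocklet;
  }

  abstract boolean isScanRequired(String blocklet);
}

final class NonFilterScannerSketch extends BlockletScannerSketch {
  // a non-filter query must read every requested blocklet
  @Override boolean isScanRequired(String blocklet) {
    return true;
  }
}
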
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
index 2f981b5..6faae03 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
@@ -30,22 +30,17 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa
    * to store key which is generated using
    * key generator
    */
-  protected byte[] dictionaryKey;
+  private byte[] dictionaryKey;
 
   /**
    * to store complex type column data
    */
-  protected byte[][] complexTypesKeys;
+  private byte[][] complexTypesKeys;
 
   /**
    * to store no dictionary column data
    */
-  protected byte[][] noDictionaryKeys;
-
-  /**
-   * contains value of implicit columns in byte array format
-   */
-  protected byte[] implicitColumnByteArray;
+  private byte[][] noDictionaryKeys;
 
   public ByteArrayWrapper() {
   }
@@ -91,16 +86,6 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa
   }
 
   /**
-   * to get the complex type column data
-   *
-   * @param index of the complex type key
-   * @return complex type key for the index
-   */
-  public byte[] getComplexTypeByIndex(int index) {
-    return this.complexTypesKeys[index];
-  }
-
-  /**
    * to generate the hash code
    */
   @Override public int hashCode() {
@@ -201,30 +186,10 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa
   }
 
   /**
-   * @return the complexTypesKeys
-   */
-  public byte[][] getComplexTypesKeys() {
-    return complexTypesKeys;
-  }
-
-  /**
    * @param complexTypesKeys the complexTypesKeys to set
    */
   public void setComplexTypesKeys(byte[][] complexTypesKeys) {
     this.complexTypesKeys = complexTypesKeys;
   }
 
-  /**
-   * @return
-   */
-  public byte[] getImplicitColumnByteArray() {
-    return implicitColumnByteArray;
-  }
-
-  /**
-   * @param implicitColumnByteArray
-   */
-  public void setImplicitColumnByteArray(byte[] implicitColumnByteArray) {
-    this.implicitColumnByteArray = implicitColumnByteArray;
-  }
 }

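ByteArrayWrapper's fields are narrowed to private above because all access now
goes through its accessors. The wrapper exists in the first place because a
raw byte[] has identity-based hashCode and equals, so it cannot serve as a
hash key or be compared by content. A minimal illustration of that motivation;
BytesKey is a hypothetical stand-in, not the CarbonData class.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class BytesKey {
  private final byte[] data;

  BytesKey(byte[] data) {
    this.data = data;
  }

  // content-based hashing and equality make the wrapper usable as a map key
  @Override public int hashCode() {
    return Arrays.hashCode(data);
  }

  @Override public boolean equals(Object other) {
    return other instanceof BytesKey && Arrays.equals(data, ((BytesKey) other).data);
  }

  public static void main(String[] args) {
    Map<BytesKey, Integer> counts = new HashMap<>();
    counts.put(new BytesKey(new byte[] {1, 2}), 1);
    // a content-equal key finds the same entry; a raw byte[] key would not
    System.out.println(counts.get(new BytesKey(new byte[] {1, 2}))); // 1
  }
}
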
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java b/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java
deleted file mode 100644
index 8a37d01..0000000
--- a/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.stats;
-
-import java.io.Serializable;
-
-public class PartitionStatistic implements Serializable {
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
index 55f0882..ed60d37 100644
--- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
@@ -37,7 +37,4 @@ public class QueryStatisticsModel {
     return statisticsTypeAndObjMap;
   }
 
-  public void setStatisticsTypeAndObjMap(Map<String, QueryStatistic> statisticsTypeAndObjMap) {
-    this.statisticsTypeAndObjMap = statisticsTypeAndObjMap;
-  }
 }

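Dropping the setter above narrows QueryStatisticsModel: callers can still
accumulate into the statistics map through the getter, but can no longer swap
the map instance out from under the scanners. A hedged sketch of the resulting
shape, with simplified value types:

import java.util.HashMap;
import java.util.Map;

final class StatisticsModelSketch {
  private final Map<String, Long> statisticsTypeAndObjMap = new HashMap<>();

  // callers accumulate through this reference ...
  Map<String, Long> getStatisticsTypeAndObjMap() {
    return statisticsTypeAndObjMap;
  }

  // ... but there is no setter: the model owns the map instance for its lifetime
}
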
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 73a665d..d0c8e93 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -341,15 +341,6 @@ public class LoadMetadataDetails implements Serializable {
   }
 
   /**
-   * To get isDeleted property.
-   *
-   * @return isDeleted
-   */
-  public String getIsDeleted() {
-    return isDeleted;
-  }
-
-  /**
    * To set isDeleted property.
    *
    * @param isDeleted

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 9d14c62..fa3ab0c 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -671,22 +671,6 @@ public class SegmentStatusManager {
     return "";
   }
 
-  /**
-   * getting the task numbers present in the segment.
-   * @param segmentId
-   * @return
-   */
-  public List<String> getUpdatedTasksDetailsForSegment(String segmentId, SegmentUpdateStatusManager
-          updateStatusManager) {
-    List<String> taskList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    List<String> list = updateStatusManager.getUpdateDeltaFiles(segmentId);
-    for (String eachFileName : list) {
-      taskList.add(CarbonTablePath.DataFileUtil.getTaskNo(eachFileName));
-    }
-    return taskList;
-  }
-
-
   public static class ValidAndInvalidSegmentsInfo {
     private final List<String> listOfValidSegments;
     private final List<String> listOfValidUpdatedSegments;

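The SegmentUpdateStatusManager diff below removes getInvalidBlockList, which
collected update-delta files whose name-embedded timestamp falls outside the
segment's update window and does not match the original fact load time. A
self-contained sketch of that test, assuming a "<prefix>-<timestamp>.<ext>"
file-name layout; the sample names and extension are hypothetical.

final class DeltaFileWindowSketch {

  // parse the trailing timestamp from a name such as "part-0-3-150.ext"
  static long timestampFromName(String fileName) {
    String firstPart = fileName.substring(0, fileName.indexOf('.'));
    return Long.parseLong(firstPart.substring(firstPart.lastIndexOf('-') + 1));
  }

  // invalid when the timestamp is outside the update window and is not the
  // original fact load time
  static boolean isInvalid(String fileName, long windowStart, long windowEnd,
      long factLoadTime) {
    long timestamp = timestampFromName(fileName);
    boolean insideWindow = timestamp >= windowStart && timestamp <= windowEnd;
    return !insideWindow && timestamp != factLoadTime;
  }

  public static void main(String[] args) {
    System.out.println(isInvalid("part-0-3-150.delta", 100, 200, 50)); // false
    System.out.println(isInvalid("part-0-3-999.delta", 100, 200, 50)); // true
  }
}
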
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index d4ef5c6..66f7a12 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -47,7 +47,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.TupleIdEnum;
 import org.apache.carbondata.core.mutate.UpdateVO;
-import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -140,14 +139,6 @@ public class SegmentUpdateStatusManager {
   }
 
   /**
-   *
-   * @param loadMetadataDetails
-   */
-  public void setLoadMetadataDetails(LoadMetadataDetails[] loadMetadataDetails) {
-    this.segmentDetails = loadMetadataDetails;
-  }
-
-  /**
    * Returns the UpdateStatus Details.
    * @return
    */
@@ -174,18 +165,6 @@ public class SegmentUpdateStatusManager {
   }
 
   /**
-   * Returns all delete delta files of specified block
-   *
-   * @param tupleId
-   * @return
-   * @throws Exception
-   */
-  public List<String> getDeleteDeltaFiles(String tupleId) throws Exception {
-    return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT);
-  }
-
-
-  /**
    * Returns all update delta files of specified Segment.
    *
    * @param segmentId
@@ -248,20 +227,6 @@ public class SegmentUpdateStatusManager {
   }
 
   /**
-   * Returns all deleted records of specified block
-   *
-   * @param tupleId
-   * @return
-   * @throws Exception
-   */
-  public Map<Integer, Integer[]> getDeleteDeltaDataFromAllFiles(String tupleId) throws Exception {
-    List<String> deltaFiles = getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT);
-    CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
-    String blockletId = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCKLET_ID);
-    return dataReader.getDeleteDataFromAllFiles(deltaFiles, blockletId);
-  }
-
-  /**
    * Below method will be used to get all the delete delta files based on block name
    *
    * @param blockFilePath actual block filePath
@@ -775,41 +740,6 @@ public class SegmentUpdateStatusManager {
   }
 
   /**
-   * compares passed time stamp with status file delete timestamp and
-   * returns latest timestamp from status file if both are not equal
-   * returns null otherwise
-   *
-   * @param completeBlockName
-   * @param timestamp
-   * @return
-   */
-  public String getTimestampForRefreshCache(String completeBlockName, String timestamp) {
-    long cacheTimestamp = 0;
-    if (null != timestamp) {
-      cacheTimestamp = CarbonUpdateUtil.getTimeStampAsLong(timestamp);
-    }
-    String blockName = CarbonTablePath.addDataPartPrefix(CarbonUpdateUtil.getBlockName(
-        CarbonUpdateUtil.getRequiredFieldFromTID(completeBlockName, TupleIdEnum.BLOCK_ID)));
-    String segmentId =
-        CarbonUpdateUtil.getRequiredFieldFromTID(completeBlockName, TupleIdEnum.SEGMENT_ID);
-    SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray =
-        readLoadMetadata();
-    for (SegmentUpdateDetails block : listOfSegmentUpdateDetailsArray) {
-      if (segmentId.equalsIgnoreCase(block.getSegmentName()) &&
-          block.getBlockName().equalsIgnoreCase(blockName) &&
-          !CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
-        long deleteTimestampFromStatusFile = block.getDeleteDeltaEndTimeAsLong();
-        if (Long.compare(deleteTimestampFromStatusFile, cacheTimestamp) == 0) {
-          return null;
-        } else {
-          return block.getDeleteDeltaEndTimestamp();
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
    * This method closes the streams
    *
    * @param streams - streams to close.
@@ -828,85 +758,7 @@ public class SegmentUpdateStatusManager {
       }
     }
   }
-  /**
-   * Get the invalid tasks in that segment.
-   * @param segmentId
-   * @return
-   */
-  public List<String> getInvalidBlockList(String segmentId) {
-
-    // get the original fact file timestamp from the table status file.
-    List<String> listOfInvalidBlocks = new ArrayList<String>();
-    SegmentStatusManager ssm = new SegmentStatusManager(absoluteTableIdentifier);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
-    LoadMetadataDetails[] segmentDetails =
-        ssm.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
-    long timestampOfOriginalFacts = 0;
-
-    String startTimestampOfUpdate = "" ;
-    String endTimestampOfUpdate = "";
-
-    for (LoadMetadataDetails segment : segmentDetails) {
-      // find matching segment and return timestamp.
-      if (segment.getLoadName().equalsIgnoreCase(segmentId)) {
-        timestampOfOriginalFacts = segment.getLoadStartTime();
-        startTimestampOfUpdate = segment.getUpdateDeltaStartTimestamp();
-        endTimestampOfUpdate = segment.getUpdateDeltaEndTimestamp();
-      }
-    }
-
-    if (startTimestampOfUpdate.isEmpty()) {
-      return listOfInvalidBlocks;
-
-    }
-
-    // after getting the original fact timestamp, the remaining files
-    // need to be cross-checked against the table status file.
-
-    // filter out the fact files.
-
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
-    CarbonFile segDir =
-        FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
-
-    final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimestampOfUpdate);
-    final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimestampOfUpdate);
-    final Long timeStampOriginalFactFinal =
-        timestampOfOriginalFacts;
-
-    CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() {
-
-      @Override public boolean accept(CarbonFile pathName) {
-        String fileName = pathName.getName();
-        if (fileName.endsWith(CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)) {
-          String firstPart = fileName.substring(0, fileName.indexOf('.'));
-
-          long timestamp = Long.parseLong(firstPart
-              .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
-                  firstPart.length()));
-          if (Long.compare(timestamp, endTimeStampFinal) <= 0
-              && Long.compare(timestamp, startTimeStampFinal) >= 0) {
-            return false;
-          }
-          if (Long.compare(timestamp, timeStampOriginalFactFinal) == 0) {
-            return false;
-          }
-          // take the rest of files as they are invalid.
-          return true;
-        }
-        return false;
-      }
-    });
 
-    // gather the task numbers.
-    for (CarbonFile updateFiles : files) {
-      listOfInvalidBlocks.add(updateFiles.getName());
-    }
-
-    return listOfInvalidBlocks;
-  }
   /**
    * Returns the invalid timestamp range of a segment.
    * @param segmentId
@@ -932,12 +784,11 @@ public class SegmentUpdateStatusManager {
   }
   /**
    *
-   * @param segmentId
    * @param block
    * @param needCompleteList
    * @return
    */
-  public CarbonFile[] getDeleteDeltaInvalidFilesList(final String segmentId,
+  public CarbonFile[] getDeleteDeltaInvalidFilesList(
       final SegmentUpdateDetails block, final boolean needCompleteList,
       CarbonFile[] allSegmentFiles, boolean isAbortedFile) {
 
@@ -983,12 +834,11 @@ public class SegmentUpdateStatusManager {
 
   /**
    *
-   * @param blockName
    * @param allSegmentFiles
    * @return
    */
-  public CarbonFile[] getAllBlockRelatedFiles(String blockName, CarbonFile[] allSegmentFiles,
-                                              String actualBlockName) {
+  public CarbonFile[] getAllBlockRelatedFiles(CarbonFile[] allSegmentFiles,
+      String actualBlockName) {
     List<CarbonFile> files = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
     for (CarbonFile eachFile : allSegmentFiles) {