You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/10/26 13:22:52 UTC

carbondata git commit: [CARBONDATA-3014] Added support for inverted index and delete delta for direct scan queries

Repository: carbondata
Updated Branches:
  refs/heads/master b62b0fd9c -> 71d617955


[CARBONDATA-3014] Added support for inverted index and delete delta for direct scan queries

Added new column vector wrapper classes that apply the inverted index and the delete delta directly while the column vector is being filled:
ColumnarVectorWrapperDirectWithInvertedIndex
ColumnarVectorWrapperDirectWithDeleteDelta
ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex

This closes #2822


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/71d61795
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/71d61795
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/71d61795

Branch: refs/heads/master
Commit: 71d6179557703718ff0aac099efcc89ee41ed941
Parents: b62b0fd
Author: ravipesala <ra...@gmail.com>
Authored: Tue Oct 16 16:37:18 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Fri Oct 26 18:52:10 2018 +0530

----------------------------------------------------------------------
 ...mpressedDimensionChunkFileBasedReaderV3.java |  12 +-
 .../safe/AbstractNonDictionaryVectorFiller.java |   6 +-
 .../SafeFixedLengthDimensionDataChunkStore.java |  11 +
 ...feVariableLengthDimensionDataChunkStore.java |  10 +
 .../adaptive/AdaptiveDeltaFloatingCodec.java    |   3 +
 .../adaptive/AdaptiveDeltaIntegralCodec.java    |  35 ++-
 .../adaptive/AdaptiveFloatingCodec.java         |   3 +
 .../adaptive/AdaptiveIntegralCodec.java         |  17 +-
 .../encoding/compress/DirectCompressCodec.java  |  16 +-
 .../datatype/DecimalConverterFactory.java       |  42 +++-
 .../scan/collector/ResultCollectorFactory.java  |  11 +-
 .../executer/RestructureEvaluatorImpl.java      |   2 +-
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |  14 +-
 .../scan/result/vector/ColumnVectorInfo.java    |   1 +
 .../AbstractCarbonColumnarVector.java           | 133 ++++++++++++
 .../ColumnarVectorWrapperDirectFactory.java     |  59 +++++
 ...umnarVectorWrapperDirectWithDeleteDelta.java | 216 +++++++++++++++++++
 ...erDirectWithDeleteDeltaAndInvertedIndex.java | 179 +++++++++++++++
 ...narVectorWrapperDirectWithInvertedIndex.java | 144 +++++++++++++
 .../impl/directread/ConvertableVector.java      |  30 +++
 .../scanner/impl/BlockletFilterScanner.java     |   8 +-
 .../detailquery/CastColumnTestCase.scala        |   2 +-
 .../datasources/SparkCarbonFileFormat.scala     |   1 +
 23 files changed, 910 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index a9f9338..602e694 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -276,13 +276,19 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
         offset += pageMetadata.data_page_length;
         invertedIndexes = CarbonUtil
             .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
-        // get the reverse index
-        invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
+        if (vectorInfo == null) {
+          // get the reverse index
+          invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
+        } else {
+          vectorInfo.invertedIndex = invertedIndexes;
+        }
       }
       BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
       ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, dataOffset,
           null != rawColumnPage.getLocalDictionary(), vectorInfo, nullBitSet);
-      decodedPage.setNullBits(nullBitSet);
+      if (decodedPage != null) {
+        decodedPage.setNullBits(nullBitSet);
+      }
       return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), invertedIndexes,
           invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata), isExplicitSorted);
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
index ddfa470..2e68648 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
@@ -52,7 +52,11 @@ class NonDictionaryVectorFillerFactory {
   public static AbstractNonDictionaryVectorFiller getVectorFiller(DataType type, int lengthSize,
       int numberOfRows) {
     if (type == DataTypes.STRING) {
-      return new StringVectorFiller(lengthSize, numberOfRows);
+      if (lengthSize > 2) {
+        return new LongStringVectorFiller(lengthSize, numberOfRows);
+      } else {
+        return new StringVectorFiller(lengthSize, numberOfRows);
+      }
     } else if (type == DataTypes.VARCHAR) {
       return new LongStringVectorFiller(lengthSize, numberOfRows);
     } else if (type == DataTypes.TIMESTAMP) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
index d30650d..d4bae90 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
@@ -17,12 +17,16 @@
 
 package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
 
+import java.util.BitSet;
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -49,7 +53,14 @@ public class SafeFixedLengthDimensionDataChunkStore extends SafeAbsractDimension
   public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
       ColumnVectorInfo vectorInfo) {
     CarbonColumnVector vector = vectorInfo.vector;
+    BitSet deletedRows = vectorInfo.deletedRows;
+    BitSet nullBits = new BitSet(numOfRows);
+    vector = ColumnarVectorWrapperDirectFactory
+        .getDirectVectorWrapperFactory(vector, invertedIndex, nullBits, deletedRows, false);
     fillVector(data, vectorInfo, vector);
+    if (vector instanceof ConvertableVector) {
+      ((ConvertableVector) vector).convert();
+    }
   }
 
   private void fillVector(byte[] data, ColumnVectorInfo vectorInfo, CarbonColumnVector vector) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index 0fb4854..b80ad7f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -18,12 +18,15 @@
 package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
 
 import java.nio.ByteBuffer;
+import java.util.BitSet;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -103,7 +106,14 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
     ByteBuffer buffer = ByteBuffer.wrap(data);
     AbstractNonDictionaryVectorFiller vectorFiller =
         NonDictionaryVectorFillerFactory.getVectorFiller(dt, lengthSize, numberOfRows);
+    BitSet nullBits = new BitSet(numberOfRows);
+    vector = ColumnarVectorWrapperDirectFactory
+        .getDirectVectorWrapperFactory(vector, invertedIndex, nullBits, vectorInfo.deletedRows,
+            false);
     vectorFiller.fillVector(data, vector, buffer);
+    if (vector instanceof ConvertableVector) {
+      ((ConvertableVector) vector).convert();
+    }
   }
 
   protected abstract int getLengthSize();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index 1826798..d19d1c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
@@ -250,6 +251,8 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
       int pageSize = columnPage.getPageSize();
       BitSet deletedRows = vectorInfo.deletedRows;
       DataType vectorDataType = vector.getType();
+      vector = ColumnarVectorWrapperDirectFactory
+          .getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true);
       if (vectorDataType == DataTypes.FLOAT) {
         float floatFactor = factor.floatValue();
         if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index 0d7ad8a..1671246 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -40,6 +40,8 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
@@ -305,16 +307,26 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
       DataType pageDataType = columnPage.getDataType();
       int pageSize = columnPage.getPageSize();
       BitSet deletedRows = vectorInfo.deletedRows;
+      vector = ColumnarVectorWrapperDirectFactory
+          .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
+              true);
       fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
       if (deletedRows == null || deletedRows.isEmpty()) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
           vector.putNull(i);
         }
       }
+      if (vector instanceof ConvertableVector) {
+        ((ConvertableVector) vector).convert();
+      }
     }
 
     private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
         DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+      int newScale = 0;
+      if (vectorInfo.measure != null) {
+        newScale = vectorInfo.measure.getMeasure().getScale();
+      }
       if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
         byte[] byteData = columnPage.getBytePage();
         if (vectorDataType == DataTypes.SHORT) {
@@ -331,7 +343,7 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - byteData[i]) * 1000);
+            vector.putLong(i, (max - (long) byteData[i]) * 1000);
           }
         } else if (vectorDataType == DataTypes.BOOLEAN) {
           for (int i = 0; i < pageSize; i++) {
@@ -342,6 +354,9 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
           int precision = vectorInfo.measure.getMeasure().getPrecision();
           for (int i = 0; i < pageSize; i++) {
             BigDecimal decimal = decimalConverter.getDecimal(max - byteData[i]);
+            if (decimal.scale() < newScale) {
+              decimal = decimal.setScale(newScale);
+            }
             vector.putDecimal(i, decimal, precision);
           }
         } else {
@@ -365,13 +380,16 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
           }
         }  else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - shortData[i]) * 1000);
+            vector.putLong(i, (max - (long) shortData[i]) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
           for (int i = 0; i < pageSize; i++) {
             BigDecimal decimal = decimalConverter.getDecimal(max - shortData[i]);
+            if (decimal.scale() < newScale) {
+              decimal = decimal.setScale(newScale);
+            }
             vector.putDecimal(i, decimal, precision);
           }
         } else {
@@ -395,7 +413,7 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
         }  else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
             int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putLong(i, (max - shortInt) * 1000);
+            vector.putLong(i, (max - (long) shortInt) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
@@ -403,6 +421,9 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
           for (int i = 0; i < pageSize; i++) {
             int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
             BigDecimal decimal = decimalConverter.getDecimal(max - shortInt);
+            if (decimal.scale() < newScale) {
+              decimal = decimal.setScale(newScale);
+            }
             vector.putDecimal(i, decimal, precision);
           }
         } else {
@@ -423,13 +444,16 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - intData[i]) * 1000);
+            vector.putLong(i, (max - (long) intData[i]) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
           for (int i = 0; i < pageSize; i++) {
             BigDecimal decimal = decimalConverter.getDecimal(max - intData[i]);
+            if (decimal.scale() < newScale) {
+              decimal = decimal.setScale(newScale);
+            }
             vector.putDecimal(i, decimal, precision);
           }
         } else {
@@ -452,6 +476,9 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
           int precision = vectorInfo.measure.getMeasure().getPrecision();
           for (int i = 0; i < pageSize; i++) {
             BigDecimal decimal = decimalConverter.getDecimal(max - longData[i]);
+            if (decimal.scale() < newScale) {
+              decimal = decimal.setScale(newScale);
+            }
             vector.putDecimal(i, decimal, precision);
           }
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 38bf9b6..21421d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
@@ -253,6 +254,8 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
       int pageSize = columnPage.getPageSize();
       BitSet deletedRows = vectorInfo.deletedRows;
       DataType vectorDataType = vector.getType();
+      vector = ColumnarVectorWrapperDirectFactory
+          .getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true);
       if (vectorDataType == DataTypes.FLOAT) {
         if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
           byte[] byteData = columnPage.getBytePage();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index bdf5373..1813907 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -39,6 +39,8 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
@@ -278,12 +280,19 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
       DataType pageDataType = columnPage.getDataType();
       int pageSize = columnPage.getPageSize();
       BitSet deletedRows = vectorInfo.deletedRows;
+      vector = ColumnarVectorWrapperDirectFactory
+          .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
+              true);
       fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
       if (deletedRows == null || deletedRows.isEmpty()) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
           vector.putNull(i);
         }
       }
+      if (vector instanceof ConvertableVector) {
+        ((ConvertableVector) vector).convert();
+      }
+
     }
 
     private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
@@ -304,7 +313,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, byteData[i] * 1000);
+            vector.putLong(i, (long) byteData[i] * 1000);
           }
         } else if (vectorDataType == DataTypes.BOOLEAN) {
           vector.putBytes(0, pageSize, byteData, 0);
@@ -330,7 +339,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, shortData[i] * 1000);
+            vector.putLong(i, (long) shortData[i] * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
@@ -356,7 +365,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
             int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putLong(i, shortInt * 1000);
+            vector.putLong(i, (long) shortInt * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
@@ -378,7 +387,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, intData[i] * 1000);
+            vector.putLong(i, (long) intData[i] * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index 4d1e6e7..1d065cf 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -39,6 +39,8 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.Encoding;
 
@@ -208,12 +210,18 @@ public class DirectCompressCodec implements ColumnPageCodec {
       DataType pageDataType = columnPage.getDataType();
       int pageSize = columnPage.getPageSize();
       BitSet deletedRows = vectorInfo.deletedRows;
+      vector = ColumnarVectorWrapperDirectFactory
+          .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
+              true);
       fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
       if (deletedRows == null || deletedRows.isEmpty()) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
           vector.putNull(i);
         }
       }
+      if (vector instanceof ConvertableVector) {
+        ((ConvertableVector) vector).convert();
+      }
     }
 
     private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
@@ -234,7 +242,7 @@ public class DirectCompressCodec implements ColumnPageCodec {
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, byteData[i] * 1000);
+            vector.putLong(i, (long) byteData[i] * 1000);
           }
         } else if (vectorDataType == DataTypes.BOOLEAN || vectorDataType == DataTypes.BYTE) {
           vector.putBytes(0, pageSize, byteData, 0);
@@ -260,7 +268,7 @@ public class DirectCompressCodec implements ColumnPageCodec {
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, shortData[i] * 1000);
+            vector.putLong(i, (long) shortData[i] * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
@@ -286,7 +294,7 @@ public class DirectCompressCodec implements ColumnPageCodec {
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
             int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putLong(i, shortInt * 1000);
+            vector.putLong(i, (long) shortInt * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
@@ -308,7 +316,7 @@ public class DirectCompressCodec implements ColumnPageCodec {
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, intData[i] * 1000);
+            vector.putLong(i, (long) intData[i] * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
index 89a3168..5231cb9 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -106,13 +106,18 @@ public final class DecimalConverterFactory {
       // inefficient.
       CarbonColumnVector vector = info.vector;
       int precision = info.measure.getMeasure().getPrecision();
+      int newMeasureScale = info.measure.getMeasure().getScale();
       if (valuesToBeConverted instanceof byte[]) {
         byte[] data = (byte[]) valuesToBeConverted;
         for (int i = 0; i < size; i++) {
           if (nullBitset.get(i)) {
             vector.putNull(i);
           } else {
-            vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+            BigDecimal value = BigDecimal.valueOf(data[i], scale);
+            if (value.scale() < newMeasureScale) {
+              value = value.setScale(newMeasureScale);
+            }
+            vector.putDecimal(i, value, precision);
           }
         }
       } else if (valuesToBeConverted instanceof short[]) {
@@ -121,7 +126,11 @@ public final class DecimalConverterFactory {
           if (nullBitset.get(i)) {
             vector.putNull(i);
           } else {
-            vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+            BigDecimal value = BigDecimal.valueOf(data[i], scale);
+            if (value.scale() < newMeasureScale) {
+              value = value.setScale(newMeasureScale);
+            }
+            vector.putDecimal(i, value, precision);
           }
         }
       } else if (valuesToBeConverted instanceof int[]) {
@@ -130,7 +139,11 @@ public final class DecimalConverterFactory {
           if (nullBitset.get(i)) {
             vector.putNull(i);
           } else {
-            vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+            BigDecimal value = BigDecimal.valueOf(data[i], scale);
+            if (value.scale() < newMeasureScale) {
+              value = value.setScale(newMeasureScale);
+            }
+            vector.putDecimal(i, value, precision);
           }
         }
       } else if (valuesToBeConverted instanceof long[]) {
@@ -139,7 +152,11 @@ public final class DecimalConverterFactory {
           if (nullBitset.get(i)) {
             vector.putNull(i);
           } else {
-            vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+            BigDecimal value = BigDecimal.valueOf(data[i], scale);
+            if (value.scale() < newMeasureScale) {
+              value = value.setScale(newMeasureScale);
+            }
+            vector.putDecimal(i, value, precision);
           }
         }
       }
@@ -225,6 +242,10 @@ public final class DecimalConverterFactory {
         BitSet nullBitset) {
       CarbonColumnVector vector = info.vector;
       int precision = info.measure.getMeasure().getPrecision();
+      int newMeasureScale = info.measure.getMeasure().getScale();
+      if (scale < newMeasureScale) {
+        scale = newMeasureScale;
+      }
       if (valuesToBeConverted instanceof byte[][]) {
         byte[][] data = (byte[][]) valuesToBeConverted;
         for (int i = 0; i < size; i++) {
@@ -232,7 +253,11 @@ public final class DecimalConverterFactory {
             vector.putNull(i);
           } else {
             BigInteger bigInteger = new BigInteger(data[i]);
-            vector.putDecimal(i, new BigDecimal(bigInteger, scale), precision);
+            BigDecimal value = new BigDecimal(bigInteger, scale);
+            if (value.scale() < newMeasureScale) {
+              value = value.setScale(newMeasureScale);
+            }
+            vector.putDecimal(i, value, precision);
           }
         }
       }
@@ -263,13 +288,18 @@ public final class DecimalConverterFactory {
         BitSet nullBitset) {
       CarbonColumnVector vector = info.vector;
       int precision = info.measure.getMeasure().getPrecision();
+      int newMeasureScale = info.measure.getMeasure().getScale();
       if (valuesToBeConverted instanceof byte[][]) {
         byte[][] data = (byte[][]) valuesToBeConverted;
         for (int i = 0; i < size; i++) {
           if (nullBitset.get(i)) {
             vector.putNull(i);
           } else {
-            vector.putDecimal(i, DataTypeUtil.byteToBigDecimal(data[i]), precision);
+            BigDecimal value = DataTypeUtil.byteToBigDecimal(data[i]);
+            if (value.scale() < newMeasureScale) {
+              value = value.setScale(newMeasureScale);
+            }
+            vector.putDecimal(i, value, precision);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
index 68f8ae6..f102a48 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
@@ -17,16 +17,7 @@
 package org.apache.carbondata.core.scan.collector;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.scan.collector.impl.AbstractScannedResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedVectorResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RawBasedResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RestructureBasedDictionaryResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RestructureBasedRawResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RestructureBasedVectorResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RowIdBasedResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RowIdRawBasedResultCollector;
-import org.apache.carbondata.core.scan.collector.impl.RowIdRestructureBasedRawResultCollector;
+import org.apache.carbondata.core.scan.collector.impl.*;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
index a25394f..9d44462 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
@@ -111,7 +111,7 @@ public abstract class RestructureEvaluatorImpl implements FilterExecuter {
   @Override
   public BitSet prunePages(RawBlockletColumnChunks rawBlockletColumnChunks)
       throws FilterUnsupportedException, IOException {
-    return new BitSet();
+    throw new FilterUnsupportedException("Unsupported RestructureEvaluatorImpl on pune pages");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index e4c507d..02c587e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -319,18 +319,20 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
     }
   }
 
-  private boolean isScanRequired(DimensionRawColumnChunk rawColumnChunk, int i) {
+  private boolean isScanRequired(DimensionRawColumnChunk rawColumnChunk, int columnIndex) {
     boolean scanRequired;
     DataType dataType = dimColEvaluatorInfoList.get(0).getDimension().getDataType();
     // for no dictionary measure column comparison can be done
     // on the original data as like measure column
-    if (DataTypeUtil.isPrimitiveColumn(dataType) && !dimColEvaluatorInfoList.get(0)
-        .getDimension().hasEncoding(Encoding.DICTIONARY)) {
+    if (DataTypeUtil.isPrimitiveColumn(dataType) && !dimColEvaluatorInfoList.get(0).getDimension()
+        .hasEncoding(Encoding.DICTIONARY)) {
       scanRequired =
-          isScanRequired(rawColumnChunk.getMaxValues()[i], this.filterRangeValues, dataType);
+          isScanRequired(rawColumnChunk.getMaxValues()[columnIndex], this.filterRangeValues,
+              dataType);
     } else {
-      scanRequired = isScanRequired(rawColumnChunk.getMaxValues()[i], this.filterRangeValues,
-        rawColumnChunk.getMinMaxFlagArray()[i]);
+      scanRequired =
+          isScanRequired(rawColumnChunk.getMaxValues()[columnIndex], this.filterRangeValues,
+              rawColumnChunk.getMinMaxFlagArray()[columnIndex]);
     }
     return scanRequired;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
index d127728..6a9b3b3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
@@ -35,6 +35,7 @@ public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> {
   public DirectDictionaryGenerator directDictionaryGenerator;
   public MeasureDataVectorProcessor.MeasureVectorFiller measureVectorFiller;
   public GenericQueryType genericQueryType;
+  public int[] invertedIndex;
   public BitSet deletedRows;
   public DecimalConverterFactory.DecimalConverter decimalConverter;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java
new file mode 100644
index 0000000..437eee4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.result.vector.impl.directread;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
+public abstract class AbstractCarbonColumnarVector
+    implements CarbonColumnVector, ConvertableVector {
+
+  @Override
+  public void putShorts(int rowId, int count, short value) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public void putInts(int rowId, int count, int value) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public void putLongs(int rowId, int count, long value) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public void putDoubles(int rowId, int count, double value) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public void putBytes(int rowId, int count, byte[] value) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public void putNulls(int rowId, int count) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public void putNotNull(int rowId) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public void putNotNull(int rowId, int count) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public boolean isNull(int rowId) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public void putObject(int rowId, Object obj) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public Object getData(int rowId) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public void reset() {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public DataType getType() {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public DataType getBlockDataType() {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public void setBlockDataType(DataType blockDataType) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public void setFilteredRowsExist(boolean filteredRowsExist) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public void setDictionary(CarbonDictionary dictionary) {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public boolean hasDictionary() {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public CarbonColumnVector getDictionaryVector() {
+    throw new UnsupportedOperationException("Not allowed from here");
+  }
+
+  @Override
+  public void convert() {
+    // Do nothing
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java
new file mode 100644
index 0000000..4884b4d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.result.vector.impl.directread;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+
+/**
+ * Factory to create ColumnarVectors for inverted index and delete delta queries.
+ */
+public final class ColumnarVectorWrapperDirectFactory {
+
+  /**
+   * Gets carbon vector wrapper to fill the underlying vector based on inverted index and delete
+   * delta.
+   *
+   * @param columnVector     Actual vector to be filled.
+   * @param invertedIndex    Inverted index of column page
+   * @param nullBitset       row locations of nulls in bitset
+   * @param deletedRows      deleted rows locations in bitset.
+   * @param isnullBitsExists whether nullbitset present on this page, usually for dimension columns
+   *                         there is no null bitset.
+   * @return wrapped CarbonColumnVector
+   */
+  public static CarbonColumnVector getDirectVectorWrapperFactory(CarbonColumnVector columnVector,
+      int[] invertedIndex, BitSet nullBitset, BitSet deletedRows, boolean isnullBitsExists) {
+    if ((invertedIndex != null && invertedIndex.length > 0) && (deletedRows == null || deletedRows
+        .isEmpty())) {
+      return new ColumnarVectorWrapperDirectWithInvertedIndex(columnVector, invertedIndex,
+          isnullBitsExists);
+    } else if ((invertedIndex == null || invertedIndex.length == 0) && (deletedRows != null
+        && !deletedRows.isEmpty())) {
+      return new ColumnarVectorWrapperDirectWithDeleteDelta(columnVector, deletedRows, nullBitset);
+    } else if ((invertedIndex != null && invertedIndex.length > 0) && (deletedRows != null
+        && !deletedRows.isEmpty())) {
+      return new ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex(columnVector,
+          deletedRows, invertedIndex, nullBitset, isnullBitsExists);
+    } else {
+      return columnVector;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
new file mode 100644
index 0000000..ccde63e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.result.vector.impl.directread;
+
+import java.math.BigDecimal;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+
+/**
+ * Column vector for column pages that have a delete delta; it uses the deleted-rows bitset to
+ * filter out data before filling the actual vector.
+ */
+class ColumnarVectorWrapperDirectWithDeleteDelta extends AbstractCarbonColumnarVector {
+
+  private BitSet deletedRows;
+
+  private BitSet nullBits;
+
+  private int counter;
+
+  private CarbonColumnVector columnVector;
+
+  public ColumnarVectorWrapperDirectWithDeleteDelta(CarbonColumnVector vectorWrapper,
+      BitSet deletedRows, BitSet nullBits) {
+    this.deletedRows = deletedRows;
+    this.nullBits = nullBits;
+    this.columnVector = vectorWrapper;
+  }
+
+  @Override
+  public void putBoolean(int rowId, boolean value) {
+    if (!deletedRows.get(rowId)) {
+      if (nullBits.get(rowId)) {
+        columnVector.putNull(counter++);
+      } else {
+        columnVector.putBoolean(counter++, value);
+      }
+    }
+  }
+
+  @Override
+  public void putFloat(int rowId, float value) {
+    if (!deletedRows.get(rowId)) {
+      if (nullBits.get(rowId)) {
+        columnVector.putNull(counter++);
+      } else {
+        columnVector.putFloat(counter++, value);
+      }
+    }
+  }
+
+  @Override
+  public void putShort(int rowId, short value) {
+    if (!deletedRows.get(rowId)) {
+      if (nullBits.get(rowId)) {
+        columnVector.putNull(counter++);
+      } else {
+        columnVector.putShort(counter++, value);
+      }
+    }
+  }
+
+  @Override
+  public void putInt(int rowId, int value) {
+    if (!deletedRows.get(rowId)) {
+      if (nullBits.get(rowId)) {
+        columnVector.putNull(counter++);
+      } else {
+        columnVector.putInt(counter++, value);
+      }
+    }
+  }
+
+  @Override
+  public void putLong(int rowId, long value) {
+    if (!deletedRows.get(rowId)) {
+      if (nullBits.get(rowId)) {
+        columnVector.putNull(counter++);
+      } else {
+        columnVector.putLong(counter++, value);
+      }
+    }
+  }
+
+  @Override
+  public void putDecimal(int rowId, BigDecimal value, int precision) {
+    if (!deletedRows.get(rowId)) {
+      if (nullBits.get(rowId)) {
+        columnVector.putNull(counter++);
+      } else {
+        columnVector.putDecimal(counter++, value, precision);
+      }
+    }
+  }
+
+  @Override
+  public void putDouble(int rowId, double value) {
+    if (!deletedRows.get(rowId)) {
+      if (nullBits.get(rowId)) {
+        columnVector.putNull(counter++);
+      } else {
+        columnVector.putDouble(counter++, value);
+      }
+    }
+  }
+
+  @Override
+  public void putByteArray(int rowId, byte[] value) {
+    if (!deletedRows.get(rowId)) {
+      if (nullBits.get(rowId)) {
+        columnVector.putNull(counter++);
+      } else {
+        columnVector.putByteArray(counter++, value);
+      }
+    }
+  }
+
+  @Override
+  public void putByteArray(int rowId, int offset, int length, byte[] value) {
+    if (!deletedRows.get(rowId)) {
+      if (nullBits.get(rowId)) {
+        columnVector.putNull(counter++);
+      } else {
+        columnVector.putByteArray(counter++, offset, length, value);
+      }
+    }
+  }
+
+  @Override
+  public void putByte(int rowId, byte value) {
+    if (!deletedRows.get(rowId)) {
+      if (nullBits.get(rowId)) {
+        columnVector.putNull(counter++);
+      } else {
+        columnVector.putByte(counter++, value);
+      }
+    }
+  }
+
+  @Override
+  public void putNull(int rowId) {
+    if (!deletedRows.get(rowId)) {
+      columnVector.putNull(counter++);
+    }
+  }
+
+  @Override
+  public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+    for (int i = srcIndex; i < srcIndex + count; i++) { // exactly count values from srcIndex
+      if (!deletedRows.get(rowId++)) { // rowId follows the page row; deleted rows are dropped
+        columnVector.putFloat(counter++, src[i]); // nullBits is not consulted on this bulk path
+      }
+    }
+  }
+
+  @Override
+  public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+    for (int i = srcIndex; i < srcIndex + count; i++) { // exactly count values from srcIndex
+      if (!deletedRows.get(rowId++)) { // rowId follows the page row; deleted rows are dropped
+        columnVector.putShort(counter++, src[i]); // nullBits is not consulted on this bulk path
+      }
+    }
+  }
+
+  @Override
+  public void putInts(int rowId, int count, int[] src, int srcIndex) {
+    for (int i = srcIndex; i < srcIndex + count; i++) { // exactly count values from srcIndex
+      if (!deletedRows.get(rowId++)) { // rowId follows the page row; deleted rows are dropped
+        columnVector.putInt(counter++, src[i]); // nullBits is not consulted on this bulk path
+      }
+    }
+  }
+
+  @Override
+  public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+    for (int i = srcIndex; i < srcIndex + count; i++) { // exactly count values from srcIndex
+      if (!deletedRows.get(rowId++)) { // rowId follows the page row; deleted rows are dropped
+        columnVector.putLong(counter++, src[i]); // nullBits is not consulted on this bulk path
+      }
+    }
+  }
+
+  @Override
+  public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+    for (int i = srcIndex; i < srcIndex + count; i++) { // exactly count values from srcIndex
+      if (!deletedRows.get(rowId++)) { // rowId follows the page row; deleted rows are dropped
+        columnVector.putDouble(counter++, src[i]); // nullBits is not consulted on this bulk path
+      }
+    }
+  }
+
+  @Override
+  public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+    for (int i = srcIndex; i < srcIndex + count; i++) { // exactly count values from srcIndex
+      if (!deletedRows.get(rowId++)) { // rowId follows the page row; deleted rows are dropped
+        columnVector.putByte(counter++, src[i]); // nullBits is not consulted on this bulk path
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java
new file mode 100644
index 0000000..e4507cb
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.result.vector.impl.directread;
+
+import java.math.BigDecimal;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+
+/**
+ * Column vector for pages with both a delete delta and an inverted index: rows are reordered via
+ * the inverted index and deleted rows are filtered out before the actual vector is filled.
+ */
+class ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex
+    extends ColumnarVectorWrapperDirectWithInvertedIndex {
+
+  private BitSet deletedRows;
+
+  private CarbonColumnVector carbonColumnVector;
+
+  private int precision;
+
+  private BitSet nullBits;
+
+  /**
+   * Constructor
+   * @param vectorWrapper vector to be filled
+   * @param deletedRows deleted rows from delete delta.
+   * @param invertedIndex Inverted index of the column
+   * @param nullBits Null row ordinals in the bitset
+   * @param isnullBitsExists whether a null bitset exists for this page. Dimension columns now
+   *                          generally have one, but some still do not.
+   *                          It controls how putNull records a null row:
+   *                          if true, the row id is set in the null bitset as is;
+   *                          if false, the row id is first mapped through the inverted index.
+   */
+  public ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex(
+      CarbonColumnVector vectorWrapper, BitSet deletedRows, int[] invertedIndex, BitSet nullBits,
+      boolean isnullBitsExists) {
+    super(new CarbonColumnVectorImpl(invertedIndex.length, vectorWrapper.getType()), invertedIndex,
+        isnullBitsExists);
+    this.deletedRows = deletedRows;
+    this.carbonColumnVector = vectorWrapper;
+    this.nullBits = nullBits;
+  }
+
+  @Override
+  public void putDecimal(int rowId, BigDecimal value, int precision) {
+    this.precision = precision; // kept for convert(), which re-emits decimals with it
+    columnVector.putDecimal(invertedIndex[rowId], value, precision); // stage for convert()
+  }
+
+  @Override
+  public void putNull(int rowId) {
+    if (isnullBitsExists) {
+      nullBits.set(rowId);
+    } else {
+      nullBits.set(invertedIndex[rowId]);
+    }
+  }
+
+  @Override
+  public void convert() {
+    if (columnVector instanceof CarbonColumnVectorImpl) {
+      CarbonColumnVectorImpl localVector = (CarbonColumnVectorImpl) columnVector;
+      DataType dataType = carbonColumnVector.getType();
+      int length = invertedIndex.length;
+      int counter = 0;
+      if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
+        byte[] dataArray = (byte[]) localVector.getDataArray();
+        for (int i = 0; i < length; i++) {
+          if (!deletedRows.get(i)) {
+            if (nullBits.get(i)) {
+              carbonColumnVector.putNull(counter++);
+            } else {
+              carbonColumnVector.putByte(counter++, dataArray[i]);
+            }
+          }
+        }
+      } else if (dataType == DataTypes.SHORT) {
+        short[] dataArray = (short[]) localVector.getDataArray();
+        for (int i = 0; i < length; i++) {
+          if (!deletedRows.get(i)) {
+            if (nullBits.get(i)) {
+              carbonColumnVector.putNull(counter++);
+            } else {
+              carbonColumnVector.putShort(counter++, dataArray[i]);
+            }
+          }
+        }
+      } else if (dataType == DataTypes.INT) {
+        int[] dataArray = (int[]) localVector.getDataArray();
+        for (int i = 0; i < length; i++) {
+          if (!deletedRows.get(i)) {
+            if (nullBits.get(i)) {
+              carbonColumnVector.putNull(counter++);
+            } else {
+              carbonColumnVector.putInt(counter++, dataArray[i]);
+            }
+          }
+        }
+      } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
+        long[] dataArray = (long[]) localVector.getDataArray();
+        for (int i = 0; i < length; i++) {
+          if (!deletedRows.get(i)) {
+            if (nullBits.get(i)) {
+              carbonColumnVector.putNull(counter++);
+            } else {
+              carbonColumnVector.putLong(counter++, dataArray[i]);
+            }
+          }
+        }
+      } else if (dataType == DataTypes.FLOAT) {
+        float[] dataArray = (float[]) localVector.getDataArray();
+        for (int i = 0; i < length; i++) {
+          if (!deletedRows.get(i)) {
+            if (nullBits.get(i)) {
+              carbonColumnVector.putNull(counter++);
+            } else {
+              carbonColumnVector.putFloat(counter++, dataArray[i]);
+            }
+          }
+        }
+      } else if (dataType == DataTypes.DOUBLE) {
+        double[] dataArray = (double[]) localVector.getDataArray();
+        for (int i = 0; i < length; i++) {
+          if (!deletedRows.get(i)) {
+            if (nullBits.get(i)) {
+              carbonColumnVector.putNull(counter++);
+            } else {
+              carbonColumnVector.putDouble(counter++, dataArray[i]);
+            }
+          }
+        }
+      } else if (dataType instanceof DecimalType) {
+        BigDecimal[] dataArray = (BigDecimal[]) localVector.getDataArray();
+        for (int i = 0; i < length; i++) {
+          if (!deletedRows.get(i)) {
+            if (nullBits.get(i)) {
+              carbonColumnVector.putNull(counter++);
+            } else {
+              carbonColumnVector.putDecimal(counter++, dataArray[i], precision);
+            }
+          }
+        }
+      } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+        byte[][] dataArray = (byte[][]) localVector.getDataArray();
+        for (int i = 0; i < length; i++) {
+          if (!deletedRows.get(i)) {
+            if (nullBits.get(i)) {
+              carbonColumnVector.putNull(counter++);
+            } else {
+              carbonColumnVector.putByteArray(counter++, dataArray[i]);
+            }
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java
new file mode 100644
index 0000000..d95267f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.result.vector.impl.directread;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+
+/**
+ * Column vector wrapper for column pages which have an inverted index: every value is written
+ * to the wrapped vector at the position the inverted index gives for its row id.
+ */
+class ColumnarVectorWrapperDirectWithInvertedIndex extends AbstractCarbonColumnarVector {
+
+  /** Maps an incoming row id to the position written in the wrapped vector. */
+  protected int[] invertedIndex;
+  /** Wrapped vector that receives every value. */
+  protected CarbonColumnVector columnVector;
+  /** When true, putNull passes the row id through without consulting the inverted index. */
+  protected boolean isnullBitsExists;
+
+  public ColumnarVectorWrapperDirectWithInvertedIndex(CarbonColumnVector columnVector,
+      int[] invertedIndex, boolean isnullBitsExists) {
+    this.invertedIndex = invertedIndex;
+    this.columnVector = columnVector;
+    this.isnullBitsExists = isnullBitsExists;
+  }
+
+  @Override
+  public void putBoolean(int rowId, boolean value) {
+    columnVector.putBoolean(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putFloat(int rowId, float value) {
+    columnVector.putFloat(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putShort(int rowId, short value) {
+    columnVector.putShort(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putInt(int rowId, int value) {
+    columnVector.putInt(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putLong(int rowId, long value) {
+    columnVector.putLong(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putDecimal(int rowId, BigDecimal value, int precision) {
+    columnVector.putDecimal(invertedIndex[rowId], value, precision);
+  }
+
+  @Override
+  public void putDouble(int rowId, double value) {
+    columnVector.putDouble(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putByteArray(int rowId, byte[] value) {
+    columnVector.putByteArray(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putByteArray(int rowId, int offset, int length, byte[] value) {
+    columnVector.putByteArray(invertedIndex[rowId], offset, length, value);
+  }
+
+  @Override
+  public void putByte(int rowId, byte value) {
+    columnVector.putByte(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putNull(int rowId) {
+    // With null bits present the row id is passed through as-is, otherwise it is remapped.
+    // NOTE(review): presumably such row ids are already actual positions - confirm at callers.
+    columnVector.putNull(isnullBitsExists ? rowId : invertedIndex[rowId]);
+  }
+
+  // Bulk puts: src[srcIndex + i] is written to invertedIndex[rowId + i] for 0 <= i < count.
+
+  @Override
+  public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+    for (int i = 0; i < count; i++) {
+      columnVector.putFloat(invertedIndex[rowId + i], src[srcIndex + i]);
+    }
+  }
+
+  @Override
+  public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+    for (int i = 0; i < count; i++) {
+      columnVector.putShort(invertedIndex[rowId + i], src[srcIndex + i]);
+    }
+  }
+
+  @Override
+  public void putInts(int rowId, int count, int[] src, int srcIndex) {
+    for (int i = 0; i < count; i++) {
+      columnVector.putInt(invertedIndex[rowId + i], src[srcIndex + i]);
+    }
+  }
+
+  @Override
+  public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+    for (int i = 0; i < count; i++) {
+      columnVector.putLong(invertedIndex[rowId + i], src[srcIndex + i]);
+    }
+  }
+
+  @Override
+  public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+    for (int i = 0; i < count; i++) {
+      columnVector.putDouble(invertedIndex[rowId + i], src[srcIndex + i]);
+    }
+  }
+
+  @Override
+  public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+    for (int i = 0; i < count; i++) {
+      columnVector.putByte(invertedIndex[rowId + i], src[srcIndex + i]);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ConvertableVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ConvertableVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ConvertableVector.java
new file mode 100644
index 0000000..7020c66
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ConvertableVector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.result.vector.impl.directread;
+
+/**
+ * This interface provides a method to convert the values by using the inverted index and
+ * delete delta and to fill them into the underlying vector.
+ */
+public interface ConvertableVector {
+
+  /**
+   * Converts the values and fills them into the underlying vector.
+   */
+  void convert();
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
index fc32862..0434480 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
@@ -383,15 +383,13 @@ public class BlockletFilterScanner extends BlockletFullScanner {
         scannedPages.getCount() + pages.cardinality());
     // get the row indexes from bit set for each page
     int[] pageFilteredPages = new int[pages.cardinality()];
+    int[] numberOfRows = new int[pages.cardinality()];
     int index = 0;
     for (int i = pages.nextSetBit(0); i >= 0; i = pages.nextSetBit(i + 1)) {
-      pageFilteredPages[index++] = i;
+      pageFilteredPages[index] = i;
+      numberOfRows[index++] = rawBlockletColumnChunks.getDataBlock().getPageRowCount(i);
     }
     // count(*)  case there would not be any dimensions are measures selected.
-    int[] numberOfRows = new int[pages.cardinality()];
-    for (int i = 0; i < numberOfRows.length; i++) {
-      numberOfRows[i] = rawBlockletColumnChunks.getDataBlock().getPageRowCount(i);
-    }
     long dimensionReadTime = System.currentTimeMillis();
     dimensionReadTime = System.currentTimeMillis() - dimensionReadTime;
     FileReader fileReader = rawBlockletColumnChunks.getFileReader();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala
index 24524b8..a989230 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala
@@ -224,7 +224,7 @@ class CastColumnTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("Dictionary INT In to implicit Int") {
     checkAnswer(
-      sql("select empno,empname,workgroupcategory from DICTIONARY_CARBON_1 where workgroupcategory in ('1', '2')"),
+      sql("select empno,empname,workgroupcategory from DICTIONARY_CARBON_1 where workgroupcategory in (1, 2)"),
       sql("select empno,empname,workgroupcategory from DICTIONARY_HIVE_1 where workgroupcategory in ('1', '2')")
     )
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 88b7ff9..719fa34 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -408,6 +408,7 @@ class SparkCarbonFileFormat extends FileFormat
           model.setFreeUnsafeMemory(!isAdded)
         }
         val carbonReader = if (readVector) {
+          model.setDirectVectorFill(true);
           val vectorizedReader = new VectorizedCarbonRecordReader(model,
             null,
             supportBatchValue.toString)