You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/10/25 16:55:38 UTC

[2/3] carbondata git commit: [CARBONDATA-3012] Added support for full scan queries for vector direct fill.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 39b8282..a760b64 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -124,8 +124,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   /**
    * Create a new column page for decimal page
    */
-  static ColumnPage newDecimalColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
-      String compressorName) throws MemoryException {
+  static ColumnPage newDecimalColumnPage(ColumnPageEncoderMeta meta,
+      byte[] lvEncodedBytes) throws MemoryException {
+    TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
     DecimalConverterFactory.DecimalConverter decimalConverter =
         DecimalConverterFactory.INSTANCE.getDecimalConverter(columnSpec.getPrecision(),
             columnSpec.getScale());
@@ -133,10 +134,10 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     if (size < 0) {
       return getLVBytesColumnPage(columnSpec, lvEncodedBytes,
           DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()),
-          CarbonCommonConstants.INT_SIZE_IN_BYTE, compressorName);
+          CarbonCommonConstants.INT_SIZE_IN_BYTE, meta.getCompressorName());
     } else {
       // Here the size is always fixed.
-      return getDecimalColumnPage(columnSpec, lvEncodedBytes, size, compressorName);
+      return getDecimalColumnPage(meta, lvEncodedBytes, size);
     }
   }
 
@@ -158,8 +159,10 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
         lvLength, compressorName);
   }
 
-  private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedBytes, int size, String compressorName) throws MemoryException {
+  private static ColumnPage getDecimalColumnPage(ColumnPageEncoderMeta meta,
+      byte[] lvEncodedBytes, int size) throws MemoryException {
+    TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
+    String compressorName = meta.getCompressorName();
     TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
         .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
     ColumnPage rowOffset = ColumnPage.newPage(
@@ -176,7 +179,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     rowOffset.putInt(counter, offset);
 
     VarLengthColumnPageBase page;
-    if (unsafe) {
+    if (isUnsafeEnabled(meta)) {
       page = new UnsafeDecimalColumnPage(
           new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
           rowId);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
index 4e491c5..d82a873 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
@@ -18,9 +18,11 @@
 package org.apache.carbondata.core.datastore.page.encoding;
 
 import java.io.IOException;
+import java.util.BitSet;
 
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 public interface ColumnPageDecoder {
 
@@ -29,6 +31,12 @@ public interface ColumnPageDecoder {
    */
   ColumnPage decode(byte[] input, int offset, int length) throws MemoryException, IOException;
 
+  /**
+   * Applies the decoding algorithm on the input byte array and fills the vector directly.
+   */
+  void decodeAndFillVector(byte[] input, int offset, int length, ColumnVectorInfo vectorInfo,
+      BitSet nullBits, boolean isLVEncoded) throws MemoryException, IOException;
+
   ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
       throws MemoryException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index e6aafa0..03a43f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -49,6 +49,9 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
   // Make it protected for RLEEncoderMeta
   protected String compressorName;
 
+  // Whether the flow should fill the complete vector directly while decoding the page.
+  private transient boolean fillCompleteVector;
+
   public ColumnPageEncoderMeta() {
   }
 
@@ -284,4 +287,12 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
   public DataType getSchemaDataType() {
     return columnSpec.getSchemaDataType();
   }
+
+  public boolean isFillCompleteVector() {
+    return fillCompleteVector;
+  }
+
+  public void setFillCompleteVector(boolean fillCompleteVector) {
+    this.fillCompleteVector = fillCompleteVector;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index 920a516..d3070b4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -66,6 +66,21 @@ public abstract class EncodingFactory {
    */
   public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas,
       String compressor) throws IOException {
+    return createDecoder(encodings, encoderMetas, compressor, false);
+  }
+
+  /**
+   * Return new decoder based on encoder metadata read from file
+   * @param encodings encodings used to decode the page
+   * @param encoderMetas metadata of encodings to decode the data
+   * @param compressor Compressor name which will be used to decode data.
+   * @param fullVectorFill whether the flow should go to fill the given vector completely while
+   *                       decoding the data itself.
+   * @return decoder to decode page.
+   * @throws IOException if reading the encoder metadata fails
+   */
+  public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas,
+      String compressor, boolean fullVectorFill) throws IOException {
     assert (encodings.size() >= 1);
     assert (encoderMetas.size() == 1);
     Encoding encoding = encodings.get(0);
@@ -74,16 +89,19 @@ public abstract class EncodingFactory {
     DataInputStream in = new DataInputStream(stream);
     if (encoding == DIRECT_COMPRESS || encoding == DIRECT_COMPRESS_VARCHAR) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.setFillCompleteVector(fullVectorFill);
       metadata.readFields(in);
       return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata);
     } else if (encoding == ADAPTIVE_INTEGRAL) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.setFillCompleteVector(fullVectorFill);
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveIntegralCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
           stats, encodings.contains(Encoding.INVERTED_INDEX)).createDecoder(metadata);
     } else if (encoding == ADAPTIVE_DELTA_INTEGRAL) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.setFillCompleteVector(fullVectorFill);
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveDeltaIntegralCodec(metadata.getSchemaDataType(),
@@ -91,12 +109,14 @@ public abstract class EncodingFactory {
           .createDecoder(metadata);
     } else if (encoding == ADAPTIVE_FLOATING) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.setFillCompleteVector(fullVectorFill);
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveFloatingCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
           stats, encodings.contains(Encoding.INVERTED_INDEX)).createDecoder(metadata);
     } else if (encoding == ADAPTIVE_DELTA_FLOATING) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.setFillCompleteVector(fullVectorFill);
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveDeltaFloatingCodec(metadata.getSchemaDataType(),
@@ -108,12 +128,13 @@ public abstract class EncodingFactory {
       return new RLECodec().createDecoder(metadata);
     } else if (encoding == BOOL_BYTE) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.setFillCompleteVector(fullVectorFill);
       metadata.readFields(in);
       return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata);
     } else {
       // for backward compatibility
       ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta);
-      return createDecoderLegacy(metadata, compressor);
+      return createDecoderLegacy(metadata, compressor, fullVectorFill);
     }
   }
 
@@ -121,6 +142,14 @@ public abstract class EncodingFactory {
    * Old way of creating decoder, based on algorithm
    */
   public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata, String compressor) {
+    return createDecoderLegacy(metadata, compressor, false);
+  }
+
+  /**
+   * Old way of creating decoder, based on algorithm; fullVectorFill is set on the created meta
+   */
+  private ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata, String compressor,
+      boolean fullVectorFill) {
     if (null == metadata) {
       throw new RuntimeException("internal error");
     }
@@ -139,16 +168,19 @@ public abstract class EncodingFactory {
         AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec;
         ColumnPageEncoderMeta meta =
             new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+        meta.setFillCompleteVector(fullVectorFill);
         return codec.createDecoder(meta);
       } else if (codec instanceof AdaptiveDeltaIntegralCodec) {
         AdaptiveDeltaIntegralCodec adaptiveCodec = (AdaptiveDeltaIntegralCodec) codec;
         ColumnPageEncoderMeta meta =
             new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+        meta.setFillCompleteVector(fullVectorFill);
         return codec.createDecoder(meta);
       } else if (codec instanceof DirectCompressCodec) {
         ColumnPageEncoderMeta meta =
             new ColumnPageEncoderMeta(spec, DataType.getDataType(metadata.getType()), stats,
                 compressor);
+        meta.setFillCompleteVector(fullVectorFill);
         return codec.createDecoder(meta);
       } else {
         throw new RuntimeException("internal error");
@@ -161,30 +193,36 @@ public abstract class EncodingFactory {
         AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec;
         ColumnPageEncoderMeta meta =
             new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+        meta.setFillCompleteVector(fullVectorFill);
         return codec.createDecoder(meta);
       } else if (codec instanceof DirectCompressCodec) {
         ColumnPageEncoderMeta meta =
             new ColumnPageEncoderMeta(spec, DataType.getDataType(metadata.getType()), stats,
                 compressor);
+        meta.setFillCompleteVector(fullVectorFill);
         return codec.createDecoder(meta);
       } else if (codec instanceof AdaptiveDeltaFloatingCodec) {
         AdaptiveDeltaFloatingCodec adaptiveCodec = (AdaptiveDeltaFloatingCodec) codec;
         ColumnPageEncoderMeta meta =
             new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+        meta.setFillCompleteVector(fullVectorFill);
         return codec.createDecoder(meta);
       } else {
         throw new RuntimeException("internal error");
       }
     } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.BYTE_ARRAY) {
       // no dictionary dimension
-      return new DirectCompressCodec(stats.getDataType())
-          .createDecoder(new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor));
+      ColumnPageEncoderMeta meta =
+          new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor);
+      meta.setFillCompleteVector(fullVectorFill);
+      return new DirectCompressCodec(stats.getDataType()).createDecoder(meta);
     } else if (dataType == DataTypes.LEGACY_LONG) {
       // In case of older versions like in V1 format it has special datatype to handle
       AdaptiveIntegralCodec adaptiveCodec =
           new AdaptiveIntegralCodec(DataTypes.LONG, DataTypes.LONG, stats, false);
       ColumnPageEncoderMeta meta =
           new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+      meta.setFillCompleteVector(fullVectorFill);
       return adaptiveCodec.createDecoder(meta);
     } else {
       throw new RuntimeException("unsupported data type: " + stats.getDataType());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index 9b0b574..1826798 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -35,6 +37,9 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
@@ -125,6 +130,18 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
         return LazyColumnPage.newPage(page, converter);
       }
 
+      @Override
+      public void decodeAndFillVector(byte[] input, int offset, int length,
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          throws MemoryException, IOException {
+        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+        page.setNullBits(nullBits); // the converter reads the null bits back from the page
+        if (page instanceof DecimalColumnPage) { // hand the page's converter over via vectorInfo
+          vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+        }
+        converter.decodeAndFillVector(page, vectorInfo); // fills values, then marks nulls
+      }
+
       @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
           throws MemoryException, IOException {
         return decode(input, offset, length);
@@ -226,6 +243,71 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
     }
 
     @Override
+    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+      CarbonColumnVector vector = vectorInfo.vector;
+      BitSet nullBits = columnPage.getNullBits();
+      DataType pageDataType = columnPage.getDataType();
+      int pageSize = columnPage.getPageSize();
+      BitSet deletedRows = vectorInfo.deletedRows;
+      DataType vectorDataType = vector.getType();
+      if (vectorDataType == DataTypes.FLOAT) {
+        float floatFactor = factor.floatValue(); // computed once, reused by every loop below
+        if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+          byte[] byteData = columnPage.getBytePage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putFloat(i, (max - byteData[i]) / floatFactor); // (max - stored) / factor
+          }
+        } else if (pageDataType == DataTypes.SHORT) {
+          short[] shortData = columnPage.getShortPage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putFloat(i, (max - shortData[i]) / floatFactor);
+          }
+
+        } else if (pageDataType == DataTypes.SHORT_INT) {
+          byte[] shortIntPage = columnPage.getShortIntPage();
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3); // 3 bytes per value
+            vector.putFloat(i, (max - shortInt) / floatFactor);
+          }
+        } else { // NOTE(review): INT pages are handled for double below but rejected here
+          throw new RuntimeException("internal error: " + this.toString());
+        }
+      } else { // every non-FLOAT vector type is filled through putDouble
+        if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+          byte[] byteData = columnPage.getBytePage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (max - byteData[i]) / factor);
+          }
+        } else if (pageDataType == DataTypes.SHORT) {
+          short[] shortData = columnPage.getShortPage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (max - shortData[i]) / factor);
+          }
+
+        } else if (pageDataType == DataTypes.SHORT_INT) {
+          byte[] shortIntPage = columnPage.getShortIntPage();
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putDouble(i, (max - shortInt) / factor);
+          }
+        } else if (pageDataType == DataTypes.INT) {
+          int[] intData = columnPage.getIntPage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (max - intData[i]) / factor);
+          }
+        } else {
+          throw new RuntimeException("Unsupported datatype : " + pageDataType);
+        }
+      }
+
+      if (deletedRows == null || deletedRows.isEmpty()) { // nulls are marked only without deletes
+        for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+          vector.putNull(i);
+        }
+      }
+    }
+
+    @Override
     public double decodeDouble(float value) {
       throw new RuntimeException("internal error: " + debugInfo());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index 0e61b33..0d7ad8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
@@ -27,6 +28,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -35,6 +37,10 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
@@ -119,9 +125,11 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
     };
   }
 
-  @Override public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
+  @Override
+  public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
     return new ColumnPageDecoder() {
-      @Override public ColumnPage decode(byte[] input, int offset, int length)
+      @Override
+      public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
         ColumnPage page = null;
         if (DataTypes.isDecimal(meta.getSchemaDataType())) {
@@ -132,7 +140,23 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
         return LazyColumnPage.newPage(page, converter);
       }
 
-      @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
+      @Override
+      public void decodeAndFillVector(byte[] input, int offset, int length,
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          throws MemoryException, IOException {
+        ColumnPage page = null;
+        if (DataTypes.isDecimal(meta.getSchemaDataType())) { // decimal schema: decimal page path
+          page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+          vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+        } else { // isLVEncoded is only consulted on this non-decimal path
+          page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+        }
+        page.setNullBits(nullBits); // the converter reads the null bits back from the page
+        converter.decodeAndFillVector(page, vectorInfo); // fills values, then marks nulls
+      }
+
+      @Override
+      public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
           throws MemoryException, IOException {
         return decode(input, offset, length);
       }
@@ -272,5 +296,169 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
       // this codec is for integer type only
       throw new RuntimeException("internal error");
     }
+
+    @Override
+    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+      CarbonColumnVector vector = vectorInfo.vector;
+      BitSet nullBits = columnPage.getNullBits();
+      DataType vectorDataType = vector.getType();
+      DataType pageDataType = columnPage.getDataType();
+      int pageSize = columnPage.getPageSize();
+      BitSet deletedRows = vectorInfo.deletedRows;
+      fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+      if (deletedRows == null || deletedRows.isEmpty()) { // nulls are marked only without deletes
+        for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+          vector.putNull(i);
+        }
+      }
+    }
+
+    private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
+        DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+      if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+        byte[] byteData = columnPage.getBytePage(); // each value decodes as (max - stored)
+        if (vectorDataType == DataTypes.SHORT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putShort(i, (short) (max - byteData[i]));
+          }
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) (max - byteData[i]));
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - byteData[i]));
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - byteData[i]) * 1000); // presumably ms -> us; TODO confirm
+          }
+        } else if (vectorDataType == DataTypes.BOOLEAN) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putByte(i, (byte) (max - byteData[i]));
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int precision = vectorInfo.measure.getMeasure().getPrecision();
+          for (int i = 0; i < pageSize; i++) {
+            BigDecimal decimal = decimalConverter.getDecimal(max - byteData[i]);
+            vector.putDecimal(i, decimal, precision);
+          }
+        } else { // any other vector type is filled as double
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (max - byteData[i]));
+          }
+        }
+      } else if (pageDataType == DataTypes.SHORT) {
+        short[] shortData = columnPage.getShortPage();
+        if (vectorDataType == DataTypes.SHORT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putShort(i, (short) (max - shortData[i]));
+          }
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) (max - shortData[i]));
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - shortData[i]));
+          }
+        }  else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - shortData[i]) * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int precision = vectorInfo.measure.getMeasure().getPrecision();
+          for (int i = 0; i < pageSize; i++) {
+            BigDecimal decimal = decimalConverter.getDecimal(max - shortData[i]);
+            vector.putDecimal(i, decimal, precision);
+          }
+        } else { // any other vector type is filled as double
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (max - shortData[i]));
+          }
+        }
+
+      } else if (pageDataType == DataTypes.SHORT_INT) {
+        byte[] shortIntPage = columnPage.getShortIntPage(); // read 3 bytes per value below
+        if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putInt(i, (int) (max - shortInt));
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, (max - shortInt));
+          }
+        }  else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, (max - shortInt) * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int precision = vectorInfo.measure.getMeasure().getPrecision();
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            BigDecimal decimal = decimalConverter.getDecimal(max - shortInt);
+            vector.putDecimal(i, decimal, precision);
+          }
+        } else { // any other vector type is filled as double
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putDouble(i, (max - shortInt));
+          }
+        }
+      } else if (pageDataType == DataTypes.INT) {
+        int[] intData = columnPage.getIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) (max - intData[i]));
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - intData[i]));
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - intData[i]) * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int precision = vectorInfo.measure.getMeasure().getPrecision();
+          for (int i = 0; i < pageSize; i++) {
+            BigDecimal decimal = decimalConverter.getDecimal(max - intData[i]);
+            vector.putDecimal(i, decimal, precision);
+          }
+        } else { // any other vector type is filled as double
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (max - intData[i]));
+          }
+        }
+      } else if (pageDataType == DataTypes.LONG) {
+        long[] longData = columnPage.getLongPage();
+        if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - longData[i]));
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - longData[i]) * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int precision = vectorInfo.measure.getMeasure().getPrecision();
+          for (int i = 0; i < pageSize; i++) {
+            BigDecimal decimal = decimalConverter.getDecimal(max - longData[i]);
+            vector.putDecimal(i, decimal, precision);
+          }
+        } // NOTE(review): no fallback here; other vector types are silently left unfilled
+      } else {
+        throw new RuntimeException("Unsupported datatype : " + pageDataType);
+      }
+    }
+
   };
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 836af26..38bf9b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -34,6 +36,9 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
@@ -113,7 +118,26 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
         return LazyColumnPage.newPage(page, converter);
       }
 
-      @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
+      /**
+       * Decompresses the encoded bytes into a page, attaches the supplied null bitset and hands
+       * the page to the converter, which fills the vector carried by {@code vectorInfo}.
+       */
+      @Override
+      public void decodeAndFillVector(byte[] input, int offset, int length,
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          throws MemoryException, IOException {
+        ColumnPage decodedPage = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+        decodedPage.setNullBits(nullBits);
+        // a decimal page brings its own converter; expose it for the vector fill
+        if (decodedPage instanceof DecimalColumnPage) {
+          DecimalColumnPage decimalPage = (DecimalColumnPage) decodedPage;
+          vectorInfo.decimalConverter = decimalPage.getDecimalConverter();
+        }
+        converter.decodeAndFillVector(decodedPage, vectorInfo);
+      }
+
+      @Override
+      public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
           throws MemoryException, IOException {
         return decode(input, offset, length);
       }
@@ -226,5 +244,77 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
     public double decodeDouble(double value) {
       throw new RuntimeException("internal error: " + debugInfo());
     }
+
+    /**
+     * Fills the vector of {@code vectorInfo} with all rows of the given page. Each stored
+     * integral value is divided by {@code floatFactor} for a FLOAT vector and by {@code factor}
+     * for any other vector type. Rows flagged in the page null bitset are marked null only
+     * when {@code vectorInfo.deletedRows} is null or empty.
+     *
+     * @param columnPage decoded page holding the stored values and the null bitset
+     * @param vectorInfo carries the target vector and the deleted-row bitset
+     * @throws RuntimeException if the page data type is not handled for the vector type
+     */
+    @Override
+    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+      CarbonColumnVector target = vectorInfo.vector;
+      BitSet nulls = columnPage.getNullBits();
+      DataType pageType = columnPage.getDataType();
+      int rowCount = columnPage.getPageSize();
+      BitSet deleted = vectorInfo.deletedRows;
+      boolean byteBacked = pageType == DataTypes.BOOLEAN || pageType == DataTypes.BYTE;
+      if (target.getType() == DataTypes.FLOAT) {
+        if (byteBacked) {
+          byte[] bytes = columnPage.getBytePage();
+          for (int row = 0; row < rowCount; row++) {
+            target.putFloat(row, bytes[row] / floatFactor);
+          }
+        } else if (pageType == DataTypes.SHORT) {
+          short[] shorts = columnPage.getShortPage();
+          for (int row = 0; row < rowCount; row++) {
+            target.putFloat(row, shorts[row] / floatFactor);
+          }
+        } else if (pageType == DataTypes.SHORT_INT) {
+          byte[] packed = columnPage.getShortIntPage();
+          for (int row = 0; row < rowCount; row++) {
+            target.putFloat(row, ByteUtil.valueOf3Bytes(packed, row * 3) / floatFactor);
+          }
+        } else {
+          throw new RuntimeException("internal error: " + this.toString());
+        }
+      } else {
+        if (byteBacked) {
+          byte[] bytes = columnPage.getBytePage();
+          for (int row = 0; row < rowCount; row++) {
+            target.putDouble(row, bytes[row] / factor);
+          }
+        } else if (pageType == DataTypes.SHORT) {
+          short[] shorts = columnPage.getShortPage();
+          for (int row = 0; row < rowCount; row++) {
+            target.putDouble(row, shorts[row] / factor);
+          }
+        } else if (pageType == DataTypes.SHORT_INT) {
+          byte[] packed = columnPage.getShortIntPage();
+          for (int row = 0; row < rowCount; row++) {
+            target.putDouble(row, ByteUtil.valueOf3Bytes(packed, row * 3) / factor);
+          }
+        } else if (pageType == DataTypes.INT) {
+          int[] ints = columnPage.getIntPage();
+          for (int row = 0; row < rowCount; row++) {
+            target.putDouble(row, ints[row] / factor);
+          }
+        } else {
+          throw new RuntimeException("Unsupported datatype : " + pageType);
+        }
+      }
+
+      if (deleted == null || deleted.isEmpty()) {
+        int nullRow = nulls.nextSetBit(0);
+        while (nullRow >= 0) {
+          target.putNull(nullRow);
+          nullRow = nulls.nextSetBit(nullRow + 1);
+        }
+      }
+    }
   };
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index f1c0ea0..bdf5373 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -34,6 +36,10 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
@@ -111,6 +117,28 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
         return LazyColumnPage.newPage(page, converter);
       }
 
+      /**
+       * Decompresses the encoded bytes (as a decimal page when the schema data type is
+       * decimal), attaches the supplied null bitset and lets the converter fill the vector
+       * carried by {@code vectorInfo}.
+       */
+      @Override
+      public void decodeAndFillVector(byte[] input, int offset, int length,
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          throws MemoryException, IOException {
+        final ColumnPage decodedPage;
+        if (!DataTypes.isDecimal(meta.getSchemaDataType())) {
+          decodedPage = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+        } else {
+          decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+          // the decimal page supplies the converter used later to fill decimal vectors
+          DecimalColumnPage decimalPage = (DecimalColumnPage) decodedPage;
+          vectorInfo.decimalConverter = decimalPage.getDecimalConverter();
+        }
+        decodedPage.setNullBits(nullBits);
+        converter.decodeAndFillVector(decodedPage, vectorInfo);
+      }
+
       @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
           throws MemoryException, IOException {
         return decode(input, offset, length);
@@ -248,6 +269,159 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
     public double decodeDouble(double value) {
       throw new RuntimeException("internal error: " + debugInfo());
     }
+
+    /**
+     * Fills the vector of {@code vectorInfo} with every row of the given page and afterwards
+     * marks the rows flagged in the page null bitset as null. The null marking is skipped when
+     * {@code vectorInfo.deletedRows} contains at least one row.
+     *
+     * @param columnPage decoded page holding the stored values and the null bitset
+     * @param vectorInfo carries the target vector, the deleted rows and the decimal converter
+     */
+    @Override
+    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+      CarbonColumnVector vector = vectorInfo.vector;
+      BitSet nullBits = columnPage.getNullBits();
+      DataType vectorDataType = vector.getType();
+      DataType pageDataType = columnPage.getDataType();
+      int pageSize = columnPage.getPageSize();
+      BitSet deletedRows = vectorInfo.deletedRows;
+      fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+      if (deletedRows == null || deletedRows.isEmpty()) {
+        for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+          vector.putNull(i);
+        }
+      }
+    }
+
+    /**
+     * Copies {@code pageSize} values from the page into the vector, converting from the page
+     * storage type to the vector type. TIMESTAMP vectors receive the stored value multiplied by
+     * 1000; the multiplication is done in long arithmetic because a 3-byte or 4-byte stored
+     * value times 1000 does not always fit in an int. Decimal vectors are filled through
+     * {@code vectorInfo.decimalConverter}. Vector types without a dedicated branch are filled
+     * as double, except for LONG pages, where such vector types are left unfilled. Pages of any
+     * other storage type are copied as doubles.
+     */
+    private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
+        DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+      if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+        byte[] byteData = columnPage.getBytePage();
+        if (vectorDataType == DataTypes.SHORT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putShort(i, (short) byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, byteData[i] * 1000L);
+          }
+        } else if (vectorDataType == DataTypes.BOOLEAN || vectorDataType == DataTypes.BYTE) {
+          vector.putBytes(0, pageSize, byteData, 0);
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(byteData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, byteData[i]);
+          }
+        }
+      } else if (pageDataType == DataTypes.SHORT) {
+        short[] shortData = columnPage.getShortPage();
+        if (vectorDataType == DataTypes.SHORT) {
+          vector.putShorts(0, pageSize, shortData, 0);
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) shortData[i]);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, shortData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, shortData[i] * 1000L);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(shortData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, shortData[i]);
+          }
+        }
+
+      } else if (pageDataType == DataTypes.SHORT_INT) {
+        byte[] shortIntPage = columnPage.getShortIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putInt(i, shortInt);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, shortInt);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, shortInt * 1000L);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int[] shortIntData = ByteUtil.toIntArray(shortIntPage, pageSize);
+          decimalConverter.fillVector(shortIntData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putDouble(i, shortInt);
+          }
+        }
+      } else if (pageDataType == DataTypes.INT) {
+        int[] intData = columnPage.getIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          vector.putInts(0, pageSize, intData, 0);
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, intData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, intData[i] * 1000L);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(intData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, intData[i]);
+          }
+        }
+      } else if (pageDataType == DataTypes.LONG) {
+        long[] longData = columnPage.getLongPage();
+        if (vectorDataType == DataTypes.LONG) {
+          vector.putLongs(0, pageSize, longData, 0);
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, longData[i] * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(longData, pageSize, vectorInfo, columnPage.getNullBits());
+        }
+      } else {
+        double[] doubleData = columnPage.getDoublePage();
+        vector.putDoubles(0, pageSize, doubleData, 0);
+      }
+    }
   };
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index aa03ec1..4d1e6e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.compress;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -34,6 +36,10 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -95,10 +101,30 @@ public class DirectCompressCodec implements ColumnPageCodec {
         return LazyColumnPage.newPage(decodedPage, converter);
       }
 
+      /**
+       * Decompresses the encoded bytes (as a decimal page when {@code dataType} is decimal),
+       * attaches the supplied null bitset and lets the converter fill the vector carried by
+       * {@code vectorInfo}.
+       */
+      @Override
+      public void decodeAndFillVector(byte[] input, int offset, int length,
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          throws MemoryException, IOException {
+        boolean decimalColumn = DataTypes.isDecimal(dataType);
+        ColumnPage page = decimalColumn
+            ? ColumnPage.decompressDecimalPage(meta, input, offset, length)
+            : ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+        if (decimalColumn) {
+          vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+        }
+        page.setNullBits(nullBits);
+        converter.decodeAndFillVector(page, vectorInfo);
+      }
+
       @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
-        throws MemoryException, IOException {
-        return LazyColumnPage.newPage(
-            ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
+          throws MemoryException, IOException {
+        return LazyColumnPage
+            .newPage(ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
       }
     };
   }
@@ -178,6 +199,163 @@ public class DirectCompressCodec implements ColumnPageCodec {
     public double decodeDouble(double value) {
       return value;
     }
+
+    /**
+     * Fills the vector of {@code vectorInfo} with every row of the given page and afterwards
+     * marks the rows flagged in the page null bitset as null. The null marking is skipped when
+     * {@code vectorInfo.deletedRows} contains at least one row.
+     *
+     * @param columnPage decoded page holding the stored values and the null bitset
+     * @param vectorInfo carries the target vector, the deleted rows and the decimal converter
+     */
+    @Override
+    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+      CarbonColumnVector vector = vectorInfo.vector;
+      BitSet nullBits = columnPage.getNullBits();
+      DataType vectorDataType = vector.getType();
+      DataType pageDataType = columnPage.getDataType();
+      int pageSize = columnPage.getPageSize();
+      BitSet deletedRows = vectorInfo.deletedRows;
+      fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+      if (deletedRows == null || deletedRows.isEmpty()) {
+        for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+          vector.putNull(i);
+        }
+      }
+    }
+
+    /**
+     * Copies {@code pageSize} values from the page into the vector, converting from the page
+     * storage type to the vector type. TIMESTAMP vectors receive the stored value multiplied by
+     * 1000; the multiplication is done in long arithmetic because a 3-byte or 4-byte stored
+     * value times 1000 does not always fit in an int. Decimal vectors and decimal pages are
+     * filled through {@code vectorInfo.decimalConverter}. Vector types without a dedicated
+     * branch are filled as double, except for LONG pages, where such vector types are left
+     * unfilled. Pages of any other storage type are copied as doubles.
+     */
+    private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
+        DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+      if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+        byte[] byteData = columnPage.getBytePage();
+        if (vectorDataType == DataTypes.SHORT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putShort(i, (short) byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, byteData[i] * 1000L);
+          }
+        } else if (vectorDataType == DataTypes.BOOLEAN || vectorDataType == DataTypes.BYTE) {
+          vector.putBytes(0, pageSize, byteData, 0);
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(byteData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, byteData[i]);
+          }
+        }
+      } else if (pageDataType == DataTypes.SHORT) {
+        short[] shortData = columnPage.getShortPage();
+        if (vectorDataType == DataTypes.SHORT) {
+          vector.putShorts(0, pageSize, shortData, 0);
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) shortData[i]);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, shortData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, shortData[i] * 1000L);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(shortData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, shortData[i]);
+          }
+        }
+
+      } else if (pageDataType == DataTypes.SHORT_INT) {
+        byte[] shortIntPage = columnPage.getShortIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putInt(i, shortInt);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, shortInt);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, shortInt * 1000L);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int[] shortIntData = ByteUtil.toIntArray(shortIntPage, pageSize);
+          decimalConverter.fillVector(shortIntData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putDouble(i, shortInt);
+          }
+        }
+      } else if (pageDataType == DataTypes.INT) {
+        int[] intData = columnPage.getIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          vector.putInts(0, pageSize, intData, 0);
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, intData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, intData[i] * 1000L);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(intData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, intData[i]);
+          }
+        }
+      } else if (pageDataType == DataTypes.LONG) {
+        long[] longData = columnPage.getLongPage();
+        if (vectorDataType == DataTypes.LONG) {
+          vector.putLongs(0, pageSize, longData, 0);
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, longData[i] * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(longData, pageSize, vectorInfo, columnPage.getNullBits());
+        }
+      } else if (DataTypes.isDecimal(pageDataType)) {
+        DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+        decimalConverter.fillVector(columnPage.getByteArrayPage(), pageSize, vectorInfo,
+            columnPage.getNullBits());
+      } else {
+        double[] doubleData = columnPage.getDoublePage();
+        vector.putDoubles(0, pageSize, doubleData, 0);
+      }
+    }
   };
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
index e7d4118..c9b47db 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
@@ -35,6 +36,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -314,6 +316,17 @@ public class RLECodec implements ColumnPageCodec {
       return resultPage;
     }
 
+    /**
+     * Direct vector fill is not available from this decoder; every call throws
+     * {@link UnsupportedOperationException}.
+     */
+    @Override
+    public void decodeAndFillVector(byte[] input, int offset, int length,
+        ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+        throws MemoryException, IOException {
+      throw new UnsupportedOperationException("Not supposed to be called here");
+    }
+
     @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
         throws MemoryException, IOException {
       return decode(input, offset, length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
index a49eced..67d70e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
@@ -35,7 +35,7 @@ import org.apache.log4j.Logger;
  */
 public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator {
 
-  private static final int cutOffDate = Integer.MAX_VALUE >> 1;
+  public static final int cutOffDate = Integer.MAX_VALUE >> 1;
   private static final long SECONDS_PER_DAY = 60 * 60 * 24L;
   public static final long MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
index a8da6d4..89a3168 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -19,7 +19,10 @@ package org.apache.carbondata.core.metadata.datatype;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Arrays;
+import java.util.BitSet;
 
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
@@ -72,6 +75,8 @@ public final class DecimalConverterFactory {
 
     BigDecimal getDecimal(Object valueToBeConverted);
 
+    void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info, BitSet nullBitset);
+
     int getSize();
 
     DecimalConverterType getDecimalConverterType();
@@ -80,7 +85,7 @@ public final class DecimalConverterFactory {
 
   public static class DecimalIntConverter implements DecimalConverter {
 
-    private int scale;
+    protected int scale;
 
     DecimalIntConverter(int scale) {
       this.scale = scale;
@@ -95,6 +100,56 @@ public final class DecimalConverterFactory {
       return BigDecimal.valueOf((Long) valueToBeConverted, scale);
     }
 
+    /**
+     * Writes the first {@code size} entries of a byte[], short[], int[] or long[] of unscaled
+     * values into the vector of {@code info} as decimals carrying this converter's scale. Rows
+     * set in {@code nullBitset} are written as null instead. Any other input type is ignored.
+     */
+    @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
+        BitSet nullBitset) {
+      // TODO find a way to set the values on the vector directly, without building a BigDecimal
+      // per row; the current approach is very inefficient.
+      CarbonColumnVector target = info.vector;
+      int precision = info.measure.getMeasure().getPrecision();
+      if (valuesToBeConverted instanceof byte[]) {
+        byte[] unscaled = (byte[]) valuesToBeConverted;
+        for (int row = 0; row < size; row++) {
+          if (!nullBitset.get(row)) {
+            target.putDecimal(row, BigDecimal.valueOf(unscaled[row], scale), precision);
+          } else {
+            target.putNull(row);
+          }
+        }
+      } else if (valuesToBeConverted instanceof short[]) {
+        short[] unscaled = (short[]) valuesToBeConverted;
+        for (int row = 0; row < size; row++) {
+          if (!nullBitset.get(row)) {
+            target.putDecimal(row, BigDecimal.valueOf(unscaled[row], scale), precision);
+          } else {
+            target.putNull(row);
+          }
+        }
+      } else if (valuesToBeConverted instanceof int[]) {
+        int[] unscaled = (int[]) valuesToBeConverted;
+        for (int row = 0; row < size; row++) {
+          if (!nullBitset.get(row)) {
+            target.putDecimal(row, BigDecimal.valueOf(unscaled[row], scale), precision);
+          } else {
+            target.putNull(row);
+          }
+        }
+      } else if (valuesToBeConverted instanceof long[]) {
+        long[] unscaled = (long[]) valuesToBeConverted;
+        for (int row = 0; row < size; row++) {
+          if (!nullBitset.get(row)) {
+            target.putDecimal(row, BigDecimal.valueOf(unscaled[row], scale), precision);
+          } else {
+            target.putNull(row);
+          }
+        }
+      }
+    }
+
     @Override public int getSize() {
       return 4;
     }
@@ -104,12 +154,10 @@ public final class DecimalConverterFactory {
     }
   }
 
-  public static class DecimalLongConverter implements DecimalConverter {
-
-    private int scale;
+  public static class DecimalLongConverter extends DecimalIntConverter {
 
     DecimalLongConverter(int scale) {
-      this.scale = scale;
+      super(scale);
     }
 
     @Override public Object convert(BigDecimal decimal) {
@@ -173,6 +221,23 @@ public final class DecimalConverterFactory {
       return new BigDecimal(bigInteger, scale);
     }
 
+    @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
+        BitSet nullBitset) {
+      CarbonColumnVector vector = info.vector;
+      int precision = info.measure.getMeasure().getPrecision();
+      if (valuesToBeConverted instanceof byte[][]) {
+        byte[][] data = (byte[][]) valuesToBeConverted;
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            BigInteger bigInteger = new BigInteger(data[i]);
+            vector.putDecimal(i, new BigDecimal(bigInteger, scale), precision);
+          }
+        }
+      }
+    }
+
     @Override public int getSize() {
       return numBytes;
     }
@@ -194,6 +259,22 @@ public final class DecimalConverterFactory {
       return DataTypeUtil.byteToBigDecimal((byte[]) valueToBeConverted);
     }
 
+    @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
+        BitSet nullBitset) {
+      CarbonColumnVector vector = info.vector;
+      int precision = info.measure.getMeasure().getPrecision();
+      if (valuesToBeConverted instanceof byte[][]) {
+        byte[][] data = (byte[][]) valuesToBeConverted;
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            vector.putDecimal(i, DataTypeUtil.byteToBigDecimal(data[i]), precision);
+          }
+        }
+      }
+    }
+
     @Override public int getSize() {
       return -1;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
index d68e4e9..ac50d7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
@@ -57,4 +57,8 @@ public class DeleteDeltaVo {
   public boolean containsRow(int counter) {
     return bitSet.get(counter);
   }
+
+  public BitSet getBitSet() {
+    return bitSet;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 8695d90..430a555 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -18,10 +18,13 @@ package org.apache.carbondata.core.scan.collector.impl;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.List;
 
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
@@ -30,11 +33,19 @@ import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.scan.result.vector.MeasureDataVectorProcessor;
 
+import org.apache.log4j.Logger;
+
 /**
  * It is not a collector it is just a scanned result holder.
  */
 public class DictionaryBasedVectorResultCollector extends AbstractScannedResultCollector {
 
+  /**
+   * Logger for this result collector.
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(DictionaryBasedVectorResultCollector.class.getName());
+
   protected ProjectionDimension[] queryDimensions;
 
   protected ProjectionMeasure[] queryMeasures;
@@ -51,8 +62,14 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
 
   private ColumnVectorInfo[] implictColumnInfo;
 
+  private boolean isDirectVectorFill;
+
   public DictionaryBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) {
     super(blockExecutionInfos);
+    this.isDirectVectorFill = blockExecutionInfos.isDirectVectorFill();
+    if (this.isDirectVectorFill) {
+      LOGGER.info("Direct pagewise vector fill collector is used to scan and collect the data");
+    }
     // initialize only if the current block is not a restructured block else the initialization
     // will be taken care by RestructureBasedVectorResultCollector
     if (!blockExecutionInfos.isRestructuredBlock()) {
@@ -141,29 +158,33 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
   @Override
   public void collectResultInColumnarBatch(BlockletScannedResult scannedResult,
       CarbonColumnarBatch columnarBatch) {
-    int numberOfPages = scannedResult.numberOfpages();
-    int filteredRows = 0;
-    while (scannedResult.getCurrentPageCounter() < numberOfPages) {
-      int currentPageRowCount = scannedResult.getCurrentPageRowCount();
-      if (currentPageRowCount == 0) {
-        scannedResult.incrementPageCounter();
-        continue;
-      }
-      int rowCounter = scannedResult.getRowCounter();
-      int availableRows = currentPageRowCount - rowCounter;
-      // getRowCounter holds total number or rows being placed in Vector. Calculate the
-      // Left over space through getRowCounter only.
-      int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getRowCounter();
-      requiredRows = Math.min(requiredRows, availableRows);
-      if (requiredRows < 1) {
-        return;
+    if (isDirectVectorFill) {
+      collectResultInColumnarBatchDirect(scannedResult, columnarBatch);
+    } else {
+      int numberOfPages = scannedResult.numberOfpages();
+      int filteredRows = 0;
+      while (scannedResult.getCurrentPageCounter() < numberOfPages) {
+        int currentPageRowCount = scannedResult.getCurrentPageRowCount();
+        if (currentPageRowCount == 0) {
+          scannedResult.incrementPageCounter();
+          continue;
+        }
+        int rowCounter = scannedResult.getRowCounter();
+        int availableRows = currentPageRowCount - rowCounter;
+        // getRowCounter holds the total number of rows placed in the vector. Calculate the
+        // left-over space through getRowCounter only.
+        int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getRowCounter();
+        requiredRows = Math.min(requiredRows, availableRows);
+        if (requiredRows < 1) {
+          return;
+        }
+        fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
+        filteredRows = scannedResult.markFilteredRows(columnarBatch, rowCounter, requiredRows,
+            columnarBatch.getRowCounter());
+        fillResultToColumnarBatch(scannedResult, columnarBatch, rowCounter, availableRows,
+            requiredRows);
+        columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows);
       }
-      fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
-      filteredRows = scannedResult.markFilteredRows(
-          columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter());
-      fillResultToColumnarBatch(
-          scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
-      columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows);
     }
   }
 
@@ -198,4 +219,51 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
     }
   }
 
+  /**
+   * Fill the vector during the page decoding.
+   */
+  private void collectResultInColumnarBatchDirect(BlockletScannedResult scannedResult,
+      CarbonColumnarBatch columnarBatch) {
+    int numberOfPages = scannedResult.numberOfpages();
+    while (scannedResult.getCurrentPageCounter() < numberOfPages) {
+      int currentPageRowCount = scannedResult.getCurrentPageRowCount();
+      if (currentPageRowCount == 0) {
+        scannedResult.incrementPageCounter(null);
+        continue;
+      }
+      DeleteDeltaVo deltaVo = scannedResult.getCurrentDeleteDeltaVo();
+      BitSet bitSet = null;
+      int deletedRows = 0;
+      if (deltaVo != null) {
+        bitSet = deltaVo.getBitSet();
+        deletedRows = bitSet.cardinality();
+      }
+      fillColumnVectorDetails(columnarBatch, bitSet);
+      fillResultToColumnarBatch(scannedResult);
+      columnarBatch.setActualSize(currentPageRowCount - deletedRows);
+      scannedResult.setRowCounter(currentPageRowCount - deletedRows);
+      scannedResult.incrementPageCounter(null);
+      return;
+    }
+  }
+
+  private void fillResultToColumnarBatch(BlockletScannedResult scannedResult) {
+    scannedResult.fillDataChunks(dictionaryInfo, noDictionaryInfo, measureColumnInfo,
+        measureInfo.getMeasureOrdinals());
+
+  }
+
+  private void fillColumnVectorDetails(CarbonColumnarBatch columnarBatch,
+      BitSet deltaBitSet) {
+    for (int i = 0; i < allColumnInfo.length; i++) {
+      allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
+      allColumnInfo[i].vector = columnarBatch.columnVectors[i];
+      allColumnInfo[i].deletedRows = deltaBitSet;
+      if (null != allColumnInfo[i].dimension) {
+        allColumnInfo[i].vector.setBlockDataType(dimensionInfo.dataType[i]);
+      }
+    }
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 6a6a929..fed0faf 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -478,6 +478,19 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     } else {
       blockExecutionInfo.setPrefetchBlocklet(queryModel.isPreFetchData());
     }
+    // In case of fg datamap it should not go to direct fill.
+    boolean fgDataMapPathPresent = false;
+    for (TableBlockInfo blockInfo : queryModel.getTableBlockInfos()) {
+      fgDataMapPathPresent = blockInfo.getDataMapWriterPath() != null;
+      if (fgDataMapPathPresent) {
+        queryModel.setDirectVectorFill(false);
+        break;
+      }
+    }
+
+    blockExecutionInfo
+        .setDirectVectorFill(queryModel.isDirectVectorFill());
+
     blockExecutionInfo
         .setTotalNumberOfMeasureToRead(segmentProperties.getMeasuresOrdinalToChunkMapping().size());
     blockExecutionInfo.setComplexDimensionInfoMap(QueryUtil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
index e737b0e..f0ef23b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
@@ -217,6 +217,11 @@ public class BlockExecutionInfo {
   private QueryStatisticsModel queryStatisticsModel;
 
   /**
+   * It fills the vector directly from decoded column page without any staging and conversions
+   */
+  private boolean isDirectVectorFill;
+
+  /**
    * @param blockIndex the tableBlock to set
    */
   public void setDataBlock(AbstractIndex blockIndex) {
@@ -625,4 +630,12 @@ public class BlockExecutionInfo {
   public void setQueryStatisticsModel(QueryStatisticsModel queryStatisticsModel) {
     this.queryStatisticsModel = queryStatisticsModel;
   }
+
+  public boolean isDirectVectorFill() {
+    return isDirectVectorFill && !isRestructuredBlock;
+  }
+
+  public void setDirectVectorFill(boolean directVectorFill) {
+    isDirectVectorFill = directVectorFill;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 22e1e72..49157f9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -760,7 +760,7 @@ public class QueryUtil {
       vector.putNull(vectorRow);
     } else {
       if (dt == DataTypes.STRING) {
-        vector.putBytes(vectorRow, 0, length, value);
+        vector.putByteArray(vectorRow, 0, length, value);
       } else if (dt == DataTypes.BOOLEAN) {
         vector.putBoolean(vectorRow, ByteUtil.toBoolean(value[0]));
       } else if (dt == DataTypes.BYTE) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 0951da0..d7dcee0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -125,7 +125,11 @@ public class QueryModel {
   private boolean preFetchData = true;
 
   /**
-   * It fills the vector directly from decoded column page with out any staging and conversions
+   * It fills the vector directly from decoded column page without any staging and conversions.
+   * The execution engine can set this field to true in case of vector flow. Note that the engine
+   * must make sure that the vector batch size is greater than or equal to the column page size.
+   * In this flow only pages are pruned; each remaining page is decoded and its complete data is
+   * filled into the vector, so row-level filtering is the execution engine's responsibility.
    */
   private boolean isDirectVectorFill;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index 9191d08..4963441 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -73,6 +73,11 @@ public abstract class BlockletScannedResult {
   private int[] pageFilteredRowCount;
 
   /**
+   * Filtered pages to be decoded and loaded to vector.
+   */
+  private int[] pageIdFiltered;
+
+  /**
    * to keep track of number of rows process
    */
   protected int rowCounter;
@@ -304,7 +309,7 @@ public abstract class BlockletScannedResult {
               j :
               pageFilteredRowId[pageCounter][j]);
         }
-        vector.putBytes(vectorOffset++,
+        vector.putByteArray(vectorOffset++,
             data.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
       }
     }
@@ -342,6 +347,19 @@ public abstract class BlockletScannedResult {
   }
 
   /**
+   * Just increment the page counter and reset the remaining counters.
+   */
+  public void incrementPageCounter(ColumnVectorInfo[] vectorInfos) {
+    rowCounter = 0;
+    currentRow = -1;
+    pageCounter++;
+    if (null != deletedRecordMap && pageCounter < pageIdFiltered.length) {
+      currentDeleteDeltaVo =
+          deletedRecordMap.get(blockletNumber + "_" + pageIdFiltered[pageCounter]);
+    }
+  }
+
+  /**
    * This case is used only in case of compaction, since it does not use filter flow.
    */
   public void fillDataChunks() {
@@ -369,6 +387,36 @@ public abstract class BlockletScannedResult {
         pageUncompressTime.getCount() + (System.currentTimeMillis() - startTime));
   }
 
+  /**
+   * Fill all the vectors with data by decompressing/decoding the column page
+   */
+  public void fillDataChunks(ColumnVectorInfo[] dictionaryInfo, ColumnVectorInfo[] noDictionaryInfo,
+      ColumnVectorInfo[] msrVectorInfo, int[] measuresOrdinal) {
+    freeDataChunkMemory();
+    if (pageCounter >= pageFilteredRowCount.length) {
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+
+    for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
+      dimRawColumnChunks[dictionaryColumnChunkIndexes[i]]
+          .convertToDimColDataChunkAndFillVector(pageIdFiltered[pageCounter], dictionaryInfo[i]);
+    }
+    for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
+      dimRawColumnChunks[noDictionaryColumnChunkIndexes[i]]
+          .convertToDimColDataChunkAndFillVector(pageIdFiltered[pageCounter], noDictionaryInfo[i]);
+    }
+
+    for (int i = 0; i < measuresOrdinal.length; i++) {
+      msrRawColumnChunks[measuresOrdinal[i]]
+          .convertToColumnPageAndFillVector(pageIdFiltered[pageCounter], msrVectorInfo[i]);
+    }
+    QueryStatistic pageUncompressTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME);
+    pageUncompressTime.addCountStatistic(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME,
+        pageUncompressTime.getCount() + (System.currentTimeMillis() - startTime));
+  }
+
   // free the memory for the last page chunk
   private void freeDataChunkMemory() {
     for (int i = 0; i < dimensionColumnPages.length; i++) {
@@ -390,6 +438,14 @@ public abstract class BlockletScannedResult {
     return pageFilteredRowCount.length;
   }
 
+  public int[] getPageIdFiltered() {
+    return pageIdFiltered;
+  }
+
+  public void setPageIdFiltered(int[] pageIdFiltered) {
+    this.pageIdFiltered = pageIdFiltered;
+  }
+
   /**
    * Get total rows in the current page
    *
@@ -513,7 +569,13 @@ public abstract class BlockletScannedResult {
     // if deleted recors map is present for this block
     // then get the first page deleted vo
     if (null != deletedRecordMap) {
-      currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + pageCounter);
+      String key;
+      if (pageIdFiltered != null) {
+        key = blockletNumber + '_' + pageIdFiltered[pageCounter];
+      } else {
+        key = blockletNumber + '_' + pageCounter;
+      }
+      currentDeleteDeltaVo = deletedRecordMap.get(key);
     }
   }
 
@@ -616,6 +678,12 @@ public abstract class BlockletScannedResult {
    */
   public void setPageFilteredRowCount(int[] pageFilteredRowCount) {
     this.pageFilteredRowCount = pageFilteredRowCount;
+    if (pageIdFiltered == null) {
+      pageIdFiltered = new int[pageFilteredRowCount.length];
+      for (int i = 0; i < pageIdFiltered.length; i++) {
+        pageIdFiltered[i] = i;
+      }
+    }
   }
 
   /**
@@ -714,6 +782,10 @@ public abstract class BlockletScannedResult {
     return rowsFiltered;
   }
 
+  public DeleteDeltaVo getCurrentDeleteDeltaVo() {
+    return currentDeleteDeltaVo;
+  }
+
   /**
    * Below method will be used to check row got deleted
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
index dd0e8b9..f670884 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -27,18 +27,26 @@ public interface CarbonColumnVector {
 
   void putFloat(int rowId, float value);
 
+  void putFloats(int rowId, int count, float[] src, int srcIndex);
+
   void putShort(int rowId, short value);
 
   void putShorts(int rowId, int count, short value);
 
+  void putShorts(int rowId, int count, short[] src, int srcIndex);
+
   void putInt(int rowId, int value);
 
   void putInts(int rowId, int count, int value);
 
+  void putInts(int rowId, int count, int[] src, int srcIndex);
+
   void putLong(int rowId, long value);
 
   void putLongs(int rowId, int count, long value);
 
+  void putLongs(int rowId, int count, long[] src, int srcIndex);
+
   void putDecimal(int rowId, BigDecimal value, int precision);
 
   void putDecimals(int rowId, int count, BigDecimal value, int precision);
@@ -47,14 +55,18 @@ public interface CarbonColumnVector {
 
   void putDoubles(int rowId, int count, double value);
 
-  void putBytes(int rowId, byte[] value);
+  void putDoubles(int rowId, int count, double[] src, int srcIndex);
 
-  void putBytes(int rowId, int count, byte[] value);
+  void putByteArray(int rowId, byte[] value);
 
-  void putBytes(int rowId, int offset, int length, byte[] value);
+  void putByteArray(int rowId, int offset, int length, byte[] value);
 
   void putByte(int rowId, byte value);
 
+  void putBytes(int rowId, int count, byte[] value);
+
+  void putBytes(int rowId, int count, byte[] src, int srcIndex);
+
   void putNull(int rowId);
 
   void putNulls(int rowId, int count);