You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/12/12 04:06:40 UTC

carbondata git commit: [CARBONDATA-3157] Added lazy load and direct vector fill support to Presto

Repository: carbondata
Updated Branches:
  refs/heads/master 1f4614a2c -> 6026cb57c


[CARBONDATA-3157] Added lazy load and direct vector fill support to Presto

To improve the scan performance, integrate lazy loading and direct fil vector features to Carbon Presto Integration.

This PR also fixes the query fail in case of multiple table join and filters

This closes #2978


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6026cb57
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6026cb57
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6026cb57

Branch: refs/heads/master
Commit: 6026cb57ce6cca7a2194e544f11d91a3b545fe12
Parents: 1f4614a
Author: ravipesala <ra...@gmail.com>
Authored: Wed Dec 5 18:34:13 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Dec 12 12:06:23 2018 +0800

----------------------------------------------------------------------
 .../safe/AbstractNonDictionaryVectorFiller.java |  43 ++-
 .../adaptive/AdaptiveDeltaFloatingCodec.java    |   4 +-
 .../adaptive/AdaptiveDeltaIntegralCodec.java    |   4 +-
 .../adaptive/AdaptiveFloatingCodec.java         |   4 +-
 .../adaptive/AdaptiveIntegralCodec.java         |   4 +-
 .../encoding/compress/DirectCompressCodec.java  |   4 +-
 .../RestructureBasedVectorResultCollector.java  |   3 +-
 .../scan/result/vector/CarbonColumnVector.java  |   2 +-
 .../vector/impl/CarbonColumnVectorImpl.java     |  24 +-
 .../AbstractCarbonColumnarVector.java           |   2 +-
 .../ColumnarVectorWrapperDirectFactory.java     |  22 +-
 .../vector/impl/directread/SequentialFill.java  |  38 +++
 .../presto/CarbonColumnVectorWrapper.java       |   2 +-
 .../carbondata/presto/CarbonVectorBatch.java    |  10 +-
 .../carbondata/presto/CarbondataPageSource.java |   1 +
 .../presto/CarbondataPageSourceProvider.java    |   5 +
 .../presto/ColumnarVectorWrapperDirect.java     | 310 +++++++++++++++++++
 .../PrestoCarbonVectorizedRecordReader.java     |   9 +-
 .../presto/impl/CarbonTableConfig.java          |  10 +
 .../presto/impl/CarbonTableReader.java          |   2 +-
 .../presto/readers/BooleanStreamReader.java     |  10 +
 .../presto/readers/SliceStreamReader.java       |   2 +-
 .../vectorreader/ColumnarVectorWrapper.java     |   2 +-
 .../ColumnarVectorWrapperDirect.java            |   2 +-
 24 files changed, 482 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
index 23ac4a9..38e28ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectWithInvertedIndex;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -83,20 +84,39 @@ class StringVectorFiller extends AbstractNonDictionaryVectorFiller {
   @Override
   public void fillVector(byte[] data, CarbonColumnVector vector) {
     // start position will be used to store the current data position
+    boolean addSequential = vector instanceof ColumnarVectorWrapperDirectWithInvertedIndex
+        || vector instanceof SequentialFill;
+
     int localOffset = 0;
     ByteUtil.UnsafeComparer comparator = ByteUtil.UnsafeComparer.INSTANCE;
-    for (int i = 0; i < numberOfRows; i++) {
-      int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
-      localOffset += 2;
-      if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
-          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, localOffset, length)) {
-        vector.putNull(i);
-      } else {
-        vector.putArray(i, localOffset, length);
+    // In case of inverted index and sequential fill, add data to vector sequentially instead of
+    // adding offsets and data separately.
+    if (addSequential) {
+      for (int i = 0; i < numberOfRows; i++) {
+        int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
+        localOffset += 2;
+        if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+            CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, localOffset, length)) {
+          vector.putNull(i);
+        } else {
+          vector.putByteArray(i, localOffset, length, data);
+        }
+        localOffset += length;
       }
-      localOffset += length;
+    } else {
+      for (int i = 0; i < numberOfRows; i++) {
+        int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
+        localOffset += 2;
+        if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+            CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, localOffset, length)) {
+          vector.putNull(i);
+        } else {
+          vector.putArray(i, localOffset, length);
+        }
+        localOffset += length;
+      }
+      vector.putAllByteArray(data, 0, actualDataLength);
     }
-    vector.putAllByteArray(data, 0, actualDataLength);
   }
 }
 
@@ -111,7 +131,8 @@ class LongStringVectorFiller extends AbstractNonDictionaryVectorFiller {
 
   @Override public void fillVector(byte[] data, CarbonColumnVector vector) {
     // start position will be used to store the current data position
-    boolean invertedIndex = vector instanceof ColumnarVectorWrapperDirectWithInvertedIndex;
+    boolean invertedIndex = vector instanceof ColumnarVectorWrapperDirectWithInvertedIndex
+        || vector instanceof SequentialFill;
     int localOffset = 0;
     ByteUtil.UnsafeComparer comparator = ByteUtil.UnsafeComparer.INSTANCE;
     if (invertedIndex) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index fb53dba..97c3bed 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
@@ -316,7 +317,8 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
         }
       }
 
-      if (deletedRows == null || deletedRows.isEmpty()) {
+      if ((deletedRows == null || deletedRows.isEmpty())
+          && !(vectorInfo.vector instanceof SequentialFill)) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
           vector.putNull(i);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index 9ed21f4..888ce87 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
 import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
@@ -318,7 +319,8 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
           .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
               true, false);
       fillVector(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
-      if (deletedRows == null || deletedRows.isEmpty()) {
+      if ((deletedRows == null || deletedRows.isEmpty())
+          && !(vectorInfo.vector instanceof SequentialFill)) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
           vector.putNull(i);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index b16e57d..eb266d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
@@ -302,7 +303,8 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
         }
       }
 
-      if (deletedRows == null || deletedRows.isEmpty()) {
+      if ((deletedRows == null || deletedRows.isEmpty())
+          && !(vectorInfo.vector instanceof SequentialFill)) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
           vector.putNull(i);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index 66639fe..37ff7c1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
 import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
@@ -291,7 +292,8 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
           .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
               true, false);
       fillVector(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo, nullBits);
-      if (deletedRows == null || deletedRows.isEmpty()) {
+      if ((deletedRows == null || deletedRows.isEmpty())
+          && !(vectorInfo.vector instanceof SequentialFill)) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
           vector.putNull(i);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index 60344d6..a33899f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
 import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.Encoding;
 
@@ -239,7 +240,8 @@ public class DirectCompressCodec implements ColumnPageCodec {
           .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
               true, false);
       fillVector(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo, nullBits);
-      if (deletedRows == null || deletedRows.isEmpty()) {
+      if ((deletedRows == null || deletedRows.isEmpty())
+          && !(vectorInfo.vector instanceof SequentialFill)) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
           vector.putNull(i);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
index b95bffe..a38b9e8 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
@@ -204,7 +204,8 @@ public class RestructureBasedVectorResultCollector extends DictionaryBasedVector
       } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
         vector.putLongs(columnVectorInfo.vectorOffset, columnVectorInfo.size, (long) defaultValue);
       } else {
-        vector.putBytes(columnVectorInfo.vectorOffset, columnVectorInfo.size, (byte[])defaultValue);
+        vector.putByteArray(columnVectorInfo.vectorOffset, columnVectorInfo.size,
+            (byte[]) defaultValue);
       }
     } else {
       vector.putNulls(columnVectorInfo.vectorOffset, columnVectorInfo.size);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
index 25a2eb1..70a2105 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -68,7 +68,7 @@ public interface CarbonColumnVector {
 
   void putByte(int rowId, byte value);
 
-  void putBytes(int rowId, int count, byte[] value);
+  void putByteArray(int rowId, int count, byte[] value);
 
   void putBytes(int rowId, int count, byte[] src, int srcIndex);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
index 98536f6..30d2317 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -47,7 +47,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
 
   private short[] shorts;
 
-  private BitSet nullBytes;
+  protected BitSet nullBytes;
 
   private DataType dataType;
 
@@ -69,6 +69,10 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
 
   private CarbonColumnVector dictionaryVector;
 
+  private LazyPageLoader lazyPage;
+
+  private boolean loaded;
+
   public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
     this.batchSize = batchSize;
     nullBytes = new BitSet(batchSize);
@@ -163,7 +167,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
     byteArr[rowId] = value;
   }
 
-  @Override public void putBytes(int rowId, int count, byte[] value) {
+  @Override public void putByteArray(int rowId, int count, byte[] value) {
     for (int i = 0; i < count; ++i) {
       bytes[i + rowId] = value;
     }
@@ -208,6 +212,9 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
   }
 
   @Override public Object getData(int rowId) {
+    if (!loaded) {
+      loadPage();
+    }
     if (nullBytes.get(rowId)) {
       return null;
     }
@@ -243,6 +250,9 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
   }
 
   public Object getDataArray() {
+    if (!loaded) {
+      loadPage();
+    }
     if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
       return  byteArr;
     } else if (dataType == DataTypes.SHORT) {
@@ -291,6 +301,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
     } else {
       Arrays.fill(data, null);
     }
+    loaded = false;
 
   }
 
@@ -367,7 +378,14 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
   }
 
   @Override public void setLazyPage(LazyPageLoader lazyPage) {
-    lazyPage.loadPage();
+    this.lazyPage = lazyPage;
+  }
+
+  public void loadPage() {
+    if (lazyPage != null) {
+      lazyPage.loadPage();
+    }
+    loaded = true;
   }
 
   @Override public void putArray(int rowId, int offset, int length) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java
index 4c783b4..9083a20 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java
@@ -59,7 +59,7 @@ public abstract class AbstractCarbonColumnarVector
   }
 
   @Override
-  public void putBytes(int rowId, int count, byte[] value) {
+  public void putByteArray(int rowId, int count, byte[] value) {
     throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java
index 84e42f5..f6d2941 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java
@@ -41,15 +41,25 @@ public final class ColumnarVectorWrapperDirectFactory {
   public static CarbonColumnVector getDirectVectorWrapperFactory(CarbonColumnVector columnVector,
       int[] invertedIndex, BitSet nullBitset, BitSet deletedRows, boolean isnullBitsExists,
       boolean isDictVector) {
-    if ((invertedIndex != null && invertedIndex.length > 0) && (deletedRows == null || deletedRows
-        .isEmpty())) {
+    // If it is sequential data filler then add the null bitset.
+    if (columnVector instanceof SequentialFill) {
+      // If it has inverted index then create a dummy delete rows bitset so that it goes to
+      // ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex, here it does the sequential
+      // filling using another vector.
+      if ((invertedIndex != null && invertedIndex.length > 0)) {
+        if (deletedRows == null) {
+          deletedRows = new BitSet();
+        }
+      } else if (deletedRows == null) {
+        ((SequentialFill) columnVector).setNullBits(nullBitset);
+      }
+    }
+    if ((invertedIndex != null && invertedIndex.length > 0) && (deletedRows == null)) {
       return new ColumnarVectorWrapperDirectWithInvertedIndex(columnVector, invertedIndex,
           isnullBitsExists);
-    } else if ((invertedIndex == null || invertedIndex.length == 0) && (deletedRows != null
-        && !deletedRows.isEmpty())) {
+    } else if ((invertedIndex == null || invertedIndex.length == 0) && deletedRows != null) {
       return new ColumnarVectorWrapperDirectWithDeleteDelta(columnVector, deletedRows, nullBitset);
-    } else if ((invertedIndex != null && invertedIndex.length > 0) && (deletedRows != null
-        && !deletedRows.isEmpty())) {
+    } else if ((invertedIndex != null && invertedIndex.length > 0) && deletedRows != null) {
       return new ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex(columnVector,
           deletedRows, invertedIndex, nullBitset, isnullBitsExists, isDictVector);
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/SequentialFill.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/SequentialFill.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/SequentialFill.java
new file mode 100644
index 0000000..a0df68c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/SequentialFill.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.result.vector.impl.directread;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * It is sort of a marker interface to let execution engine know that it is appendable/sequential
+ * data adding vector. It means we cannot add random rowids to it.
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.Internal
+public interface SequentialFill {
+
+  /**
+   * Set the null bitset
+   * @param nullBits
+   */
+  void setNullBits(BitSet nullBits);
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
index f001488..b43676c 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
@@ -157,7 +157,7 @@ public class CarbonColumnVectorWrapper implements CarbonColumnVector {
     }
   }
 
-  @Override public void putBytes(int rowId, int count, byte[] value) {
+  @Override public void putByteArray(int rowId, int count, byte[] value) {
     for (int i = 0; i < count; i++) {
       if (!filteredRows[rowId]) {
         columnVector.putByteArray(counter++, value);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
index 2a0ab75..fb8300a 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
@@ -21,6 +21,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalType;
@@ -68,8 +69,13 @@ public class CarbonVectorBatch {
   }
 
   public static CarbonVectorBatch allocate(StructField[] schema,
-      CarbonDictionaryDecodeReadSupport readSupport) {
-    return new CarbonVectorBatch(schema, readSupport, DEFAULT_BATCH_SIZE);
+      CarbonDictionaryDecodeReadSupport readSupport, boolean isDirectFill) {
+    if (isDirectFill) {
+      return new CarbonVectorBatch(schema, readSupport,
+          CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+    } else {
+      return new CarbonVectorBatch(schema, readSupport,DEFAULT_BATCH_SIZE);
+    }
   }
 
   private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index 51677de..93de394 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -161,6 +161,7 @@ class CarbondataPageSource implements ConnectorPageSource {
       }
       checkState(batchId == expectedBatchId);
       try {
+        vectorReader.getColumnarBatch().column(columnIndex).loadPage();
         PrestoVectorBlockBuilder blockBuilder =
             (PrestoVectorBlockBuilder) vectorReader.getColumnarBatch().column(columnIndex);
         blockBuilder.setBatchSize(lazyBlock.getPositionCount());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index 3ec815d..bef246e 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -100,6 +100,11 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
     checkArgument(carbondataSplit.getConnectorId().equals(connectorId),
         "split is not for this connector");
     QueryModel queryModel = createQueryModel(carbondataSplit, columns);
+    if (carbonTableReader.config.getPushRowFilter() == null ||
+        carbonTableReader.config.getPushRowFilter().equalsIgnoreCase("false")) {
+      queryModel.setDirectVectorFill(true);
+      queryModel.setPreFetchData(false);
+    }
     QueryExecutor queryExecutor =
         QueryExecutorFactory.getQueryExecutor(queryModel, new Configuration());
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapperDirect.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapperDirect.java b/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapperDirect.java
new file mode 100644
index 0000000..9a381cd
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapperDirect.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.math.BigDecimal;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
+import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
+
+/**
+ * Fills the vector directly with out considering any deleted rows.
+ */
+class ColumnarVectorWrapperDirect implements CarbonColumnVector, SequentialFill {
+  /**
+   * It is adapter class of complete ColumnarBatch.
+   */
+  protected CarbonColumnVectorImpl columnVector;
+
+  /**
+   * It is current block file datatype used for alter table scenarios when datatype changes.
+   */
+  private DataType blockDataType;
+
+  private CarbonColumnVector dictionaryVector;
+
+  private BitSet nullBitset;
+
+  ColumnarVectorWrapperDirect(CarbonColumnVectorImpl columnVector) {
+    this.columnVector = columnVector;
+    this.dictionaryVector = columnVector.getDictionaryVector();
+    this.nullBitset = new BitSet();
+  }
+
+  @Override public void setNullBits(BitSet nullBits) {
+    this.nullBitset = nullBits;
+  }
+
+  @Override public void putBoolean(int rowId, boolean value) {
+    if (nullBitset.get(rowId)) {
+      columnVector.putNull(rowId);
+    } else {
+      columnVector.putBoolean(rowId, value);
+    }
+  }
+
+  @Override public void putFloat(int rowId, float value) {
+    if (nullBitset.get(rowId)) {
+      columnVector.putNull(rowId);
+    } else {
+      columnVector.putFloat(rowId, value);
+    }
+  }
+
+  @Override public void putShort(int rowId, short value) {
+    if (nullBitset.get(rowId)) {
+      columnVector.putNull(rowId);
+    } else {
+      columnVector.putShort(rowId, value);
+    }
+  }
+
+  @Override public void putShorts(int rowId, int count, short value) {
+    columnVector.putShorts(rowId, count, value);
+
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    if (nullBitset.get(rowId)) {
+      columnVector.putNull(rowId);
+    } else {
+      columnVector.putInt(rowId, value);
+    }
+  }
+
+  @Override public void putInts(int rowId, int count, int value) {
+    columnVector.putInts(rowId, count, value);
+  }
+
+  @Override public void putLong(int rowId, long value) {
+    if (nullBitset.get(rowId)) {
+      columnVector.putNull(rowId);
+    } else {
+      columnVector.putLong(rowId, value);
+    }
+  }
+
+  @Override public void putLongs(int rowId, int count, long value) {
+    columnVector.putLongs(rowId, count, value);
+  }
+
+  @Override public void putDecimal(int rowId, BigDecimal value, int precision) {
+    if (nullBitset.get(rowId)) {
+      columnVector.putNull(rowId);
+    } else {
+      columnVector.putDecimal(rowId, value, precision);
+    }
+  }
+
+  @Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
+    for (int i = 0; i < count; i++) {
+      if (nullBitset.get(rowId)) {
+        columnVector.putNull(rowId);
+      } else {
+        columnVector.putDecimal(rowId, value, precision);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putDouble(int rowId, double value) {
+    if (nullBitset.get(rowId)) {
+      columnVector.putNull(rowId);
+    } else {
+      columnVector.putDouble(rowId, value);
+    }
+  }
+
+  @Override public void putDoubles(int rowId, int count, double value) {
+    columnVector.putDoubles(rowId, count, value);
+  }
+
+  @Override public void putByteArray(int rowId, byte[] value) {
+    if (nullBitset.get(rowId)) {
+      columnVector.putNull(rowId);
+    } else {
+      columnVector.putByteArray(rowId, value);
+    }
+  }
+
+  @Override
+  public void putByteArray(int rowId, int count, byte[] value) {
+    for (int i = 0; i < count; i++) {
+      columnVector.putByteArray(rowId++, value);
+    }
+  }
+
+  @Override public void putByteArray(int rowId, int offset, int length, byte[] value) {
+    if (nullBitset.get(rowId)) {
+      columnVector.putNull(rowId);
+    } else {
+      columnVector.putByteArray(rowId, offset, length, value);
+    }
+  }
+
+  @Override public void putNull(int rowId) {
+    columnVector.putNull(rowId);
+  }
+
+  @Override public void putNulls(int rowId, int count) {
+    columnVector.putNulls(rowId, count);
+  }
+
+  @Override public void putNotNull(int rowId) {
+    columnVector.putNotNull(rowId);
+  }
+
+  @Override public void putNotNull(int rowId, int count) {
+  }
+
+  @Override public boolean isNull(int rowId) {
+    return columnVector.isNullAt(rowId);
+  }
+
+  @Override public void putObject(int rowId, Object obj) {
+    throw new UnsupportedOperationException(
+        "Not supported this opeartion from " + this.getClass().getName());
+  }
+
+  @Override public Object getData(int rowId) {
+    throw new UnsupportedOperationException(
+        "Not supported this opeartion from " + this.getClass().getName());
+  }
+
+  @Override public void reset() {
+    if (null != dictionaryVector) {
+      dictionaryVector.reset();
+    }
+  }
+
+  @Override public DataType getType() {
+    return columnVector.getType();
+  }
+
+  @Override public DataType getBlockDataType() {
+    return blockDataType;
+  }
+
+  @Override public void setBlockDataType(DataType blockDataType) {
+    this.blockDataType = blockDataType;
+  }
+
+  @Override public void setDictionary(CarbonDictionary dictionary) {
+    columnVector.setDictionary(dictionary);
+  }
+
+  @Override public boolean hasDictionary() {
+    return columnVector.hasDictionary();
+  }
+
+
+  @Override public CarbonColumnVector getDictionaryVector() {
+    return dictionaryVector;
+  }
+
+  @Override public void putByte(int rowId, byte value) {
+    columnVector.putByte(rowId, value);
+  }
+
+  @Override public void setFilteredRowsExist(boolean filteredRowsExist) {
+    // Leave it, as it does not need to do anything here.
+  }
+
+  @Override public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+    for (int i = 0; i < count; i++) {
+      if (nullBitset.get(rowId)) {
+        columnVector.putNull(rowId);
+      } else {
+        columnVector.putFloat(rowId, src[i]);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+    for (int i = 0; i < count; i++) {
+      if (nullBitset.get(rowId)) {
+        columnVector.putNull(rowId);
+      } else {
+        columnVector.putShort(rowId, src[i]);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putInts(int rowId, int count, int[] src, int srcIndex) {
+    for (int i = 0; i < count; i++) {
+      if (nullBitset.get(rowId)) {
+        columnVector.putNull(rowId);
+      } else {
+        columnVector.putInt(rowId, src[i]);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+    for (int i = 0; i < count; i++) {
+      if (nullBitset.get(rowId)) {
+        columnVector.putNull(rowId);
+      } else {
+        columnVector.putLong(rowId, src[i]);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+    for (int i = 0; i < count; i++) {
+      if (nullBitset.get(rowId)) {
+        columnVector.putNull(rowId);
+      } else {
+        columnVector.putDouble(rowId, src[i]);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+    for (int i = 0; i < count; i++) {
+      if (nullBitset.get(rowId)) {
+        columnVector.putNull(rowId);
+      } else {
+        columnVector.putByte(rowId, src[i]);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void setLazyPage(LazyPageLoader lazyPage) {
+    columnVector.setLazyPage(lazyPage);
+  }
+
+  @Override public void putArray(int rowId, int offset, int length) {
+    columnVector.putArray(rowId, offset, length);
+  }
+
+  @Override public void putAllByteArray(byte[] data, int offset, int length) {
+    columnVector.putAllByteArray(data, offset, length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
index 4e2d36c..08401df 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
@@ -214,11 +214,16 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
       }
     }
 
-    columnarBatch = CarbonVectorBatch.allocate(fields, readSupport);
+    columnarBatch =
+        CarbonVectorBatch.allocate(fields, readSupport, queryModel.isDirectVectorFill());
     CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
     boolean[] filteredRows = new boolean[columnarBatch.capacity()];
     for (int i = 0; i < fields.length; i++) {
-      vectors[i] = new CarbonColumnVectorWrapper(columnarBatch.column(i), filteredRows);
+      if (queryModel.isDirectVectorFill()) {
+        vectors[i] = new ColumnarVectorWrapperDirect(columnarBatch.column(i));
+      } else {
+        vectors[i] = new CarbonColumnVectorWrapper(columnarBatch.column(i), filteredRows);
+      }
     }
     carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
index 4d18184..ab1c871 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
@@ -43,6 +43,7 @@ public class CarbonTableConfig {
   private String s3N_acesssKey;
   private String s3N_secretKey;
   private String endPoint;
+  private String pushRowFilter;
 
 
   @NotNull public String getDbPath() {
@@ -195,4 +196,13 @@ public class CarbonTableConfig {
     this.endPoint = endPoint;
     return this;
   }
+
+  public String getPushRowFilter() {
+    return pushRowFilter;
+  }
+
+  @Config("carbon.push.rowfilters.for.vector")
+  public void setPushRowFilter(String pushRowFilter) {
+    this.pushRowFilter = pushRowFilter;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 7baf7ea..546bf9b 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -412,7 +412,7 @@ public class CarbonTableReader {
     List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
     CarbonTable carbonTable = tableCacheModel.carbonTable;
     TableInfo tableInfo = tableCacheModel.carbonTable.getTableInfo();
-    Configuration config = FileFactory.getConfiguration();
+    Configuration config = new Configuration();
     config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
     String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
index b523064..481ab27 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
@@ -64,6 +64,16 @@ public class BooleanStreamReader extends CarbonColumnVectorImpl
     }
   }
 
+  @Override public void putByte(int rowId, byte value) {
+    type.writeBoolean(builder, value == 1);
+  }
+
+  @Override public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+    for (int i = 0; i < count; i++) {
+      type.writeBoolean(builder, src[srcIndex++] == 1);
+    }
+  }
+
   @Override public void putBoolean(int rowId, boolean value) {
     type.writeBoolean(builder, value);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index 04e5bb3..d9c7ad3 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -80,8 +80,8 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
     int[] dictOffsets = new int[dictionary.getDictionarySize() + 1];
     int size = 0;
     for (int i = 0; i < dictionary.getDictionarySize(); i++) {
+      dictOffsets[i] = size;
       if (dictionary.getDictionaryValue(i) != null) {
-        dictOffsets[i] = size;
         size += dictionary.getDictionaryValue(i).length;
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 7520ef6..dbad225 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -177,7 +177,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     }
   }
 
-  @Override public void putBytes(int rowId, int count, byte[] value) {
+  @Override public void putByteArray(int rowId, int count, byte[] value) {
     for (int i = 0; i < count; i++) {
       if (!filteredRows[rowId]) {
         sparkColumnVectorProxy.putByteArray(counter++, value);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026cb57/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
index c50d060..413c5f0 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
@@ -119,7 +119,7 @@ class ColumnarVectorWrapperDirect implements CarbonColumnVector {
   }
 
   @Override
-  public void putBytes(int rowId, int count, byte[] value) {
+  public void putByteArray(int rowId, int count, byte[] value) {
     for (int i = 0; i < count; i++) {
       sparkColumnVectorProxy.putByteArray(rowId, value);
       rowId++;