Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/27 01:50:19 UTC

carbondata git commit: [CARBONDATA-3048] Added Lazy Loading For 2.2/2.1

Repository: carbondata
Updated Branches:
  refs/heads/master 170c2f56d -> 58ba45ef8


[CARBONDATA-3048] Added Lazy Loading For 2.2/2.1

Problem:
Currently, for direct vector fill in Spark 2.2/2.1, lazy loading is not implemented. Because of this, when the data volume is huge and the number of columns is high, queries take more time to execute.

Solution:
Add lazy loading for Spark 2.2 and 2.1.

Fixed a Local Dictionary test case failure when Local Dictionary is enabled.

This closes #2846
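
For context on what "lazy loading" buys here: decoding/decompressing a column page is deferred until a consumer first reads from the vector, so pages of columns that are never touched are never decoded. A minimal, hypothetical sketch of the idea (simplified types, not CarbonData's actual LazyPageLoader API):

import java.util.function.Supplier;

final class LazyPage {
  private final Supplier<int[]> loader; // wraps the expensive decode/decompress step
  private int[] decoded;                // populated on first access only

  LazyPage(Supplier<int[]> loader) {
    this.loader = loader;
  }

  int get(int rowId) {
    if (decoded == null) {
      decoded = loader.get(); // pay the decode cost once, and only if read
    }
    return decoded[rowId];
  }
}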


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/58ba45ef
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/58ba45ef
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/58ba45ef

Branch: refs/heads/master
Commit: 58ba45ef89c4794a3214ad52f4a3b8aa89e175a3
Parents: 170c2f5
Author: kumarvishal09 <ku...@gmail.com>
Authored: Tue Oct 23 22:15:14 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Oct 27 07:10:00 2018 +0530

----------------------------------------------------------------------
 .../impl/LocalDictDimensionDataChunkStore.java  |  20 +-
 .../safe/AbstractNonDictionaryVectorFiller.java | 221 +++----
 .../SafeFixedLengthDimensionDataChunkStore.java |   2 +-
 ...feVariableLengthDimensionDataChunkStore.java |  13 +-
 .../adaptive/AdaptiveDeltaFloatingCodec.java    |   2 +-
 .../adaptive/AdaptiveDeltaIntegralCodec.java    |   2 +-
 .../adaptive/AdaptiveFloatingCodec.java         |   2 +-
 .../adaptive/AdaptiveIntegralCodec.java         |   4 +-
 .../encoding/compress/DirectCompressCodec.java  |   4 +-
 .../ColumnarVectorWrapperDirectFactory.java     |   5 +-
 ...erDirectWithDeleteDeltaAndInvertedIndex.java |   8 +-
 .../apache/carbondata/core/util/ByteUtil.java   |   2 +-
 .../detailquery/CastColumnTestCase.scala        |   2 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   8 +-
 .../vectorreader/ColumnarVectorWrapper.java     |   2 +-
 .../ColumnarVectorWrapperDirect.java            |   2 +-
 .../org/apache/spark/sql/CarbonVectorProxy.java | 586 ++++++++++++-------
 17 files changed, 499 insertions(+), 386 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
index e70424f..a384743 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
@@ -17,11 +17,15 @@
 
 package org.apache.carbondata.core.datastore.chunk.store.impl;
 
+import java.util.BitSet;
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
@@ -61,16 +65,24 @@ public class LocalDictDimensionDataChunkStore implements DimensionDataChunkStore
       vector.setDictionary(dictionary);
       dictionary.setDictionaryUsed();
     }
+    BitSet nullBitset = new BitSet();
+    CarbonColumnVector dictionaryVector = ColumnarVectorWrapperDirectFactory
+        .getDirectVectorWrapperFactory(vector.getDictionaryVector(), invertedIndex, nullBitset,
+            vectorInfo.deletedRows, false, true);
+    vector = ColumnarVectorWrapperDirectFactory
+        .getDirectVectorWrapperFactory(vector, invertedIndex, nullBitset, vectorInfo.deletedRows,
+            false, false);
     for (int i = 0; i < rowsNum; i++) {
       int surrogate = CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
       if (surrogate == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
         vector.putNull(i);
-        vector.getDictionaryVector().putNull(i);
+        dictionaryVector.putNull(i);
       } else {
-        vector.putNotNull(i);
-        vector.getDictionaryVector().putInt(i, surrogate);
+        dictionaryVector.putInt(i, surrogate);
       }
-
+    }
+    if (dictionaryVector instanceof ConvertableVector) {
+      ((ConvertableVector) dictionaryVector).convert();
     }
   }
 

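The hunk above follows a wrap-fill-convert pattern: both the parent vector and its dictionary vector are wrapped by the direct factory (the new boolean arguments flag null-bits handling and whether the vector is a dictionary vector), surrogate keys are written through the wrapper, and a final convert() flushes any buffered values. A condensed sketch of that pattern with simplified stand-in types (not the real CarbonData interfaces):

interface Vector { void putInt(int row, int value); void putNull(int row); }
interface Convertable extends Vector { void convert(); }

final class WrapFillConvert {
  static void fill(Vector wrapped, int[] surrogates, int nullSurrogate) {
    for (int i = 0; i < surrogates.length; i++) {
      if (surrogates[i] == nullSurrogate) {
        wrapped.putNull(i);               // null marker row
      } else {
        wrapped.putInt(i, surrogates[i]); // dictionary surrogate key
      }
    }
    // If the wrapper buffered values (e.g. for inverted index plus delete
    // delta), flush them into the real vector in one pass.
    if (wrapped instanceof Convertable) {
      ((Convertable) wrapped).convert();
    }
  }
}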
http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
index 9626da7..f2e91be 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
 
-import java.nio.ByteBuffer;
-
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -32,43 +30,38 @@ import org.apache.carbondata.core.util.DataTypeUtil;
 @InterfaceStability.Stable
 public abstract class AbstractNonDictionaryVectorFiller {
 
-  protected int lengthSize;
   protected int numberOfRows;
 
-  public AbstractNonDictionaryVectorFiller(int lengthSize, int numberOfRows) {
-    this.lengthSize = lengthSize;
+  public AbstractNonDictionaryVectorFiller(int numberOfRows) {
     this.numberOfRows = numberOfRows;
   }
 
-  public abstract void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer);
+  public abstract void fillVector(byte[] data, CarbonColumnVector vector);
 
-  public int getLengthFromBuffer(ByteBuffer buffer) {
-    return buffer.getShort();
-  }
 }
 
 class NonDictionaryVectorFillerFactory {
 
-  public static AbstractNonDictionaryVectorFiller getVectorFiller(DataType type, int lengthSize,
+  public static AbstractNonDictionaryVectorFiller getVectorFiller(int length, DataType type,
       int numberOfRows) {
     if (type == DataTypes.STRING) {
-      if (lengthSize > DataTypes.SHORT.getSizeInBytes()) {
-        return new LongStringVectorFiller(lengthSize, numberOfRows);
+      if (length > DataTypes.SHORT.getSizeInBytes()) {
+        return new LongStringVectorFiller(numberOfRows);
       } else {
-        return new StringVectorFiller(lengthSize, numberOfRows);
+        return new StringVectorFiller(numberOfRows);
       }
     } else if (type == DataTypes.VARCHAR) {
-      return new LongStringVectorFiller(lengthSize, numberOfRows);
+      return new LongStringVectorFiller(numberOfRows);
     } else if (type == DataTypes.TIMESTAMP) {
-      return new TimeStampVectorFiller(lengthSize, numberOfRows);
+      return new TimeStampVectorFiller(numberOfRows);
     } else if (type == DataTypes.BOOLEAN) {
-      return new BooleanVectorFiller(lengthSize, numberOfRows);
+      return new BooleanVectorFiller(numberOfRows);
     } else if (type == DataTypes.SHORT) {
-      return new ShortVectorFiller(lengthSize, numberOfRows);
+      return new ShortVectorFiller(numberOfRows);
     } else if (type == DataTypes.INT) {
-      return new IntVectorFiller(lengthSize, numberOfRows);
+      return new IntVectorFiller(numberOfRows);
     } else if (type == DataTypes.LONG) {
-      return new LongVectorFiller(lengthSize, numberOfRows);
+      return new LongVectorFiller(numberOfRows);
     } else {
       throw new UnsupportedOperationException("Not supported datatype : " + type);
     }
@@ -79,208 +72,168 @@ class NonDictionaryVectorFillerFactory {
 
 class StringVectorFiller extends AbstractNonDictionaryVectorFiller {
 
-  public StringVectorFiller(int lengthSize, int numberOfRows) {
-    super(lengthSize, numberOfRows);
+  public StringVectorFiller(int numberOfRows) {
+    super(numberOfRows);
   }
 
   @Override
-  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+  public void fillVector(byte[] data, CarbonColumnVector vector) {
     // start position will be used to store the current data position
-    int startOffset = 0;
-    // as first position will be start from length of bytes as data is stored first in the memory
-    // block we need to skip first two bytes this is because first two bytes will be length of the
-    // data which we have to skip
-    int currentOffset = lengthSize;
+    int localOffset = 0;
     ByteUtil.UnsafeComparer comparator = ByteUtil.UnsafeComparer.INSTANCE;
-    for (int i = 0; i < numberOfRows - 1; i++) {
-      buffer.position(startOffset);
-      startOffset += getLengthFromBuffer(buffer) + lengthSize;
-      int length = startOffset - (currentOffset);
+    for (int i = 0; i < numberOfRows; i++) {
+      int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
+      localOffset += 2;
       if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
-          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentOffset, length)) {
+          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, localOffset, length)) {
         vector.putNull(i);
       } else {
-        vector.putByteArray(i, currentOffset, length, data);
+        vector.putByteArray(i, localOffset, length, data);
       }
-      currentOffset = startOffset + lengthSize;
-    }
-    // Handle last row
-    int length = (data.length - currentOffset);
-    if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
-        CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentOffset, length)) {
-      vector.putNull(numberOfRows - 1);
-    } else {
-      vector.putByteArray(numberOfRows - 1, currentOffset, length, data);
+      localOffset += length;
     }
   }
 }
 
-class LongStringVectorFiller extends StringVectorFiller {
-  public LongStringVectorFiller(int lengthSize, int numberOfRows) {
-    super(lengthSize, numberOfRows);
+class LongStringVectorFiller extends AbstractNonDictionaryVectorFiller {
+  public LongStringVectorFiller(int numberOfRows) {
+    super(numberOfRows);
   }
 
   @Override
-  public int getLengthFromBuffer(ByteBuffer buffer) {
-    return buffer.getInt();
+  public void fillVector(byte[] data, CarbonColumnVector vector) {
+    // start position will be used to store the current data position
+    int localOffset = 0;
+    ByteUtil.UnsafeComparer comparator = ByteUtil.UnsafeComparer.INSTANCE;
+    for (int i = 0; i < numberOfRows; i++) {
+      int length =
+          (((data[localOffset] & 0xFF) << 24) | ((data[localOffset + 1] & 0xFF) << 16) | (
+              (data[localOffset + 2] & 0xFF) << 8) | (data[localOffset + 3] & 0xFF));
+      localOffset += 4;
+      if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, localOffset, length)) {
+        vector.putNull(i);
+      } else {
+        vector.putByteArray(i, localOffset, length, data);
+      }
+      localOffset += length;
+    }
   }
 }
 
 class BooleanVectorFiller extends AbstractNonDictionaryVectorFiller {
 
-  public BooleanVectorFiller(int lengthSize, int numberOfRows) {
-    super(lengthSize, numberOfRows);
+  public BooleanVectorFiller(int numberOfRows) {
+    super(numberOfRows);
   }
 
   @Override
-  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+  public void fillVector(byte[] data, CarbonColumnVector vector) {
     // start position will be used to store the current data position
-    int startOffset = 0;
-    int currentOffset = lengthSize;
-    for (int i = 0; i < numberOfRows - 1; i++) {
-      buffer.position(startOffset);
-      startOffset += getLengthFromBuffer(buffer) + lengthSize;
-      int length = startOffset - (currentOffset);
+    int localOffset = 0;
+    for (int i = 0; i < numberOfRows; i++) {
+      int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
+      localOffset += 2;
       if (length == 0) {
         vector.putNull(i);
       } else {
-        vector.putBoolean(i, ByteUtil.toBoolean(data[currentOffset]));
+        vector.putBoolean(i, ByteUtil.toBoolean(data[localOffset]));
       }
-      currentOffset = startOffset + lengthSize;
-    }
-    int length = (data.length - currentOffset);
-    if (length == 0) {
-      vector.putNull(numberOfRows - 1);
-    } else {
-      vector.putBoolean(numberOfRows - 1, ByteUtil.toBoolean(data[currentOffset]));
+      localOffset += length;
     }
   }
 }
 
 class ShortVectorFiller extends AbstractNonDictionaryVectorFiller {
 
-  public ShortVectorFiller(int lengthSize, int numberOfRows) {
-    super(lengthSize, numberOfRows);
+  public ShortVectorFiller(int numberOfRows) {
+    super(numberOfRows);
   }
 
   @Override
-  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+  public void fillVector(byte[] data, CarbonColumnVector vector) {
     // start position will be used to store the current data position
-    int startOffset = 0;
-    int currentOffset = lengthSize;
-    for (int i = 0; i < numberOfRows - 1; i++) {
-      buffer.position(startOffset);
-      startOffset += getLengthFromBuffer(buffer) + lengthSize;
-      int length = startOffset - (currentOffset);
+    int localOffset = 0;
+    for (int i = 0; i < numberOfRows; i++) {
+      int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
+      localOffset += 2;
       if (length == 0) {
         vector.putNull(i);
       } else {
-        vector.putShort(i, ByteUtil.toXorShort(data, currentOffset, length));
+        vector.putShort(i, ByteUtil.toXorShort(data, localOffset, length));
       }
-      currentOffset = startOffset + lengthSize;
-    }
-    int length = (data.length - currentOffset);
-    if (length == 0) {
-      vector.putNull(numberOfRows - 1);
-    } else {
-      vector.putShort(numberOfRows - 1, ByteUtil.toXorShort(data, currentOffset, length));
+      localOffset += length;
     }
   }
 }
 
 class IntVectorFiller extends AbstractNonDictionaryVectorFiller {
 
-  public IntVectorFiller(int lengthSize, int numberOfRows) {
-    super(lengthSize, numberOfRows);
+  public IntVectorFiller(int numberOfRows) {
+    super(numberOfRows);
   }
 
   @Override
-  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+  public void fillVector(byte[] data, CarbonColumnVector vector) {
     // start position will be used to store the current data position
-    int startOffset = 0;
-    int currentOffset = lengthSize;
-    for (int i = 0; i < numberOfRows - 1; i++) {
-      buffer.position(startOffset);
-      startOffset += getLengthFromBuffer(buffer) + lengthSize;
-      int length = startOffset - (currentOffset);
+    int localOffset = 0;
+    for (int i = 0; i < numberOfRows; i++) {
+      int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
+      localOffset += 2;
       if (length == 0) {
         vector.putNull(i);
       } else {
-        vector.putInt(i, ByteUtil.toXorInt(data, currentOffset, length));
+        vector.putInt(i, ByteUtil.toXorInt(data, localOffset, length));
       }
-      currentOffset = startOffset + lengthSize;
-    }
-    int length = (data.length - currentOffset);
-    if (length == 0) {
-      vector.putNull(numberOfRows - 1);
-    } else {
-      vector.putInt(numberOfRows - 1, ByteUtil.toXorInt(data, currentOffset, length));
+      localOffset += length;
     }
   }
 }
 
 class LongVectorFiller extends AbstractNonDictionaryVectorFiller {
 
-  public LongVectorFiller(int lengthSize, int numberOfRows) {
-    super(lengthSize, numberOfRows);
+  public LongVectorFiller(int numberOfRows) {
+    super(numberOfRows);
   }
 
   @Override
-  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+  public void fillVector(byte[] data, CarbonColumnVector vector) {
     // start position will be used to store the current data position
-    int startOffset = 0;
-    int currentOffset = lengthSize;
-    for (int i = 0; i < numberOfRows - 1; i++) {
-      buffer.position(startOffset);
-      startOffset += getLengthFromBuffer(buffer) + lengthSize;
-      int length = startOffset - (currentOffset);
+    int localOffset = 0;
+    for (int i = 0; i < numberOfRows; i++) {
+      int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
+      localOffset += 2;
       if (length == 0) {
         vector.putNull(i);
       } else {
         vector.putLong(i, DataTypeUtil
-            .getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), currentOffset,
+            .getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), localOffset,
                 length));
       }
-      currentOffset = startOffset + lengthSize;
-    }
-    int length = (data.length - currentOffset);
-    if (length == 0) {
-      vector.putNull(numberOfRows - 1);
-    } else {
-      vector.putLong(numberOfRows - 1, DataTypeUtil
-          .getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), currentOffset,
-              length));
+      localOffset += length;
     }
   }
 }
 
 class TimeStampVectorFiller extends AbstractNonDictionaryVectorFiller {
 
-  public TimeStampVectorFiller(int lengthSize, int numberOfRows) {
-    super(lengthSize, numberOfRows);
+  public TimeStampVectorFiller(int numberOfRows) {
+    super(numberOfRows);
   }
 
   @Override
-  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+  public void fillVector(byte[] data, CarbonColumnVector vector) {
     // start position will be used to store the current data position
-    int startOffset = 0;
-    int currentOffset = lengthSize;
-    for (int i = 0; i < numberOfRows - 1; i++) {
-      buffer.position(startOffset);
-      startOffset += getLengthFromBuffer(buffer) + lengthSize;
-      int length = startOffset - (currentOffset);
+    int localOffset = 0;
+    for (int i = 0; i < numberOfRows; i++) {
+      int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
+      localOffset += 2;
       if (length == 0) {
         vector.putNull(i);
       } else {
-        vector.putLong(i, ByteUtil.toXorLong(data, currentOffset, length) * 1000L);
+        vector.putLong(i, ByteUtil.toXorLong(data, localOffset, length) * 1000L);
       }
-      currentOffset = startOffset + lengthSize;
-    }
-    int length = (data.length - currentOffset);
-    if (length == 0) {
-      vector.putNull(numberOfRows - 1);
-    } else {
-      vector.putLong(numberOfRows - 1, ByteUtil.toXorLong(data, currentOffset, length) * 1000L);
+      localOffset += length;
     }
   }
 }

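The rewritten fillers no longer position a shared ByteBuffer per row. Each value is laid out as [length][bytes], and the length prefix is decoded inline from the byte array, big-endian: 2 bytes for most types, 4 bytes for long strings/varchar. This also removes the special-cased last-row handling. A stand-alone sketch of that layout decoding (hypothetical helper, not part of the patch):

static int[] readLengths(byte[] data, int rows, boolean fourByteLength) {
  int[] lengths = new int[rows];
  int offset = 0;
  for (int i = 0; i < rows; i++) {
    int length;
    if (fourByteLength) {
      // 4-byte big-endian length prefix (long string / varchar)
      length = ((data[offset] & 0xFF) << 24) | ((data[offset + 1] & 0xFF) << 16)
          | ((data[offset + 2] & 0xFF) << 8) | (data[offset + 3] & 0xFF);
      offset += 4;
    } else {
      // 2-byte big-endian length prefix (string, boolean, short, int, long)
      length = ((data[offset] & 0xFF) << 8) | (data[offset + 1] & 0xFF);
      offset += 2;
    }
    lengths[i] = length; // length == 0 encodes null for the primitive fillers
    offset += length;    // value bytes are data[offset - length .. offset)
  }
  return lengths;
}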
http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
index d4bae90..ef63224 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
@@ -56,7 +56,7 @@ public class SafeFixedLengthDimensionDataChunkStore extends SafeAbsractDimension
     BitSet deletedRows = vectorInfo.deletedRows;
     BitSet nullBits = new BitSet(numOfRows);
     vector = ColumnarVectorWrapperDirectFactory
-        .getDirectVectorWrapperFactory(vector, invertedIndex, nullBits, deletedRows, false);
+        .getDirectVectorWrapperFactory(vector, invertedIndex, nullBits, deletedRows, false, false);
     fillVector(data, vectorInfo, vector);
     if (vector instanceof ConvertableVector) {
       ((ConvertableVector) vector).convert();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index b80ad7f..2873eed 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -98,19 +98,14 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
   @Override
   public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
       ColumnVectorInfo vectorInfo) {
-    this.invertedIndexReverse = invertedIndex;
-    int lengthSize = getLengthSize();
     CarbonColumnVector vector = vectorInfo.vector;
     DataType dt = vector.getType();
-    // creating a byte buffer which will wrap the length of the row
-    ByteBuffer buffer = ByteBuffer.wrap(data);
     AbstractNonDictionaryVectorFiller vectorFiller =
-        NonDictionaryVectorFillerFactory.getVectorFiller(dt, lengthSize, numberOfRows);
-    BitSet nullBits = new BitSet(numberOfRows);
+        NonDictionaryVectorFillerFactory.getVectorFiller(getLengthSize(), dt, numberOfRows);
     vector = ColumnarVectorWrapperDirectFactory
-        .getDirectVectorWrapperFactory(vector, invertedIndex, nullBits, vectorInfo.deletedRows,
-            false);
-    vectorFiller.fillVector(data, vector, buffer);
+        .getDirectVectorWrapperFactory(vector, invertedIndex, new BitSet(), vectorInfo.deletedRows,
+            false, false);
+    vectorFiller.fillVector(data, vector);
     if (vector instanceof ConvertableVector) {
       ((ConvertableVector) vector).convert();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index d19d1c9..d73318d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -252,7 +252,7 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
       BitSet deletedRows = vectorInfo.deletedRows;
       DataType vectorDataType = vector.getType();
       vector = ColumnarVectorWrapperDirectFactory
-          .getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true);
+          .getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true, false);
       if (vectorDataType == DataTypes.FLOAT) {
         float floatFactor = factor.floatValue();
         if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index 1671246..12d108b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -309,7 +309,7 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
       BitSet deletedRows = vectorInfo.deletedRows;
       vector = ColumnarVectorWrapperDirectFactory
           .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
-              true);
+              true, false);
       fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
       if (deletedRows == null || deletedRows.isEmpty()) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 21421d3..b300ee1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -255,7 +255,7 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
       BitSet deletedRows = vectorInfo.deletedRows;
       DataType vectorDataType = vector.getType();
       vector = ColumnarVectorWrapperDirectFactory
-          .getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true);
+          .getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true, false);
       if (vectorDataType == DataTypes.FLOAT) {
         if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
           byte[] byteData = columnPage.getBytePage();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index 1813907..d77a949 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -282,7 +282,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
       BitSet deletedRows = vectorInfo.deletedRows;
       vector = ColumnarVectorWrapperDirectFactory
           .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
-              true);
+              true, false);
       fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
       if (deletedRows == null || deletedRows.isEmpty()) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
@@ -369,7 +369,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          int[] shortIntData = ByteUtil.toIntArray(shortIntPage, pageSize);
+          int[] shortIntData = ByteUtil.toIntArrayFrom3Bytes(shortIntPage, pageSize);
           decimalConverter.fillVector(shortIntData, pageSize, vectorInfo, columnPage.getNullBits());
         } else {
           for (int i = 0; i < pageSize; i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index b5c855e..1825850 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -212,7 +212,7 @@ public class DirectCompressCodec implements ColumnPageCodec {
       BitSet deletedRows = vectorInfo.deletedRows;
       vector = ColumnarVectorWrapperDirectFactory
           .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
-              true);
+              true, false);
       fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
       if (deletedRows == null || deletedRows.isEmpty()) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
@@ -298,7 +298,7 @@ public class DirectCompressCodec implements ColumnPageCodec {
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          int[] shortIntData = ByteUtil.toIntArray(shortIntPage, pageSize);
+          int[] shortIntData = ByteUtil.toIntArrayFrom3Bytes(shortIntPage, pageSize);
           decimalConverter.fillVector(shortIntData, pageSize, vectorInfo, columnPage.getNullBits());
         } else {
           for (int i = 0; i < pageSize; i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java
index 4884b4d..84e42f5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java
@@ -39,7 +39,8 @@ public final class ColumnarVectorWrapperDirectFactory {
    * @return wrapped CarbonColumnVector
    */
   public static CarbonColumnVector getDirectVectorWrapperFactory(CarbonColumnVector columnVector,
-      int[] invertedIndex, BitSet nullBitset, BitSet deletedRows, boolean isnullBitsExists) {
+      int[] invertedIndex, BitSet nullBitset, BitSet deletedRows, boolean isnullBitsExists,
+      boolean isDictVector) {
     if ((invertedIndex != null && invertedIndex.length > 0) && (deletedRows == null || deletedRows
         .isEmpty())) {
       return new ColumnarVectorWrapperDirectWithInvertedIndex(columnVector, invertedIndex,
@@ -50,7 +51,7 @@ public final class ColumnarVectorWrapperDirectFactory {
     } else if ((invertedIndex != null && invertedIndex.length > 0) && (deletedRows != null
         && !deletedRows.isEmpty())) {
       return new ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex(columnVector,
-          deletedRows, invertedIndex, nullBitset, isnullBitsExists);
+          deletedRows, invertedIndex, nullBitset, isnullBitsExists, isDictVector);
     } else {
       return columnVector;
     }

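The factory's dispatch (only partially visible above; the delete-delta-only branch sits between the two hunks) picks a wrapper based on which of inverted index and delete delta apply. A simplified, hypothetical rendering of that decision logic with stand-in names for the real ColumnarVectorWrapperDirect* classes:

static String chooseWrapper(boolean hasInvertedIndex, boolean hasDeletedRows) {
  if (hasInvertedIndex && !hasDeletedRows) {
    return "DirectWithInvertedIndex";               // reorder rows only
  } else if (!hasInvertedIndex && hasDeletedRows) {
    return "DirectWithDeleteDelta";                 // drop deleted rows only
  } else if (hasInvertedIndex && hasDeletedRows) {
    return "DirectWithDeleteDeltaAndInvertedIndex"; // buffer, reorder, then drop
  } else {
    return "unwrapped vector";                      // nothing to adjust
  }
}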
http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java
index e4507cb..46b2041 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java
@@ -55,9 +55,9 @@ class ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex
    */
   public ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex(
       CarbonColumnVector vectorWrapper, BitSet deletedRows, int[] invertedIndex, BitSet nullBits,
-      boolean isnullBitsExists) {
-    super(new CarbonColumnVectorImpl(invertedIndex.length, vectorWrapper.getType()), invertedIndex,
-        isnullBitsExists);
+      boolean isnullBitsExists, boolean isDictVector) {
+    super(new CarbonColumnVectorImpl(invertedIndex.length,
+        isDictVector ? DataTypes.INT : vectorWrapper.getType()), invertedIndex, isnullBitsExists);
     this.deletedRows = deletedRows;
     this.carbonColumnVector = vectorWrapper;
     this.nullBits = nullBits;
@@ -82,7 +82,7 @@ class ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex
   public void convert() {
     if (columnVector instanceof CarbonColumnVectorImpl) {
       CarbonColumnVectorImpl localVector = (CarbonColumnVectorImpl) columnVector;
-      DataType dataType = carbonColumnVector.getType();
+      DataType dataType = columnVector.getType();
       int length = invertedIndex.length;
       int counter = 0;
       if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index 6188948..8e6e15b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -734,7 +734,7 @@ public final class ByteUtil {
     return Float.intBitsToFloat(toXorInt(value, offset, length));
   }
 
-  public static int[] toIntArray(byte[] data, int size) {
+  public static int[] toIntArrayFrom3Bytes(byte[] data, int size) {
     int[] ints = new int[size];
     for (int i = 0; i < ints.length; i++) {
       ints[i] = ByteUtil.valueOf3Bytes(data, i * 3);

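toIntArrayFrom3Bytes unpacks a SHORT_INT page, where each value occupies 3 bytes, into a regular int[] for the decimal converter. A hypothetical stand-in for ByteUtil.valueOf3Bytes, assuming big-endian signed 3-byte values (the real implementation's sign handling may differ):

static int threeBytesToInt(byte[] data, int offset) {
  int value = ((data[offset] & 0xFF) << 16)
      | ((data[offset + 1] & 0xFF) << 8)
      | (data[offset + 2] & 0xFF);
  // sign-extend from 24 bits to a full int
  return (value << 8) >> 8;
}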
http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala
index a989230..24524b8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala
@@ -224,7 +224,7 @@ class CastColumnTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("Dictionary INT In to implicit Int") {
     checkAnswer(
-      sql("select empno,empname,workgroupcategory from DICTIONARY_CARBON_1 where workgroupcategory in (1, 2)"),
+      sql("select empno,empname,workgroupcategory from DICTIONARY_CARBON_1 where workgroupcategory in ('1', '2')"),
       sql("select empno,empname,workgroupcategory from DICTIONARY_HIVE_1 where workgroupcategory in ('1', '2')")
     )
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 33031fc..542a454 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -90,7 +90,7 @@ class CarbonScanRDD[T: ClassTag](
   }
   private var vectorReader = false
 
-  private var directScan = false
+  private var directFill = false
 
   private val bucketedTable = tableInfo.getFactTable.getBucketingInfo
 
@@ -230,7 +230,7 @@ class CarbonScanRDD[T: ClassTag](
       statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
       statisticRecorder.recordStatisticsForDriver(statistic, queryId)
       statistic = new QueryStatistic()
-      val carbonDistribution = if (directScan) {
+      val carbonDistribution = if (directFill) {
         CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES
       } else {
         CarbonProperties.getInstance().getProperty(
@@ -443,7 +443,7 @@ class CarbonScanRDD[T: ClassTag](
         case _ =>
           // create record reader for CarbonData file format
           if (vectorReader) {
-            model.setDirectVectorFill(directScan)
+            model.setDirectVectorFill(directFill)
             val carbonRecordReader = createVectorizedCarbonRecordReader(model,
               inputMetricsStats,
               "true")
@@ -757,6 +757,6 @@ class CarbonScanRDD[T: ClassTag](
 
   // TODO find the better way set it.
   def setDirectScanSupport(isDirectScan: Boolean): Unit = {
-    directScan = isDirectScan
+    directFill = isDirectScan
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 3c2b753..22188c4 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -173,7 +173,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   @Override public void putByteArray(int rowId, byte[] value) {
     if (!filteredRows[rowId]) {
-      sparkColumnVectorProxy.putByteArray(counter++, value);
+      sparkColumnVectorProxy.putByteArray(counter++, value, 0, value.length);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
index c55387a..7c5902e 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
@@ -115,7 +115,7 @@ class ColumnarVectorWrapperDirect implements CarbonColumnVector {
   }
 
   @Override public void putByteArray(int rowId, byte[] value) {
-    sparkColumnVectorProxy.putByteArray(rowId, value);
+    sparkColumnVectorProxy.putByteArray(rowId, value, 0, value.length);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/58ba45ef/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
index 6ec9a26..c16d381 100644
--- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql;
 
+import java.lang.reflect.Field;
 import java.math.BigInteger;
 
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
@@ -24,8 +25,8 @@ import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
 import org.apache.parquet.column.Encoding;
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
 import org.apache.spark.sql.types.CalendarIntervalType;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
@@ -38,290 +39,441 @@ import org.apache.spark.unsafe.types.UTF8String;
 /**
  * Adapter class which handles the columnar vector reading of the carbondata
  * based on the spark ColumnVector and ColumnarBatch API. This proxy class
- * handles the complexity of spark 2.1 version related api changes since
+ * handles the complexity of spark 2.3 version related api changes since
  * spark ColumnVector and ColumnarBatch interfaces are still evolving.
  */
 public class CarbonVectorProxy {
 
-    private ColumnarBatch columnarBatch;
-
-    private ColumnVectorProxy[] columnVectorProxies;
-
-    /**
-     * Adapter class which handles the columnar vector reading of the carbondata
-     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
-     * handles the complexity of spark 2.3 version related api changes since
-     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
-     *
-     * @param memMode       which represent the type onheap or offheap vector.
-     * @param rowNum        rows number for vector reading
-     * @param structFileds, metadata related to current schema of table.
-     */
-    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
-        columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
-        columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
-        for (int i = 0; i < columnVectorProxies.length; i++) {
-            columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch, i);
-        }
+  private ColumnarBatch columnarBatch;
+  private ColumnVectorProxy[] columnVectorProxies;
+
+  /**
+   * Adapter class which handles the columnar vector reading of the carbondata
+   * based on the spark ColumnVector and ColumnarBatch API. This proxy class
+   * handles the complexity of spark 2.3 version related api changes since
+   * spark ColumnVector and ColumnarBatch interfaces are still evolving.
+   *
+   * @param memMode       which represent the type onheap or offheap vector.
+   * @param rowNum        rows number for vector reading
+   * @param structFileds, metadata related to current schema of table.
+   */
+  public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
+    columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
+    columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
+    for (int i = 0; i < columnVectorProxies.length; i++) {
+      columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch.column(i), rowNum, memMode);
     }
+    updateColumnVectors();
+
+  }
 
-    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
-        columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
-        columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
-        for (int i = 0; i < columnVectorProxies.length; i++) {
-            columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch, i);
+  private void updateColumnVectors() {
+    try {
+      Field field = columnarBatch.getClass().getDeclaredField("columns");
+      field.setAccessible(true);
+      field.set(columnarBatch, columnVectorProxies);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
+    columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
+    columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
+    for (int i = 0; i < columnVectorProxies.length; i++) {
+      columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch.column(i), rowNum, memMode);
+    }
+    updateColumnVectors();
+  }
+
+  public ColumnVectorProxy getColumnVector(int ordinal) {
+    return columnVectorProxies[ordinal];
+  }
+
+  /**
+   * Returns the number of rows for read, including filtered rows.
+   */
+  public int numRows() {
+    return columnarBatch.capacity();
+  }
+
+  /**
+   * This API will return a columnvector from a batch of column vector rows
+   * based on the ordinal
+   *
+   * @param ordinal
+   * @return
+   */
+  public ColumnVector column(int ordinal) {
+    return columnarBatch.column(ordinal);
+  }
+
+  /**
+   * Resets this column for writing. The currently stored values are no longer accessible.
+   */
+  public void reset() {
+    for (int i = 0; i < columnarBatch.numCols(); i++) {
+      ((ColumnVectorProxy) columnarBatch.column(i)).reset();
+    }
+  }
+
+  public void resetDictionaryIds(int ordinal) {
+    (((ColumnVectorProxy) columnarBatch.column(ordinal)).getVector()).getDictionaryIds().reset();
+  }
+
+  /**
+   * Returns the row in this batch at `rowId`. Returned row is reused across calls.
+   */
+  public InternalRow getRow(int rowId) {
+    return columnarBatch.getRow(rowId);
+  }
+
+  /**
+   * Returns the row in this batch at `rowId`. Returned row is reused across calls.
+   */
+  public Object getColumnarBatch() {
+    return columnarBatch;
+  }
+
+  /**
+   * Called to close all the columns in this batch. It is not valid to access the data after
+   * calling this. This must be called at the end to clean up memory allocations.
+   */
+  public void close() {
+    columnarBatch.close();
+  }
+
+  /**
+   * Sets the number of rows in this batch.
+   */
+  public void setNumRows(int numRows) {
+    columnarBatch.setNumRows(numRows);
+  }
+
+  public DataType dataType(int ordinal) {
+    return columnarBatch.column(ordinal).dataType();
+  }
+
+  public static class ColumnVectorProxy extends ColumnVector {
+
+    private ColumnVector vector;
+
+    private LazyPageLoader pageLoad;
+
+    private boolean isLoaded;
+
+    public ColumnVectorProxy(ColumnVector columnVector, int capacity, MemoryMode mode) {
+      super(capacity, columnVector.dataType(), mode);
+      try {
+        Field childColumns =
+            columnVector.getClass().getSuperclass().getDeclaredField("childColumns");
+        childColumns.setAccessible(true);
+        Object o = childColumns.get(columnVector);
+        childColumns.set(this, o);
+        Field childColumns1 =
+            columnVector.getClass().getSuperclass().getDeclaredField("resultArray");
+        childColumns1.setAccessible(true);
+        Object o1 = childColumns1.get(columnVector);
+        childColumns1.set(this, o1);
+
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      vector = columnVector;
+    }
+
+    public void putRowToColumnBatch(int rowId, Object value) {
+      org.apache.spark.sql.types.DataType t = dataType();
+      if (null == value) {
+        putNull(rowId);
+      } else {
+        if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+          putBoolean(rowId, (boolean) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+          putByte(rowId, (byte) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+          putShort(rowId, (short) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+          putInt(rowId, (int) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+          putLong(rowId, (long) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+          putFloat(rowId, (float) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+          putDouble(rowId, (double) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+          UTF8String v = (UTF8String) value;
+          putByteArray(rowId, v.getBytes());
+        } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
+          DecimalType dt = (DecimalType) t;
+          Decimal d = Decimal.fromDecimal(value);
+          if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+            putInt(rowId, (int) d.toUnscaledLong());
+          } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+            putLong(rowId, d.toUnscaledLong());
+          } else {
+            final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+            byte[] bytes = integer.toByteArray();
+            putByteArray(rowId, bytes, 0, bytes.length);
+          }
+        } else if (t instanceof CalendarIntervalType) {
+          CalendarInterval c = (CalendarInterval) value;
+          vector.getChildColumn(0).putInt(rowId, c.months);
+          vector.getChildColumn(1).putLong(rowId, c.microseconds);
+        } else if (t instanceof org.apache.spark.sql.types.DateType) {
+          putInt(rowId, (int) value);
+        } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+          putLong(rowId, (long) value);
         }
+      }
     }
 
-    public ColumnVectorProxy getColumnVector(int ordinal) {
-        return columnVectorProxies[ordinal];
+    public void putBoolean(int rowId, boolean value) {
+      vector.putBoolean(rowId, value);
     }
 
-    /**
-     * Sets the number of rows in this batch.
-     */
-    public void setNumRows(int numRows) {
-        columnarBatch.setNumRows(numRows);
+    public void putByte(int rowId, byte value) {
+      vector.putByte(rowId, value);
     }
 
+    public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+      vector.putBytes(rowId, count, src, srcIndex);
+    }
 
-    /**
-     * Returns the number of rows for read, including filtered rows.
-     */
-    public int numRows() {
-        return columnarBatch.capacity();
+    public void putShort(int rowId, short value) {
+      vector.putShort(rowId, value);
     }
 
+    public void putInt(int rowId, int value) {
+      vector.putInt(rowId, value);
+    }
 
-    /**
-     * Called to close all the columns in this batch. It is not valid to access the data after
-     * calling this. This must be called at the end to clean up memory allocations.
-     */
-    public void close() {
-        columnarBatch.close();
+    public void putFloat(int rowId, float value) {
+      vector.putFloat(rowId, value);
     }
 
-    /**
-     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-     */
-    public InternalRow getRow(int rowId) {
-        return columnarBatch.getRow(rowId);
+    public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+      vector.putFloats(rowId, count, src, srcIndex);
     }
 
-    /**
-     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-     */
-    public Object getColumnarBatch() {
-        return columnarBatch;
+    public void putLong(int rowId, long value) {
+      vector.putLong(rowId, value);
     }
 
-    public void resetDictionaryIds(int ordinal) {
-        columnarBatch.column(ordinal).getDictionaryIds().reset();
+    public void putDouble(int rowId, double value) {
+      vector.putDouble(rowId, value);
     }
 
-    /**
-     * This API will return a columnvector from a batch of column vector rows
-     * based on the ordinal
-     *
-     * @param ordinal
-     * @return
-     */
-    public ColumnVector column(int ordinal) {
-        return columnarBatch.column(ordinal);
+    public void putInts(int rowId, int count, int value) {
+      vector.putInts(rowId, count, value);
     }
 
+    public void putInts(int rowId, int count, int[] src, int srcIndex) {
+      vector.putInts(rowId, count, src, srcIndex);
+    }
 
+    public void putShorts(int rowId, int count, short value) {
+      vector.putShorts(rowId, count, value);
+    }
 
-    /**
-     * Resets this column for writing. The currently stored values are no longer accessible.
-     */
-    public void reset() {
-        columnarBatch.reset();
+    public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+      vector.putShorts(rowId, count, src, srcIndex);
     }
 
+    public void putLongs(int rowId, int count, long value) {
+      vector.putLongs(rowId, count, value);
+    }
 
-    public static class ColumnVectorProxy {
+    public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+      vector.putLongs(rowId, count, src, srcIndex);
+    }
 
-        private ColumnVector vector;
+    public void putDoubles(int rowId, int count, double value) {
+      vector.putDoubles(rowId, count, value);
+    }
 
-        public ColumnVectorProxy(ColumnarBatch columnarBatch, int ordinal) {
-            this.vector = columnarBatch.column(ordinal);
-        }
+    public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+      vector.putDoubles(rowId, count, src, srcIndex);
+    }
 
-        public void putRowToColumnBatch(int rowId, Object value) {
-            org.apache.spark.sql.types.DataType t = dataType();
-            if (null == value) {
-                putNull(rowId);
-            } else {
-                if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
-                    putBoolean(rowId, (boolean) value);
-                } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
-                    putByte(rowId, (byte) value);
-                } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
-                    putShort(rowId, (short) value);
-                } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
-                    putInt(rowId, (int) value);
-                } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
-                    putLong(rowId, (long) value);
-                } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
-                    putFloat(rowId, (float) value);
-                } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
-                    putDouble(rowId, (double) value);
-                } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
-                    UTF8String v = (UTF8String) value;
-                    putByteArray(rowId, v.getBytes());
-                } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
-                    DecimalType dt = (DecimalType) t;
-                    Decimal d = Decimal.fromDecimal(value);
-                    if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-                        putInt(rowId, (int) d.toUnscaledLong());
-                    } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-                        putLong(rowId, d.toUnscaledLong());
-                    } else {
-                        final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-                        byte[] bytes = integer.toByteArray();
-                        putByteArray(rowId, bytes, 0, bytes.length);
-                    }
-                } else if (t instanceof CalendarIntervalType) {
-                    CalendarInterval c = (CalendarInterval) value;
-                    vector.getChildColumn(0).putInt(rowId, c.months);
-                    vector.getChildColumn(1).putLong(rowId, c.microseconds);
-                } else if (t instanceof org.apache.spark.sql.types.DateType) {
-                    putInt(rowId, (int) value);
-                } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
-                    putLong(rowId, (long) value);
-                }
-            }
-        }
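+    /**
+     * Returns the data type of the wrapped vector. The ordinal argument is
+     * ignored here, since the proxy already wraps a single column.
+     */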
+    public DataType dataType(int ordinal) {
+      return vector.dataType();
+    }
 
-        public void putBoolean(int rowId, boolean value) {
-            vector.putBoolean(rowId, value);
-        }
+    public void putNotNull(int rowId) {
+      vector.putNotNull(rowId);
+    }
 
-        public void putByte(int rowId, byte value) {
-            vector.putByte(rowId, value);
-        }
+    public void putNotNulls(int rowId, int count) {
+      vector.putNotNulls(rowId, count);
+    }
 
-        public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
-            vector.putBytes(rowId, count, src, srcIndex);
-        }
+    public void putDictionaryInt(int rowId, int value) {
+      vector.getDictionaryIds().putInt(rowId, value);
+    }
 
-        public void putShort(int rowId, short value) {
-            vector.putShort(rowId, value);
-        }
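+    /**
+     * Wraps the CarbonDictionary in a CarbonDictionaryWrapper before handing
+     * it to the underlying vector, and keeps the wrapper for later lookups.
+     */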
+    public void setDictionary(CarbonDictionary dictionary) {
+      if (null != dictionary) {
+        CarbonDictionaryWrapper dictionaryWrapper =
+            new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary);
+        vector.setDictionary(dictionaryWrapper);
+        this.dictionary = dictionaryWrapper;
+      } else {
+        vector.setDictionary(null);
+      }
+    }
 
-        public void putInt(int rowId, int value) {
-            vector.putInt(rowId, value);
-        }
+    public void putNull(int rowId) {
+      vector.putNull(rowId);
+    }
 
-        public void putFloat(int rowId, float value) {
-            vector.putFloat(rowId, value);
-        }
+    public void putNulls(int rowId, int count) {
+      vector.putNulls(rowId, count);
+    }
 
-        public void putFloats(int rowId, int count, float[] src, int srcIndex)  {
-            vector.putFloats(rowId, count, src, srcIndex);
-        }
+    public boolean hasDictionary() {
+      return vector.hasDictionary();
+    }
 
-        public void putLong(int rowId, long value) {
-            vector.putLong(rowId, value);
-        }
+    public ColumnVector reserveDictionaryIds(int capacity) {
+      this.dictionaryIds = vector.reserveDictionaryIds(capacity);
+      return dictionaryIds;
+    }
 
-        public void putDouble(int rowId, double value) {
-            vector.putDouble(rowId, value);
-        }
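+    // The getters below call checkPageLoaded() first, so a page is
+    // materialized only when one of its values is actually requested.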
+    @Override public boolean isNullAt(int i) {
+      checkPageLoaded();
+      return vector.isNullAt(i);
+    }
 
-        public void putByteArray(int rowId, byte[] value) {
-            vector.putByteArray(rowId, value);
-        }
+    @Override public boolean getBoolean(int i) {
+      checkPageLoaded();
+      return vector.getBoolean(i);
+    }
 
-        public void putInts(int rowId, int count, int value) {
-            vector.putInts(rowId, count, value);
-        }
+    @Override public byte getByte(int i) {
+      checkPageLoaded();
+      return vector.getByte(i);
+    }
 
-        public void putInts(int rowId, int count, int[] src, int srcIndex) {
-            vector.putInts(rowId, count, src, srcIndex);
-        }
+    @Override public short getShort(int i) {
+      checkPageLoaded();
+      return vector.getShort(i);
+    }
 
-        public void putShorts(int rowId, int count, short value) {
-            vector.putShorts(rowId, count, value);
-        }
+    @Override public int getInt(int i) {
+      checkPageLoaded();
+      return vector.getInt(i);
+    }
 
-        public void putShorts(int rowId, int count, short[] src, int srcIndex) {
-            vector.putShorts(rowId, count, src, srcIndex);
-        }
+    @Override public long getLong(int i) {
+      checkPageLoaded();
+      return vector.getLong(i);
+    }
 
-        public void putLongs(int rowId, int count, long value) {
-            vector.putLongs(rowId, count, value);
-        }
+    @Override public float getFloat(int i) {
+      checkPageLoaded();
+      return vector.getFloat(i);
+    }
 
-        public void putLongs(int rowId, int count, long[] src, int srcIndex) {
-            vector.putLongs(rowId, count, src, srcIndex);
-        }
+    @Override public double getDouble(int i) {
+      checkPageLoaded();
+      return vector.getDouble(i);
+    }
 
-        public void putDecimal(int rowId, Decimal value, int precision) {
-            vector.putDecimal(rowId, value, precision);
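+    /**
+     * No-op: the proxy does not allocate storage of its own; all memory is
+     * managed by the wrapped vector.
+     */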
+    @Override protected void reserveInternal(int capacity) {
+    }
 
-        }
+    @Override public void reserve(int requiredCapacity) {
+      vector.reserve(requiredCapacity);
+    }
 
-        public void putDoubles(int rowId, int count, double value) {
-            vector.putDoubles(rowId, count, value);
-        }
+    @Override public long nullsNativeAddress() {
+      return vector.nullsNativeAddress();
+    }
 
-        public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
-            vector.putDoubles(rowId, count, src, srcIndex);
-        }
+    @Override public long valuesNativeAddress() {
+      return vector.valuesNativeAddress();
+    }
 
-        public void putByteArray(int rowId, byte[] value, int offset, int length) {
-          vector.putByteArray(rowId, value, offset, length);
-        }
+    @Override public void putBooleans(int rowId, int count, boolean value) {
+      vector.putBooleans(rowId, count, value);
+    }
 
-        public boolean isNullAt(int rowId) {
-            return vector.isNullAt(rowId);
-        }
+    @Override public void putBytes(int rowId, int count, byte value) {
+      vector.putBytes(rowId, count, value);
+    }
 
-        public DataType dataType() {
-            return vector.dataType();
-        }
+    @Override public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+      vector.putIntsLittleEndian(rowId, count, src, srcIndex);
+    }
 
-        public void putNotNull(int rowId) {
-            vector.putNotNull(rowId);
-        }
+    @Override public int getDictId(int rowId) {
+      return vector.getDictId(rowId);
+    }
 
-        public void putNotNulls(int rowId, int count) {
-            vector.putNotNulls(rowId, count);
-        }
+    @Override public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+      vector.putLongsLittleEndian(rowId, count, src, srcIndex);
+    }
 
-        public void putDictionaryInt(int rowId, int value) {
-            vector.getDictionaryIds().putInt(rowId, value);
-        }
+    @Override public void putFloats(int rowId, int count, float value) {
+      vector.putFloats(rowId, count, value);
+    }
 
-      public void setDictionary(CarbonDictionary dictionary) {
-        if (null != dictionary) {
-          vector.setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary));
-        } else {
-          vector.setDictionary(null);
-        }
-      }
+    @Override public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
+      vector.putFloats(rowId, count, src, srcIndex);
+    }
 
-        public void putNull(int rowId) {
-            vector.putNull(rowId);
-        }
+    @Override public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
+      vector.putDoubles(rowId, count, src, srcIndex);
+    }
 
-        public void putNulls(int rowId, int count) {
-            vector.putNulls(rowId, count);
-        }
+    @Override public void putArray(int rowId, int offset, int length) {
+      vector.putArray(rowId, offset, length);
+    }
 
-        public boolean hasDictionary() {
-            return vector.hasDictionary();
-        }
+    @Override public int getArrayLength(int rowId) {
+      checkPageLoaded();
+      return vector.getArrayLength(rowId);
+    }
 
-        public Object reserveDictionaryIds(int capacity ) {
-            return vector.reserveDictionaryIds(capacity);
-        }
+    @Override public int getArrayOffset(int rowId) {
+      checkPageLoaded();
+      return vector.getArrayOffset(rowId);
+    }
 
-        public void setLazyPage(LazyPageLoader lazyPage) {
-            lazyPage.loadPage();
+    @Override public void loadBytes(Array array) {
+      vector.loadBytes(array);
+    }
+
+    @Override public int putByteArray(int rowId, byte[] value, int offset, int count) {
+      return vector.putByteArray(rowId, value, offset, count);
+    }
+
+    @Override public void close() {
+      vector.close();
+    }
+
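+    /**
+     * Triggers the actual page load on first access. Decompression and
+     * filling of the underlying vector are deferred until a value from this
+     * column is really read, which is the core of the lazy loading added by
+     * this change.
+     */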
+    private void checkPageLoaded() {
+      if (!isLoaded) {
+        if (pageLoad != null) {
+          pageLoad.loadPage();
         }
+        isLoaded = true;
+      }
+    }
+
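+    /**
+     * Resets the proxy for the next batch: marks the page as not yet loaded
+     * and resets the wrapped vector for writing.
+     */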
+    public void reset() {
+      isLoaded = false;
+      vector.reset();
+    }
+
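+    /**
+     * Only records the loader; unlike the previous implementation, the page
+     * is not loaded here but on first access via checkPageLoaded().
+     */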
+    public void setLazyPage(LazyPageLoader lazyPage) {
+      this.pageLoad = lazyPage;
+    }
 
+    public ColumnVector getVector() {
+      return vector;
     }
+  }
 }
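
----------------------------------------------------------------------

For reference, the deferred-load contract that ColumnVectorProxy now
follows can be reduced to the minimal sketch below. The names
PageLoader and LazyColumn are hypothetical illustrations, not
CarbonData APIs; in the patch above the real loader is LazyPageLoader
and the guard is checkPageLoaded().

  // Hypothetical sketch: record the loader, load on first read.
  final class LazyColumn {

    interface PageLoader {
      int[] loadPage(); // decompress and decode one column page
    }

    private PageLoader loader; // registered, like setLazyPage()
    private int[] values;      // filled on first access
    private boolean loaded;

    void setLazyPage(PageLoader loader) {
      this.loader = loader;    // record only; nothing is loaded yet
      this.loaded = false;
    }

    int getInt(int rowId) {
      checkPageLoaded();       // same guard as the proxy's getters
      return values[rowId];
    }

    private void checkPageLoaded() {
      if (!loaded) {
        if (loader != null) {
          values = loader.loadPage();
        }
        loaded = true;
      }
    }
  }

With this shape a page is decompressed only when one of its values is
actually read, instead of eagerly when the page is set.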